You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/03/25 13:21:13 UTC

[incubator-seatunnel] branch dev updated: [Feature][Transform] Support copy field list (#4404)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c048ee09b [Feature][Transform] Support copy field list (#4404)
c048ee09b is described below

commit c048ee09b2f7c51d1a3fb177cd3df4e8e5f422c6
Author: hailin0 <wa...@apache.org>
AuthorDate: Sat Mar 25 21:21:05 2023 +0800

    [Feature][Transform] Support copy field list (#4404)
---
 docs/en/transform-v2/copy.md                       |  37 +++---
 .../src/test/resources/copy_transform.conf         |  40 +++++-
 .../common/MultipleFieldOutputTransform.java       |  29 +++--
 .../transform/{ => copy}/CopyFieldTransform.java   | 138 ++++++++++++++-------
 .../{ => copy}/CopyFieldTransformFactory.java      |  23 +++-
 .../transform/copy/CopyTransformConfig.java        |  71 +++++++++++
 .../transform/CopyFieldTransformFactoryTest.java   |   2 +
 7 files changed, 260 insertions(+), 80 deletions(-)

diff --git a/docs/en/transform-v2/copy.md b/docs/en/transform-v2/copy.md
index 956b8c277..7a0e73f44 100644
--- a/docs/en/transform-v2/copy.md
+++ b/docs/en/transform-v2/copy.md
@@ -8,18 +8,13 @@ Copy a field to a new field.
 
 ## Options
 
-|    name    |  type  | required | default value |
-|------------|--------|----------|---------------|
-| src_field  | string | yes      |               |
-| dest_field | string | yes      |               |
+|  name  |  type  | required | default value |
+|--------|--------|----------|---------------|
+| fields | Object | yes      |               |
 
-### src_field [string]
+### fields [config]
 
-Src field name you want to copy
-
-### dest_field [string]
-
-This dest field name
+Specify the field copy relationship between input and output
 
 ### common options [string]
 
@@ -36,31 +31,35 @@ The data read from source is a table like this:
 | Kin Dom  | 20  | 123  |
 | Joy Dom  | 20  | 123  |
 
-We want copy field `name` to a new field `name1`, we can add `Copy` Transform like this
+We want to copy the fields `name` and `age` to the new fields `name1`, `name2`, and `age1`; we can add a `Copy` transform like this:
 
 ```
 transform {
   Copy {
     source_table_name = "fake"
     result_table_name = "fake1"
-    src_field = "name"
-    dest_field = "name1"
+    fields {
+      name1 = name
+      name2 = name
+      age1 = age
+    }
   }
 }
 ```
 
 Then the data in result table `fake1` will like this
 
-|   name   | age | card |  name1   |
-|----------|-----|------|----------|
-| Joy Ding | 20  | 123  | Joy Ding |
-| May Ding | 20  | 123  | May Ding |
-| Kin Dom  | 20  | 123  | Kin Dom  |
-| Joy Dom  | 20  | 123  | Joy Dom  |
+|   name   | age | card |  name1   |  name2   | age1 |
+|----------|-----|------|----------|----------|------|
+| Joy Ding | 20  | 123  | Joy Ding | Joy Ding | 20   |
+| May Ding | 20  | 123  | May Ding | May Ding | 20   |
+| Kin Dom  | 20  | 123  | Kin Dom  | Kin Dom  | 20   |
+| Joy Dom  | 20  | 123  | Joy Dom  | Joy Dom  | 20   |
 
 ## Changelog
 
 ### new version
 
 - Add Copy Transform Connector
+- Support copying multiple fields to new fields
 
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
index cd29bc156..68c9e4e52 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
@@ -42,14 +42,23 @@ transform {
     src_field = "name"
     dest_field = "name1"
   }
+  Copy {
+    source_table_name = "fake1"
+    result_table_name = "fake2"
+    fields {
+      id_1 = "id"
+      name2 = "name"
+      name3 = "name"
+    }
+  }
 }
 
 sink {
   Console {
-    source_table_name = "fake1"
+    source_table_name = "fake2"
   }
   Assert {
-    source_table_name = "fake1"
+    source_table_name = "fake2"
     rules =
       {
         row_rules = [
@@ -68,6 +77,15 @@ sink {
               }
             ]
           },
+          {
+            field_name = id_1
+            field_type = int
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
           {
             field_name = name
             field_type = string
@@ -86,6 +104,24 @@ sink {
               }
             ]
           }
+          {
+            field_name = name2
+            field_type = string
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          }
+          {
+            field_name = name3
+            field_type = string
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          }
         ]
       }
   }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
index 438435df2..2609ced1b 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
@@ -32,6 +32,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -184,7 +185,7 @@ public abstract class MultipleFieldOutputTransform extends AbstractCatalogSuppor
                 TableSchema.builder()
                         .primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey())
                         .constraintKey(inputCatalogTable.getTableSchema().getConstraintKeys());
-        List<Column> copyInputColumns =
+        List<Column> columns =
                 inputCatalogTable.getTableSchema().getColumns().stream()
                         .map(Column::copy)
                         .collect(Collectors.toList());
@@ -192,21 +193,31 @@ public abstract class MultipleFieldOutputTransform extends AbstractCatalogSuppor
         int addFieldCount = 0;
         this.fieldsIndex = new int[outputColumns.length];
         for (int i = 0; i < outputColumns.length; i++) {
-            for (int j = 0; j < copyInputColumns.size(); j++) {
-                if (copyInputColumns.get(j).getName().equals(outputColumns[i].getName())) {
-                    copyInputColumns.set(j, outputColumns[i]);
-                } else {
-                    addFieldCount++;
-                    copyInputColumns.add(outputColumns[i]);
+            Column outputColumn = outputColumns[i];
+            Optional<Column> optional =
+                    columns.stream()
+                            .filter(c -> c.getName().equals(outputColumn.getName()))
+                            .findFirst();
+            if (optional.isPresent()) {
+                Column originalColumn = optional.get();
+                int originalColumnIndex = columns.indexOf(originalColumn);
+                if (!originalColumn.getDataType().equals(outputColumn.getDataType())) {
+                    columns.set(
+                            originalColumnIndex, originalColumn.copy(outputColumn.getDataType()));
                 }
+                fieldsIndex[i] = originalColumnIndex;
+            } else {
+                addFieldCount++;
+                columns.add(outputColumn);
+                fieldsIndex[i] = columns.indexOf(outputColumn);
             }
         }
 
-        TableSchema outputTableSchema = builder.columns(copyInputColumns).build();
+        TableSchema outputTableSchema = builder.columns(columns).build();
         if (addFieldCount > 0) {
             int inputFieldLength =
                     inputCatalogTable.getTableSchema().toPhysicalRowDataType().getTotalFields();
-            int outputFieldLength = copyInputColumns.size();
+            int outputFieldLength = columns.size();
 
             rowContainerGenerator =
                     new SeaTunnelRowContainerGenerator() {
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/CopyFieldTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
similarity index 50%
rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/CopyFieldTransform.java
rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
index 7a60d71b8..b50a195ec 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/CopyFieldTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
@@ -15,89 +15,139 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.copy;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
 import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
-import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
 
 import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
 
 import java.lang.reflect.Array;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
+@NoArgsConstructor
 @AutoService(SeaTunnelTransform.class)
-public class CopyFieldTransform extends SingleFieldOutputTransform {
-
-    public static final Option<String> SRC_FIELD =
-            Options.key("src_field")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Src field you want to copy");
-
-    public static final Option<String> DEST_FIELD =
-            Options.key("dest_field")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Copy Src field to Dest field");
-
-    private String srcField;
-    private int srcFieldIndex;
-    private SeaTunnelDataType srcFieldDataType;
-    private String destField;
+public class CopyFieldTransform extends MultipleFieldOutputTransform {
+    public static final String PLUGIN_NAME = "Copy";
+
+    private CopyTransformConfig config;
+    private List<String> fieldNames;
+    private List<Integer> fieldOriginalIndexs;
+    private List<SeaTunnelDataType> fieldTypes;
+
+    public CopyFieldTransform(CopyTransformConfig copyTransformConfig, CatalogTable catalogTable) {
+        super(catalogTable);
+        this.config = copyTransformConfig;
+        SeaTunnelRowType seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
+        initOutputFields(seaTunnelRowType, config.getFields());
+    }
 
     @Override
     public String getPluginName() {
-        return "Copy";
+        return PLUGIN_NAME;
     }
 
     @Override
     protected void setConfig(Config pluginConfig) {
-        CheckResult checkResult =
-                CheckConfigUtil.checkAllExists(pluginConfig, SRC_FIELD.key(), DEST_FIELD.key());
-        if (!checkResult.isSuccess()) {
-            throw new IllegalArgumentException("Failed to check config! " + checkResult.getMsg());
-        }
-
-        this.srcField = pluginConfig.getString(SRC_FIELD.key());
-        this.destField = pluginConfig.getString(DEST_FIELD.key());
+        ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
+                .validate(new CopyFieldTransformFactory().optionRule());
+        this.config = CopyTransformConfig.of(ReadonlyConfig.fromConfig(pluginConfig));
     }
 
     @Override
     protected void setInputRowType(SeaTunnelRowType inputRowType) {
-        srcFieldIndex = inputRowType.indexOf(srcField);
-        if (srcFieldIndex == -1) {
-            throw new IllegalArgumentException(
-                    "Cannot find [" + srcField + "] field in input row type");
+        initOutputFields(inputRowType, config.getFields());
+    }
+
+    private void initOutputFields(
+            SeaTunnelRowType inputRowType, LinkedHashMap<String, String> fields) {
+        List<String> fieldNames = new ArrayList<>();
+        List<Integer> fieldOriginalIndexs = new ArrayList<>();
+        List<SeaTunnelDataType> fieldsType = new ArrayList<>();
+        for (Map.Entry<String, String> field : fields.entrySet()) {
+            String srcField = field.getValue();
+            int srcFieldIndex = inputRowType.indexOf(srcField);
+            if (srcFieldIndex == -1) {
+                throw new IllegalArgumentException(
+                        "Cannot find [" + srcField + "] field in input row type");
+            }
+            fieldNames.add(field.getKey());
+            fieldOriginalIndexs.add(srcFieldIndex);
+            fieldsType.add(inputRowType.getFieldType(srcFieldIndex));
         }
-        srcFieldDataType = inputRowType.getFieldType(srcFieldIndex);
+        this.fieldNames = fieldNames;
+        this.fieldOriginalIndexs = fieldOriginalIndexs;
+        this.fieldTypes = fieldsType;
     }
 
     @Override
-    protected String getOutputFieldName() {
-        return destField;
+    protected String[] getOutputFieldNames() {
+        return fieldNames.toArray(new String[0]);
     }
 
     @Override
-    protected SeaTunnelDataType getOutputFieldDataType() {
-        return srcFieldDataType;
+    protected SeaTunnelDataType[] getOutputFieldDataTypes() {
+        return fieldTypes.toArray(new SeaTunnelDataType[0]);
     }
 
     @Override
-    protected Object getOutputFieldValue(SeaTunnelRowAccessor inputRow) {
-        return clone(srcFieldDataType, inputRow.getField(srcFieldIndex));
+    protected Column[] getOutputColumns() {
+        if (inputCatalogTable == null) {
+            Column[] columns = new Column[fieldNames.size()];
+            for (int i = 0; i < fieldNames.size(); i++) {
+                columns[i] =
+                        PhysicalColumn.of(fieldNames.get(i), fieldTypes.get(i), 200, true, "", "");
+            }
+            return columns;
+        }
+
+        Map<String, Column> catalogTableColumns =
+                inputCatalogTable.getTableSchema().getColumns().stream()
+                        .collect(Collectors.toMap(column -> column.getName(), column -> column));
+
+        List<Column> columns = new ArrayList<>();
+        for (Map.Entry<String, String> copyField : config.getFields().entrySet()) {
+            Column srcColumn = catalogTableColumns.get(copyField.getValue());
+            PhysicalColumn destColumn =
+                    PhysicalColumn.of(
+                            copyField.getKey(),
+                            srcColumn.getDataType(),
+                            srcColumn.getColumnLength(),
+                            srcColumn.isNullable(),
+                            srcColumn.getDefaultValue(),
+                            srcColumn.getComment());
+            columns.add(destColumn);
+        }
+        return columns.toArray(new Column[0]);
+    }
+
+    @Override
+    protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+        Object[] fieldValues = new Object[fieldNames.size()];
+        for (int i = 0; i < fieldOriginalIndexs.size(); i++) {
+            fieldValues[i] =
+                    clone(fieldTypes.get(i), inputRow.getField(fieldOriginalIndexs.get(i)));
+        }
+        return fieldValues;
     }
 
     private Object clone(SeaTunnelDataType dataType, Object value) {
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/CopyFieldTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java
similarity index 59%
rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/CopyFieldTransformFactory.java
rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java
index d6819339d..2271fa641 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/CopyFieldTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java
@@ -15,26 +15,37 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.copy;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableTransform;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
 
 import com.google.auto.service.AutoService;
-
-import static org.apache.seatunnel.transform.CopyFieldTransform.DEST_FIELD;
-import static org.apache.seatunnel.transform.CopyFieldTransform.SRC_FIELD;
+import lombok.NonNull;
 
 @AutoService(Factory.class)
 public class CopyFieldTransformFactory implements TableTransformFactory {
     @Override
     public String factoryIdentifier() {
-        return "Copy";
+        return CopyFieldTransform.PLUGIN_NAME;
     }
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().required(SRC_FIELD, DEST_FIELD).build();
+        return OptionRule.builder()
+                .bundled(CopyTransformConfig.SRC_FIELD, CopyTransformConfig.DEST_FIELD)
+                .bundled(CopyTransformConfig.FIELDS)
+                .build();
+    }
+
+    @Override
+    public TableTransform createTransform(@NonNull TableFactoryContext context) {
+        CopyTransformConfig copyTransformConfig = CopyTransformConfig.of(context.getOptions());
+        CatalogTable catalogTable = context.getCatalogTable();
+        return () -> new CopyFieldTransform(copyTransformConfig, catalogTable);
     }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyTransformConfig.java
new file mode 100644
index 000000000..276424830
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyTransformConfig.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.copy;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+
+@Getter
+@Setter
+public class CopyTransformConfig implements Serializable {
+    @Deprecated
+    public static final Option<String> SRC_FIELD =
+            Options.key("src_field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Src field you want to copy");
+
+    @Deprecated
+    public static final Option<String> DEST_FIELD =
+            Options.key("dest_field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Copy Src field to Dest field");
+
+    public static final Option<Map<String, String>> FIELDS =
+            Options.key("fields")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specify the field copy relationship between input and output");
+
+    private LinkedHashMap<String, String> fields;
+
+    public static CopyTransformConfig of(ReadonlyConfig config) {
+        LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+        Optional<Map<String, String>> optional = config.getOptional(FIELDS);
+        if (optional.isPresent()) {
+            fields.putAll(config.get(FIELDS));
+        } else {
+            fields.put(config.get(DEST_FIELD), config.get(SRC_FIELD));
+        }
+
+        CopyTransformConfig copyTransformConfig = new CopyTransformConfig();
+        copyTransformConfig.setFields(fields);
+        return copyTransformConfig;
+    }
+}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/CopyFieldTransformFactoryTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/CopyFieldTransformFactoryTest.java
index f1f905a80..9f22dbbbf 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/CopyFieldTransformFactoryTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/CopyFieldTransformFactoryTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.transform;
 
+import org.apache.seatunnel.transform.copy.CopyFieldTransformFactory;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;