You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/01/03 06:34:21 UTC

[incubator-seatunnel] branch dev updated: [Feature][Transform-V2] add FieldMapper Transform (#3781)

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1118c8317 [Feature][Transform-V2] add FieldMapper Transform (#3781)
1118c8317 is described below

commit 1118c8317b18d047abce3253a1b47777b595d6a8
Author: Eric <ga...@gmail.com>
AuthorDate: Tue Jan 3 14:34:16 2023 +0800

    [Feature][Transform-V2] add FieldMapper Transform (#3781)
    
    * add FieldMapper Transform
---
 .../connector-v2/{EnvConf.md => JobEnvConfig.md}   |   2 +-
 docs/en/transform-v2/field-mapper.md               |  64 +++++++++++
 docs/sidebars.js                                   |   2 +-
 .../seatunnel/e2e/transform/TestFieldMapperIT.java |  35 ++++++
 .../src/test/resources/field_mapper_transform.conf | 109 ++++++++++++++++++
 .../seatunnel/transform/FieldMapperTransform.java  | 122 +++++++++++++++++++++
 .../transform/FieldMapperTransformFactory.java     |  37 +++++++
 .../exception/FieldMapperTransformErrorCode.java   |  43 ++++++++
 .../exception/FieldMapperTransformException.java   |  36 ++++++
 .../transform/FieldMapperTransformFactoryTest.java |  30 +++++
 10 files changed, 478 insertions(+), 2 deletions(-)

diff --git a/docs/en/connector-v2/EnvConf.md b/docs/en/connector-v2/JobEnvConfig.md
similarity index 97%
rename from docs/en/connector-v2/EnvConf.md
rename to docs/en/connector-v2/JobEnvConfig.md
index d0616d039..195012f7e 100644
--- a/docs/en/connector-v2/EnvConf.md
+++ b/docs/en/connector-v2/JobEnvConfig.md
@@ -1,4 +1,4 @@
-# EnvConf
+# JobEnvConfig
 
 This document describes env configuration information,env unifies the environment variables of all engines.
 
diff --git a/docs/en/transform-v2/field-mapper.md b/docs/en/transform-v2/field-mapper.md
new file mode 100644
index 000000000..32b866e9e
--- /dev/null
+++ b/docs/en/transform-v2/field-mapper.md
@@ -0,0 +1,64 @@
+# FieldMapper
+
+> FieldMapper transform plugin
+
+## Description
+
+Add input schema and output schema mapping.
+
+## Options
+
+| name               | type   | required | default value |
+|--------------------|--------| -------- |---------------|
+| field_mapper       | Object | yes      |               |
+
+### field_mapper [config]
+
+Specify the field mapping relationship between input and output
+
+### common options [config]
+
+Transform plugin common parameters, please refer to [Transform Plugin](common-options.md) for details.
+
+## Example
+
+The data read from source is a table like this:
+
+| id  | name     | age | card |
+|-----|----------|-----|------|
+| 1   | Joy Ding | 20  | 123  |
+| 2   | May Ding | 20  | 123  |
+| 3   | Kin Dom  | 20  | 123  |
+| 4   | Joy Dom  | 20  | 123  |
+
+We want to delete the `age` field, reorder the remaining fields to `id`, `card`, `name`, and rename `name` to `new_name`. We can add a `FieldMapper` transform like this:
+
+```
+transform {
+  FieldMapper {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    field_mapper = {
+        id = id
+        card = card
+        name = new_name
+    }
+  }
+}
+```
+
+Then the data in result table `fake1` will look like this:
+
+| id  | card | new_name |
+|-----|------|----------|
+| 1   | 123  | Joy Ding |
+| 2   | 123  | May Ding |
+| 3   | 123  | Kin Dom  |
+| 4   | 123  | Joy Dom  |
+
+
+## Changelog
+
+### new version
+
+- Add FieldMapper Transform Connector
\ No newline at end of file
diff --git a/docs/sidebars.js b/docs/sidebars.js
index ab4c1cef1..550e02338 100644
--- a/docs/sidebars.js
+++ b/docs/sidebars.js
@@ -132,8 +132,8 @@ const sidebars = {
                         }
                     ]
                 },
+                "connector-v2/JobEnvConfig",
                 "connector-v2/Error-Quick-Reference-Manual",
-                "connector-v2/EnvConf"
             ]
         },
         {
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldMapperIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldMapperIT.java
new file mode 100644
index 000000000..d1d61c938
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldMapperIT.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.transform;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+/**
+ * End-to-end test that runs the FieldMapper transform job config in the supplied test container.
+ */
+public class TestFieldMapperIT extends TestSuiteBase {
+
+    /**
+     * Executes the {@code /field_mapper_transform.conf} job and expects exit code 0.
+     *
+     * @param container the container the job is executed in
+     * @throws IOException propagated from the job execution
+     * @throws InterruptedException propagated from the job execution
+     */
+    @TestTemplate
+    public void testFieldMapper(TestContainer container) throws IOException, InterruptedException {
+        final Container.ExecResult jobResult = container.executeJob("/field_mapper_transform.conf");
+        final int exitCode = jobResult.getExitCode();
+        Assertions.assertEquals(0, exitCode);
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/field_mapper_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/field_mapper_transform.conf
new file mode 100644
index 000000000..2071e66d5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/field_mapper_transform.conf
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+        age = "int"
+        string1 = "string"
+        int1 = "int"
+        c_bigint = "bigint"
+      }
+    }
+  }
+}
+
+transform {
+  FieldMapper {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    field_mapper = {
+        id = id
+        age = age_as
+        int1 = int1_as
+        name = name
+    }
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "fake1"
+  }
+  Assert {
+    source_table_name = "fake1"
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MIN_ROW
+            rule_value = 100
+          }
+        ],
+        field_rules = [
+          {
+            field_name = id
+            field_type = int
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = age_as
+            field_type = int
+            field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+            ]
+          },
+          {
+              field_name = int1_as
+              field_type = int
+              field_value = [
+                  {
+                    rule_type = NOT_NULL
+                  }
+              ]
+          },
+          {
+            field_name = name
+            field_type = string
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FieldMapperTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FieldMapperTransform.java
new file mode 100644
index 000000000..346740f67
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FieldMapperTransform.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.transform.common.AbstractSeaTunnelTransform;
+import org.apache.seatunnel.transform.exception.FieldMapperTransformErrorCode;
+import org.apache.seatunnel.transform.exception.FieldMapperTransformException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+@AutoService(SeaTunnelTransform.class)
+public class FieldMapperTransform extends AbstractSeaTunnelTransform {
+    public static final Option<Map<String, String>> FIELD_MAPPER = Options.key("field_mapper")
+        .mapType()
+        .noDefaultValue()
+        .withDescription("Specify the field mapping relationship between input and output");
+
+    private LinkedHashMap<String, String> fieldMapper = new LinkedHashMap<>();
+
+    private List<Integer> needReaderColIndex;
+
+    @Override
+    public String getPluginName() {
+        return "FieldMapper";
+    }
+
+    @Override
+    protected void setConfig(Config pluginConfig) {
+        if (!pluginConfig.hasPath(FIELD_MAPPER.key())) {
+            throw new FieldMapperTransformException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, "The configuration missing key: " + FIELD_MAPPER);
+        }
+        this.fieldMapper = convertConfigToSortedMap(pluginConfig.getConfig(FIELD_MAPPER.key()));
+    }
+
+    private static LinkedHashMap<String, String> convertConfigToSortedMap(Config config) {
+        // Because the entrySet in typesafe config couldn't keep key-value order
+        // So use jackson parsing schema information into a map to keep key-value order
+        ConfigRenderOptions options = ConfigRenderOptions.concise();
+        String json = config.root().render(options);
+        ObjectNode jsonNodes = JsonUtils.parseObject(json);
+        LinkedHashMap<String, String> fieldsMap = new LinkedHashMap<>();
+        jsonNodes.fields().forEachRemaining(field -> {
+            String key = field.getKey();
+            JsonNode value = field.getValue();
+
+            if (value.isTextual()) {
+                fieldsMap.put(key, value.textValue());
+            } else {
+                String errorMsg = String.format("The value [%s] of key [%s] that in config is not text", value, key);
+                throw new FieldMapperTransformException(CommonErrorCode.ILLEGAL_ARGUMENT, errorMsg);
+            }
+        });
+        return fieldsMap;
+    }
+
+    @Override
+    protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
+        needReaderColIndex = new ArrayList<>(fieldMapper.size());
+        List<String> outputFiledNameList = new ArrayList<>(fieldMapper.size());
+        List<SeaTunnelDataType<?>> outputDataTypeList = new ArrayList<>(fieldMapper.size());
+        ArrayList<String> inputFieldNames = Lists.newArrayList(inputRowType.getFieldNames());
+        fieldMapper.forEach((key, value) -> {
+            int fieldIndex = inputFieldNames.indexOf(key);
+            if (fieldIndex < 0) {
+                throw new FieldMapperTransformException(FieldMapperTransformErrorCode.INPUT_FIELD_NOT_FOUND,
+                        "Can not found field " + key + " from inputRowType");
+            }
+            needReaderColIndex.add(fieldIndex);
+            outputFiledNameList.add(value);
+            outputDataTypeList.add(inputRowType.getFieldTypes()[fieldIndex]);
+        });
+
+        return new SeaTunnelRowType(outputFiledNameList.toArray(new String[0]),
+            outputDataTypeList.toArray(new SeaTunnelDataType[0]));
+    }
+
+    @Override
+    protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
+        Object[] outputDataArray = new Object[fieldMapper.size()];
+        for (int i = 0; i < outputDataArray.length; i++) {
+            outputDataArray[i] = inputRow.getField(needReaderColIndex.get(i));
+        }
+        return new SeaTunnelRow(outputDataArray);
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FieldMapperTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FieldMapperTransformFactory.java
new file mode 100644
index 000000000..0dd45063f
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FieldMapperTransformFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+
+import com.google.auto.service.AutoService;
+
+/** Factory that exposes the FieldMapper transform's identifier and its option rule. */
+@AutoService(Factory.class)
+public class FieldMapperTransformFactory implements TableTransformFactory {
+
+    /** Returns the identifier of this factory, {@code "FieldMapper"}. */
+    @Override
+    public String factoryIdentifier() {
+        final String identifier = "FieldMapper";
+        return identifier;
+    }
+
+    /** Builds the option rule in which {@code field_mapper} is the only required option. */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+            .required(FieldMapperTransform.FIELD_MAPPER)
+            .build();
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformErrorCode.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformErrorCode.java
new file mode 100644
index 000000000..b178c1a9a
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformErrorCode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum FieldMapperTransformErrorCode implements SeaTunnelErrorCode {
+
+    INPUT_FIELD_NOT_FOUND("FIELD_MAPPER_TRANSFORM-01", "field mapper input field not found in inputRowType");
+
+    private final String code;
+    private final String description;
+
+    FieldMapperTransformErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return this.code;
+    }
+
+    @Override
+    public String getDescription() {
+        return this.description;
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformException.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformException.java
new file mode 100644
index 000000000..7fd59fbc0
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+/** Runtime exception thrown by the FieldMapper transform; every constructor delegates to the superclass. */
+public class FieldMapperTransformException extends SeaTunnelRuntimeException {
+
+    /**
+     * Creates an exception from an error code and a message.
+     *
+     * @param seaTunnelErrorCode the error code
+     * @param errorMessage the detail message
+     */
+    public FieldMapperTransformException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+
+    /**
+     * Creates an exception from an error code, a message and a cause.
+     *
+     * @param seaTunnelErrorCode the error code
+     * @param errorMessage the detail message
+     * @param cause the underlying cause
+     */
+    public FieldMapperTransformException(SeaTunnelErrorCode seaTunnelErrorCode,
+                                         String errorMessage,
+                                         Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+
+    /**
+     * Creates an exception from an error code and a cause.
+     *
+     * @param seaTunnelErrorCode the error code
+     * @param cause the underlying cause
+     */
+    public FieldMapperTransformException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FieldMapperTransformFactoryTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FieldMapperTransformFactoryTest.java
new file mode 100644
index 000000000..cfa434322
--- /dev/null
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FieldMapperTransformFactoryTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Unit test for {@link FieldMapperTransformFactory}. */
+public class FieldMapperTransformFactoryTest {
+
+    /** Checks that the factory returns a non-null option rule. */
+    @Test
+    public void testOptionRule() throws Exception {
+        Assertions.assertNotNull(new FieldMapperTransformFactory().optionRule());
+    }
+}