You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/01/03 06:34:21 UTC
[incubator-seatunnel] branch dev updated: [Feature][Transform-V2] add FieldMapper Transform (#3781)
This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1118c8317 [Feature][Transform-V2] add FieldMapper Transform (#3781)
1118c8317 is described below
commit 1118c8317b18d047abce3253a1b47777b595d6a8
Author: Eric <ga...@gmail.com>
AuthorDate: Tue Jan 3 14:34:16 2023 +0800
[Feature][Transform-V2] add FieldMapper Transform (#3781)
* add FieldMapper Transform
---
.../connector-v2/{EnvConf.md => JobEnvConfig.md} | 2 +-
docs/en/transform-v2/field-mapper.md | 64 +++++++++++
docs/sidebars.js | 2 +-
.../seatunnel/e2e/transform/TestFieldMapperIT.java | 35 ++++++
.../src/test/resources/field_mapper_transform.conf | 109 ++++++++++++++++++
.../seatunnel/transform/FieldMapperTransform.java | 122 +++++++++++++++++++++
.../transform/FieldMapperTransformFactory.java | 37 +++++++
.../exception/FieldMapperTransformErrorCode.java | 43 ++++++++
.../exception/FieldMapperTransformException.java | 36 ++++++
.../transform/FieldMapperTransformFactoryTest.java | 30 +++++
10 files changed, 478 insertions(+), 2 deletions(-)
diff --git a/docs/en/connector-v2/EnvConf.md b/docs/en/connector-v2/JobEnvConfig.md
similarity index 97%
rename from docs/en/connector-v2/EnvConf.md
rename to docs/en/connector-v2/JobEnvConfig.md
index d0616d039..195012f7e 100644
--- a/docs/en/connector-v2/EnvConf.md
+++ b/docs/en/connector-v2/JobEnvConfig.md
@@ -1,4 +1,4 @@
-# EnvConf
+# JobEnvConfig
This document describes env configuration information,env unifies the environment variables of all engines.
diff --git a/docs/en/transform-v2/field-mapper.md b/docs/en/transform-v2/field-mapper.md
new file mode 100644
index 000000000..32b866e9e
--- /dev/null
+++ b/docs/en/transform-v2/field-mapper.md
@@ -0,0 +1,64 @@
+# FieldMapper
+
+> FieldMapper transform plugin
+
+## Description
+
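+Maps the fields of the input schema to the fields of the output schema. Only the fields listed in `field_mapper` are kept; they are output in the listed order and renamed as specified.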
+
+## Options
+
+| name | type | required | default value |
+|--------------------|--------| -------- |---------------|
+| field_mapper | Object | yes | |
+
+### field_mapper [config]
+
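+Specifies the mapping between input and output fields. Each key is the name of an input field and its value is the name of the corresponding output field. The order of the entries determines the order of the fields in the output; input fields that are not listed are removed.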
+
+### common options [config]
+
+Common parameters for transform plugins; please refer to [Transform Plugin](common-options.md) for details.
+
+## Example
+
+The data read from source is a table like this:
+
+| id | name | age | card |
+|-----|----------|-----|------|
+| 1 | Joy Ding | 20 | 123 |
+| 2 | May Ding | 20 | 123 |
+| 3 | Kin Dom | 20 | 123 |
+| 4 | Joy Dom | 20 | 123 |
+
+We want to delete the `age` field, change the field order to `id`, `card`, `name`, and rename `name` to `new_name`. We can add a `FieldMapper` transform like this:
+
+```
+transform {
+ FieldMapper {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ field_mapper = {
+ id = id
+ card = card
+ name = new_name
+ }
+ }
+}
+```
+
+Then the data in the result table `fake1` will look like this:
+
+| id | card | new_name |
+|-----|------|----------|
+| 1 | 123 | Joy Ding |
+| 2 | 123 | May Ding |
+| 3 | 123 | Kin Dom |
+| 4 | 123 | Joy Dom |
+
+
+## Changelog
+
+### new version
+
+- Add FieldMapper Transform Connector
\ No newline at end of file
diff --git a/docs/sidebars.js b/docs/sidebars.js
index ab4c1cef1..550e02338 100644
--- a/docs/sidebars.js
+++ b/docs/sidebars.js
@@ -132,8 +132,8 @@ const sidebars = {
}
]
},
+ "connector-v2/JobEnvConfig",
"connector-v2/Error-Quick-Reference-Manual",
- "connector-v2/EnvConf"
]
},
{
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldMapperIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldMapperIT.java
new file mode 100644
index 000000000..d1d61c938
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldMapperIT.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.transform;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+/**
+ * End-to-end test that submits the FieldMapper transform job definition to each test container
+ * and checks that the job exits cleanly.
+ */
+public class TestFieldMapperIT extends TestSuiteBase {
+
+    /** Job definition exercised by this test. */
+    private static final String JOB_CONFIG = "/field_mapper_transform.conf";
+
+    @TestTemplate
+    public void testFieldMapper(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult result = container.executeJob(JOB_CONFIG);
+        // NOTE(review): assumes a violated Assert-sink rule in the config makes the job exit
+        // non-zero, so a zero exit code also covers the field assertions — confirm with the engine.
+        Assertions.assertEquals(0, result.getExitCode());
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/field_mapper_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/field_mapper_transform.conf
new file mode 100644
index 000000000..2071e66d5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/resources/field_mapper_transform.conf
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ row.num = 100
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ age = "int"
+ string1 = "string"
+ int1 = "int"
+ c_bigint = "bigint"
+ }
+ }
+ }
+}
+
+transform {
+ FieldMapper {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ field_mapper = {
+ id = id
+ age = age_as
+ int1 = int1_as
+ name = name
+ }
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "fake1"
+ }
+ Assert {
+ source_table_name = "fake1"
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ],
+ field_rules = [
+ {
+ field_name = id
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = age_as
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = int1_as
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FieldMapperTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FieldMapperTransform.java
new file mode 100644
index 000000000..346740f67
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FieldMapperTransform.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.transform.common.AbstractSeaTunnelTransform;
+import org.apache.seatunnel.transform.exception.FieldMapperTransformErrorCode;
+import org.apache.seatunnel.transform.exception.FieldMapperTransformException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Transform that projects, reorders and renames the fields of every row according to the
+ * {@code field_mapper} option.
+ *
+ * <p>Each entry of {@code field_mapper} maps an input field name (key) to an output field name
+ * (value). The output row contains exactly the mapped fields, in the order in which the entries
+ * appear in the configuration; input fields that are not listed are dropped.
+ */
+@Slf4j
+@AutoService(SeaTunnelTransform.class)
+public class FieldMapperTransform extends AbstractSeaTunnelTransform {
+    public static final Option<Map<String, String>> FIELD_MAPPER = Options.key("field_mapper")
+        .mapType()
+        .noDefaultValue()
+        .withDescription("Specify the field mapping relationship between input and output");
+
+    /** Input field name -> output field name, kept in configuration order. */
+    private LinkedHashMap<String, String> fieldMapper = new LinkedHashMap<>();
+
+    /** For output position i, the index of the input field to copy; filled by transformRowType. */
+    private List<Integer> needReaderColIndex;
+
+    @Override
+    public String getPluginName() {
+        return "FieldMapper";
+    }
+
+    /**
+     * Reads the {@code field_mapper} option.
+     *
+     * @throws FieldMapperTransformException if the option is missing or a mapped value is not text
+     */
+    @Override
+    protected void setConfig(Config pluginConfig) {
+        if (!pluginConfig.hasPath(FIELD_MAPPER.key())) {
+            // Report the plain key name rather than the Option object's toString().
+            throw new FieldMapperTransformException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                "The configuration missing key: " + FIELD_MAPPER.key());
+        }
+        this.fieldMapper = convertConfigToOrderedMap(pluginConfig.getConfig(FIELD_MAPPER.key()));
+    }
+
+    /**
+     * Converts a flat config object into an insertion-ordered map of string values.
+     *
+     * @throws FieldMapperTransformException if any value is not a text node
+     */
+    private static LinkedHashMap<String, String> convertConfigToOrderedMap(Config config) {
+        // Because the entrySet in typesafe config couldn't keep key-value order
+        // So use jackson parsing schema information into a map to keep key-value order
+        ConfigRenderOptions options = ConfigRenderOptions.concise();
+        String json = config.root().render(options);
+        ObjectNode jsonNodes = JsonUtils.parseObject(json);
+        LinkedHashMap<String, String> fieldsMap = new LinkedHashMap<>();
+        jsonNodes.fields().forEachRemaining(field -> {
+            String key = field.getKey();
+            JsonNode value = field.getValue();
+
+            if (value.isTextual()) {
+                fieldsMap.put(key, value.textValue());
+            } else {
+                String errorMsg = String.format("The value [%s] of key [%s] that in config is not text", value, key);
+                throw new FieldMapperTransformException(CommonErrorCode.ILLEGAL_ARGUMENT, errorMsg);
+            }
+        });
+        return fieldsMap;
+    }
+
+    /**
+     * Builds the output row type from the mapping and records which input column feeds each
+     * output position.
+     *
+     * @throws FieldMapperTransformException if a mapped input field does not exist, or if two
+     *     input fields are mapped to the same output name
+     */
+    @Override
+    protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
+        needReaderColIndex = new ArrayList<>(fieldMapper.size());
+        List<String> outputFieldNameList = new ArrayList<>(fieldMapper.size());
+        List<SeaTunnelDataType<?>> outputDataTypeList = new ArrayList<>(fieldMapper.size());
+        List<String> inputFieldNames = Lists.newArrayList(inputRowType.getFieldNames());
+        fieldMapper.forEach((inputName, outputName) -> {
+            int fieldIndex = inputFieldNames.indexOf(inputName);
+            if (fieldIndex < 0) {
+                throw new FieldMapperTransformException(FieldMapperTransformErrorCode.INPUT_FIELD_NOT_FOUND,
+                    "Can not found field " + inputName + " from inputRowType");
+            }
+            // Reject duplicate output names so that field lookups by name on the output type
+            // stay unambiguous.
+            if (outputFieldNameList.contains(outputName)) {
+                throw new FieldMapperTransformException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                    String.format("The output field name [%s] is mapped more than once in [%s]",
+                        outputName, FIELD_MAPPER.key()));
+            }
+            needReaderColIndex.add(fieldIndex);
+            outputFieldNameList.add(outputName);
+            outputDataTypeList.add(inputRowType.getFieldTypes()[fieldIndex]);
+        });
+
+        return new SeaTunnelRowType(outputFieldNameList.toArray(new String[0]),
+            outputDataTypeList.toArray(new SeaTunnelDataType[0]));
+    }
+
+    /** Copies the mapped input columns into a new row, preserving the input row's metadata. */
+    @Override
+    protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
+        Object[] outputDataArray = new Object[needReaderColIndex.size()];
+        for (int i = 0; i < outputDataArray.length; i++) {
+            outputDataArray[i] = inputRow.getField(needReaderColIndex.get(i));
+        }
+        SeaTunnelRow outputRow = new SeaTunnelRow(outputDataArray);
+        // A freshly constructed row does not inherit the input's change kind or table id; copy
+        // them explicitly (presumably the defaults would turn CDC updates/deletes into inserts).
+        outputRow.setRowKind(inputRow.getRowKind());
+        outputRow.setTableId(inputRow.getTableId());
+        return outputRow;
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FieldMapperTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FieldMapperTransformFactory.java
new file mode 100644
index 000000000..0dd45063f
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FieldMapperTransformFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory that registers the {@code FieldMapper} transform and declares its configuration options.
+ */
+@AutoService(Factory.class)
+public class FieldMapperTransformFactory implements TableTransformFactory {
+
+    /** Same value as the plugin name reported by {@code FieldMapperTransform#getPluginName()}. */
+    private static final String IDENTIFIER = "FieldMapper";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    /** Declares {@code field_mapper} as the single, mandatory option. */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+            .required(FieldMapperTransform.FIELD_MAPPER)
+            .build();
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformErrorCode.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformErrorCode.java
new file mode 100644
index 000000000..b178c1a9a
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformErrorCode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum FieldMapperTransformErrorCode implements SeaTunnelErrorCode {
+
+ INPUT_FIELD_NOT_FOUND("FIELD_MAPPER_TRANSFORM-01", "field mapper input field not found in inputRowType");
+
+ private final String code;
+ private final String description;
+
+ FieldMapperTransformErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return this.code;
+ }
+
+ @Override
+ public String getDescription() {
+ return this.description;
+ }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformException.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformException.java
new file mode 100644
index 000000000..7fd59fbc0
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+/**
+ * Unchecked exception thrown by the FieldMapper transform; every instance carries a
+ * {@link SeaTunnelErrorCode} identifying the failure category.
+ */
+public class FieldMapperTransformException extends SeaTunnelRuntimeException {
+
+    /**
+     * @param errorCode failure category
+     * @param message   detail message
+     */
+    public FieldMapperTransformException(SeaTunnelErrorCode errorCode, String message) {
+        super(errorCode, message);
+    }
+
+    /**
+     * @param errorCode failure category
+     * @param message   detail message
+     * @param cause     underlying exception
+     */
+    public FieldMapperTransformException(SeaTunnelErrorCode errorCode, String message, Throwable cause) {
+        super(errorCode, message, cause);
+    }
+
+    /**
+     * @param errorCode failure category
+     * @param cause     underlying exception
+     */
+    public FieldMapperTransformException(SeaTunnelErrorCode errorCode, Throwable cause) {
+        super(errorCode, cause);
+    }
+}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FieldMapperTransformFactoryTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FieldMapperTransformFactoryTest.java
new file mode 100644
index 000000000..cfa434322
--- /dev/null
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FieldMapperTransformFactoryTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Unit test for {@link FieldMapperTransformFactory}. */
+public class FieldMapperTransformFactoryTest {
+
+    /** The factory must always expose a non-null option rule. */
+    @Test
+    public void testOptionRule() throws Exception {
+        Assertions.assertNotNull(new FieldMapperTransformFactory().optionRule());
+    }
+}