You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/03/27 08:20:22 UTC
[incubator-seatunnel] branch dev updated: Add support CatalogTable for FieldMapperTransform (#4423)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 13e64d441 Add support CatalogTable for FieldMapperTransform (#4423)
13e64d441 is described below
commit 13e64d44147acc648c00e6ab5de7def67586ae8d
Author: Eric <ga...@gmail.com>
AuthorDate: Mon Mar 27 16:20:14 2023 +0800
Add support CatalogTable for FieldMapperTransform (#4423)
---
.../test/resources/extractTopic_fake_to_kafka.conf | 1 +
.../test/resources/spark_date_time_transform.conf | 1 +
.../seatunnel-engine-examples/pom.xml | 5 +
.../seatunnel/transform/FieldMapperTransform.java | 139 ----------------
.../fieldmapper/FieldMapperTransform.java | 178 +++++++++++++++++++++
.../fieldmapper/FieldMapperTransformConfig.java | 48 ++++++
.../FieldMapperTransformFactory.java | 17 +-
.../transform/FieldMapperTransformFactoryTest.java | 2 +
8 files changed, 250 insertions(+), 141 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/extractTopic_fake_to_kafka.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/extractTopic_fake_to_kafka.conf
index 62eb47680..25f0bd75e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/extractTopic_fake_to_kafka.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/extractTopic_fake_to_kafka.conf
@@ -64,6 +64,7 @@ transform {
pattern = ".+"
replacement = "test_extract_topic"
is_regex = true
+ replace_first = true
}
}
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/spark_date_time_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/spark_date_time_transform.conf
index 647cb10e0..55710ce4d 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/spark_date_time_transform.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/spark_date_time_transform.conf
@@ -45,6 +45,7 @@ transform {
pattern = ".+"
replacement = "b"
is_regex = true
+ replace_first = true
}
}
diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml b/seatunnel-examples/seatunnel-engine-examples/pom.xml
index 7acd3ca4d..33dcaed89 100644
--- a/seatunnel-examples/seatunnel-engine-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-engine-examples/pom.xml
@@ -62,5 +62,10 @@
<artifactId>connector-console</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-assert</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FieldMapperTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FieldMapperTransform.java
deleted file mode 100644
index 2f729281b..000000000
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FieldMapperTransform.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.transform;
-
-import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
-
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.common.utils.JsonUtils;
-import org.apache.seatunnel.transform.common.AbstractSeaTunnelTransform;
-import org.apache.seatunnel.transform.exception.FieldMapperTransformErrorCode;
-import org.apache.seatunnel.transform.exception.FieldMapperTransformException;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.Lists;
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-@Slf4j
-@AutoService(SeaTunnelTransform.class)
-public class FieldMapperTransform extends AbstractSeaTunnelTransform {
- public static final Option<Map<String, String>> FIELD_MAPPER =
- Options.key("field_mapper")
- .mapType()
- .noDefaultValue()
- .withDescription(
- "Specify the field mapping relationship between input and output");
-
- private LinkedHashMap<String, String> fieldMapper = new LinkedHashMap<>();
-
- private List<Integer> needReaderColIndex;
-
- @Override
- public String getPluginName() {
- return "FieldMapper";
- }
-
- @Override
- protected void setConfig(Config pluginConfig) {
- if (!pluginConfig.hasPath(FIELD_MAPPER.key())) {
- throw new FieldMapperTransformException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- "The configuration missing key: " + FIELD_MAPPER);
- }
- this.fieldMapper = convertConfigToSortedMap(pluginConfig.getConfig(FIELD_MAPPER.key()));
- }
-
- private static LinkedHashMap<String, String> convertConfigToSortedMap(Config config) {
- // Because the entrySet in typesafe config couldn't keep key-value order
- // So use jackson parsing schema information into a map to keep key-value order
- ConfigRenderOptions options = ConfigRenderOptions.concise();
- String json = config.root().render(options);
- ObjectNode jsonNodes = JsonUtils.parseObject(json);
- LinkedHashMap<String, String> fieldsMap = new LinkedHashMap<>();
- jsonNodes
- .fields()
- .forEachRemaining(
- field -> {
- String key = field.getKey();
- JsonNode value = field.getValue();
-
- if (value.isTextual()) {
- fieldsMap.put(key, value.textValue());
- } else {
- String errorMsg =
- String.format(
- "The value [%s] of key [%s] that in config is not text",
- value, key);
- throw new FieldMapperTransformException(
- CommonErrorCode.ILLEGAL_ARGUMENT, errorMsg);
- }
- });
- return fieldsMap;
- }
-
- @Override
- protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
- needReaderColIndex = new ArrayList<>(fieldMapper.size());
- List<String> outputFiledNameList = new ArrayList<>(fieldMapper.size());
- List<SeaTunnelDataType<?>> outputDataTypeList = new ArrayList<>(fieldMapper.size());
- ArrayList<String> inputFieldNames = Lists.newArrayList(inputRowType.getFieldNames());
- fieldMapper.forEach(
- (key, value) -> {
- int fieldIndex = inputFieldNames.indexOf(key);
- if (fieldIndex < 0) {
- throw new FieldMapperTransformException(
- FieldMapperTransformErrorCode.INPUT_FIELD_NOT_FOUND,
- "Can not found field " + key + " from inputRowType");
- }
- needReaderColIndex.add(fieldIndex);
- outputFiledNameList.add(value);
- outputDataTypeList.add(inputRowType.getFieldTypes()[fieldIndex]);
- });
-
- return new SeaTunnelRowType(
- outputFiledNameList.toArray(new String[0]),
- outputDataTypeList.toArray(new SeaTunnelDataType[0]));
- }
-
- @Override
- protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
- Object[] outputDataArray = new Object[fieldMapper.size()];
- for (int i = 0; i < outputDataArray.length; i++) {
- outputDataArray[i] = inputRow.getField(needReaderColIndex.get(i));
- }
- SeaTunnelRow outputRow = new SeaTunnelRow(outputDataArray);
- outputRow.setRowKind(inputRow.getRowKind());
- outputRow.setTableId(inputRow.getTableId());
- return outputRow;
- }
-}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
new file mode 100644
index 000000000..8644ba1ae
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.fieldmapper;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;
+import org.apache.seatunnel.transform.exception.FieldMapperTransformErrorCode;
+import org.apache.seatunnel.transform.exception.FieldMapperTransformException;
+import org.apache.seatunnel.transform.exception.TransformException;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.Lists;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Slf4j
+@NoArgsConstructor // Lombok no-arg constructor; on that path config is supplied through setConfig(Config)
+@AutoService(SeaTunnelTransform.class)
+public class FieldMapperTransform extends AbstractCatalogSupportTransform { // selects and renames input fields according to the configured field mapping
+ public static final String PLUGIN_NAME = "FieldMapper"; // final: a shared constant must not be reassignable
+ private FieldMapperTransformConfig config; // carries the input-name -> output-name mapping
+ private List<Integer> needReaderColIndex; // input column index per output position; rebuilt by transformRowType / transformTableSchema
+
+ public FieldMapperTransform( // fails fast when a mapped input field is absent from catalogTable
+ @NonNull FieldMapperTransformConfig config, @NonNull CatalogTable catalogTable) {
+ super(catalogTable);
+ this.config = config;
+ Map<String, String> fieldMapper = config.getFieldMapper();
+ List<String> inputFieldNames = Lists.newArrayList(catalogTable.getTableSchema().toPhysicalRowDataType().getFieldNames()); // same name-list lookup the transform* methods below use
+ List<String> notFoundField =
+ fieldMapper.keySet().stream()
+ .filter(field -> !inputFieldNames.contains(field)) // does not depend on how SeaTunnelRowType.indexOf reports a missing name
+ .collect(Collectors.toList());
+ if (!CollectionUtils.isEmpty(notFoundField)) {
+ throw new TransformException(
+ FieldMapperTransformErrorCode.INPUT_FIELD_NOT_FOUND, notFoundField.toString()); // message lists every missing field at once
+ }
+ }
+
+ @Override
+ public String getPluginName() {
+ return PLUGIN_NAME;
+ }
+
+ @Override
+ protected void setConfig(Config pluginConfig) { // validates pluginConfig, then parses it into this.config
+ ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
+ .validate(new FieldMapperTransformFactory().optionRule()); // the factory's option rule doubles as the validation rule
+ config = FieldMapperTransformConfig.of(ReadonlyConfig.fromConfig(pluginConfig));
+ }
+
+ @Override
+ protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) { // output row type: one field per mapping entry, named by the entry's value
+ Map<String, String> fieldMapper = config.getFieldMapper();
+ needReaderColIndex = new ArrayList<>(fieldMapper.size());
+ List<String> outputFiledNameList = new ArrayList<>(fieldMapper.size());
+ List<SeaTunnelDataType<?>> outputDataTypeList = new ArrayList<>(fieldMapper.size());
+ ArrayList<String> inputFieldNames = Lists.newArrayList(inputRowType.getFieldNames());
+ fieldMapper.forEach(
+ (key, value) -> { // key = input field name, value = output field name
+ int fieldIndex = inputFieldNames.indexOf(key);
+ if (fieldIndex < 0) {
+ throw new FieldMapperTransformException(
+ FieldMapperTransformErrorCode.INPUT_FIELD_NOT_FOUND,
+ "Can not found field " + key + " from inputRowType");
+ }
+ needReaderColIndex.add(fieldIndex); // remembered so transformRow can pick values by index
+ outputFiledNameList.add(value);
+ outputDataTypeList.add(inputRowType.getFieldTypes()[fieldIndex]); // data type is carried over unchanged
+ });
+
+ return new SeaTunnelRowType(
+ outputFiledNameList.toArray(new String[0]),
+ outputDataTypeList.toArray(new SeaTunnelDataType[0]));
+ }
+
+ @Override
+ protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) { // builds the output row from the input indexes recorded in needReaderColIndex
+ Map<String, String> fieldMapper = config.getFieldMapper();
+ Object[] outputDataArray = new Object[fieldMapper.size()];
+ for (int i = 0; i < outputDataArray.length; i++) {
+ outputDataArray[i] = inputRow.getField(needReaderColIndex.get(i));
+ }
+ SeaTunnelRow outputRow = new SeaTunnelRow(outputDataArray);
+ outputRow.setRowKind(inputRow.getRowKind()); // row kind and table id are copied from the input row
+ outputRow.setTableId(inputRow.getTableId());
+ return outputRow;
+ }
+
+ @Override
+ protected TableSchema transformTableSchema() { // schema-level counterpart of transformRowType, working on inputCatalogTable
+ Map<String, String> fieldMapper = config.getFieldMapper();
+
+ List<Column> inputColumns = inputCatalogTable.getTableSchema().getColumns();
+ SeaTunnelRowType seaTunnelRowType =
+ inputCatalogTable.getTableSchema().toPhysicalRowDataType();
+ List<Column> outputColumns = new ArrayList<>(fieldMapper.size());
+ needReaderColIndex = new ArrayList<>(fieldMapper.size());
+ ArrayList<String> inputFieldNames = Lists.newArrayList(seaTunnelRowType.getFieldNames());
+ fieldMapper.forEach(
+ (key, value) -> { // key = input field name, value = output field name
+ int fieldIndex = inputFieldNames.indexOf(key);
+ if (fieldIndex < 0) {
+ throw new FieldMapperTransformException(
+ FieldMapperTransformErrorCode.INPUT_FIELD_NOT_FOUND,
+ "Can not found field " + key + " from inputRowType");
+ }
+ Column oldColumn = inputColumns.get(fieldIndex); // NOTE(review): assumes getColumns() is ordered like the physical row type - confirm
+ PhysicalColumn outputColumn =
+ PhysicalColumn.of(
+ value, // only the name changes; the remaining attributes come from oldColumn
+ oldColumn.getDataType(),
+ oldColumn.getColumnLength(),
+ oldColumn.isNullable(),
+ oldColumn.getDefaultValue(),
+ oldColumn.getComment());
+ outputColumns.add(outputColumn);
+ needReaderColIndex.add(fieldIndex);
+ });
+
+ List<ConstraintKey> outputConstraintKeys = // NOTE(review): constraint keys are copied as-is, not remapped to the output field names - confirm this is intended
+ inputCatalogTable.getTableSchema().getConstraintKeys().stream()
+ .map(ConstraintKey::copy)
+ .collect(Collectors.toList());
+
+ PrimaryKey copiedPrimaryKey = // null-safe copy; NOTE(review): likewise not remapped to the output field names
+ inputCatalogTable.getTableSchema().getPrimaryKey() == null
+ ? null
+ : inputCatalogTable.getTableSchema().getPrimaryKey().copy();
+ return TableSchema.builder()
+ .primaryKey(copiedPrimaryKey)
+ .columns(outputColumns)
+ .constraintKey(outputConstraintKeys)
+ .build();
+ }
+
+ @Override
+ protected TableIdentifier transformTableIdentifier() { // the table identity is unchanged: returns a copy of the input table id
+ return inputCatalogTable.getTableId().copy();
+ }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformConfig.java
new file mode 100644
index 000000000..991f27126
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformConfig.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.fieldmapper;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+@Getter
+@Setter
+public class FieldMapperTransformConfig implements Serializable { // serializable holder for the parsed field_mapper option; accessors are Lombok-generated
+ public static final Option<Map<String, String>> FIELD_MAPPER = // map-typed option under key "field_mapper", declared without a default value
+ Options.key("field_mapper")
+ .mapType()
+ .noDefaultValue()
+ .withDescription(
+ "Specify the field mapping relationship between input and output");
+
+ private Map<String, String> fieldMapper = new LinkedHashMap<>(); // initial empty map; of(...) replaces it with the value read from the config
+
+ public static FieldMapperTransformConfig of(ReadonlyConfig config) { // builds an instance whose fieldMapper is config.get(FIELD_MAPPER)
+ FieldMapperTransformConfig fieldMapperTransformConfig = new FieldMapperTransformConfig();
+ fieldMapperTransformConfig.setFieldMapper(config.get(FIELD_MAPPER)); // NOTE(review): iteration order of the returned map decides output field order - confirm it is preserved
+ return fieldMapperTransformConfig;
+ }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FieldMapperTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformFactory.java
similarity index 64%
rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FieldMapperTransformFactory.java
rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformFactory.java
index 0dd45063f..9e6334516 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FieldMapperTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformFactory.java
@@ -15,10 +15,14 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.fieldmapper;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import com.google.auto.service.AutoService;
@@ -32,6 +36,15 @@ public class FieldMapperTransformFactory implements TableTransformFactory {
@Override
public OptionRule optionRule() {
- return OptionRule.builder().required(FieldMapperTransform.FIELD_MAPPER).build();
+ return OptionRule.builder().required(FieldMapperTransformConfig.FIELD_MAPPER).build();
+ }
+
+ @Override
+ public TableTransform createTransform(TableFactoryContext context) { // builds the transform from the context's catalog table and options
+ CatalogTable catalogTable = context.getCatalogTable();
+ ReadonlyConfig options = context.getOptions();
+ FieldMapperTransformConfig fieldMapperTransformConfig =
+ FieldMapperTransformConfig.of(options); // options are parsed here, before the lambda below is invoked
+ return () -> new FieldMapperTransform(fieldMapperTransformConfig, catalogTable); // the FieldMapperTransform itself is constructed only when the returned lambda runs
}
}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FieldMapperTransformFactoryTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FieldMapperTransformFactoryTest.java
index cfa434322..7bbe9b1ad 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FieldMapperTransformFactoryTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FieldMapperTransformFactoryTest.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.transform;
+import org.apache.seatunnel.transform.fieldmapper.FieldMapperTransformFactory;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;