You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/03/25 13:21:13 UTC
[incubator-seatunnel] branch dev updated: [Feature][Transform] Support copy field list (#4404)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c048ee09b [Feature][Transform] Support copy field list (#4404)
c048ee09b is described below
commit c048ee09b2f7c51d1a3fb177cd3df4e8e5f422c6
Author: hailin0 <wa...@apache.org>
AuthorDate: Sat Mar 25 21:21:05 2023 +0800
[Feature][Transform] Support copy field list (#4404)
---
docs/en/transform-v2/copy.md | 37 +++---
.../src/test/resources/copy_transform.conf | 40 +++++-
.../common/MultipleFieldOutputTransform.java | 29 +++--
.../transform/{ => copy}/CopyFieldTransform.java | 138 ++++++++++++++-------
.../{ => copy}/CopyFieldTransformFactory.java | 23 +++-
.../transform/copy/CopyTransformConfig.java | 71 +++++++++++
.../transform/CopyFieldTransformFactoryTest.java | 2 +
7 files changed, 260 insertions(+), 80 deletions(-)
diff --git a/docs/en/transform-v2/copy.md b/docs/en/transform-v2/copy.md
index 956b8c277..7a0e73f44 100644
--- a/docs/en/transform-v2/copy.md
+++ b/docs/en/transform-v2/copy.md
@@ -8,18 +8,13 @@ Copy a field to a new field.
## Options
-| name | type | required | default value |
-|------------|--------|----------|---------------|
-| src_field | string | yes | |
-| dest_field | string | yes | |
+| name | type | required | default value |
+|--------|--------|----------|---------------|
+| fields | Map | yes | |
-### src_field [string]
+### fields [config]
-Src field name you want to copy
-
-### dest_field [string]
-
-This dest field name
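+Specify which fields to copy, as a map whose keys are the new (output) field names and whose values are the existing (input) field names to copy from. The deprecated `src_field`/`dest_field` pair is still accepted for copying a single field.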
### common options [string]
@@ -36,31 +31,35 @@ The data read from source is a table like this:
| Kin Dom | 20 | 123 |
| Joy Dom | 20 | 123 |
-We want copy field `name` to a new field `name1`, we can add `Copy` Transform like this
+We want to copy field `name` to the new fields `name1` and `name2`, and field `age` to the new field `age1`. We can add a `Copy` transform like this:
```
transform {
Copy {
source_table_name = "fake"
result_table_name = "fake1"
- src_field = "name"
- dest_field = "name1"
+ fields {
+ name1 = name
+ name2 = name
+ age1 = age
+ }
}
}
```
Then the data in result table `fake1` will like this
-| name | age | card | name1 |
-|----------|-----|------|----------|
-| Joy Ding | 20 | 123 | Joy Ding |
-| May Ding | 20 | 123 | May Ding |
-| Kin Dom | 20 | 123 | Kin Dom |
-| Joy Dom | 20 | 123 | Joy Dom |
+| name | age | card | name1 | name2 | age1 |
+|----------|-----|------|----------|----------|------|
+| Joy Ding | 20 | 123 | Joy Ding | Joy Ding | 20 |
+| May Ding | 20 | 123 | May Ding | May Ding | 20 |
+| Kin Dom | 20 | 123 | Kin Dom | Kin Dom | 20 |
+| Joy Dom | 20 | 123 | Joy Dom | Joy Dom | 20 |
## Changelog
### new version
- Add Copy Transform Connector
+- Support copying multiple fields to new fields via the `fields` option
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
index cd29bc156..68c9e4e52 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
@@ -42,14 +42,23 @@ transform {
src_field = "name"
dest_field = "name1"
}
+ Copy {
+ source_table_name = "fake1"
+ result_table_name = "fake2"
+ fields {
+ id_1 = "id"
+ name2 = "name"
+ name3 = "name"
+ }
+ }
}
sink {
Console {
- source_table_name = "fake1"
+ source_table_name = "fake2"
}
Assert {
- source_table_name = "fake1"
+ source_table_name = "fake2"
rules =
{
row_rules = [
@@ -68,6 +77,15 @@ sink {
}
]
},
+ {
+ field_name = id_1
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
{
field_name = name
field_type = string
@@ -86,6 +104,24 @@ sink {
}
]
}
+ {
+ field_name = name2
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ {
+ field_name = name3
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
]
}
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
index 438435df2..2609ced1b 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
@@ -32,6 +32,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
@Slf4j
@@ -184,7 +185,7 @@ public abstract class MultipleFieldOutputTransform extends AbstractCatalogSuppor
TableSchema.builder()
.primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey())
.constraintKey(inputCatalogTable.getTableSchema().getConstraintKeys());
- List<Column> copyInputColumns =
+ List<Column> columns =
inputCatalogTable.getTableSchema().getColumns().stream()
.map(Column::copy)
.collect(Collectors.toList());
@@ -192,21 +193,31 @@ public abstract class MultipleFieldOutputTransform extends AbstractCatalogSuppor
int addFieldCount = 0;
this.fieldsIndex = new int[outputColumns.length];
for (int i = 0; i < outputColumns.length; i++) {
- for (int j = 0; j < copyInputColumns.size(); j++) {
- if (copyInputColumns.get(j).getName().equals(outputColumns[i].getName())) {
- copyInputColumns.set(j, outputColumns[i]);
- } else {
- addFieldCount++;
- copyInputColumns.add(outputColumns[i]);
+ Column outputColumn = outputColumns[i];
+ Optional<Column> optional =
+ columns.stream()
+ .filter(c -> c.getName().equals(outputColumn.getName()))
+ .findFirst();
+ if (optional.isPresent()) {
+ Column originalColumn = optional.get();
+ int originalColumnIndex = columns.indexOf(originalColumn);
+ if (!originalColumn.getDataType().equals(outputColumn.getDataType())) {
+ columns.set(
+ originalColumnIndex, originalColumn.copy(outputColumn.getDataType()));
}
+ fieldsIndex[i] = originalColumnIndex;
+ } else {
+ addFieldCount++;
+ columns.add(outputColumn);
+ fieldsIndex[i] = columns.indexOf(outputColumn);
}
}
- TableSchema outputTableSchema = builder.columns(copyInputColumns).build();
+ TableSchema outputTableSchema = builder.columns(columns).build();
if (addFieldCount > 0) {
int inputFieldLength =
inputCatalogTable.getTableSchema().toPhysicalRowDataType().getTotalFields();
- int outputFieldLength = copyInputColumns.size();
+ int outputFieldLength = columns.size();
rowContainerGenerator =
new SeaTunnelRowContainerGenerator() {
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/CopyFieldTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
similarity index 50%
rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/CopyFieldTransform.java
rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
index 7a60d71b8..b50a195ec 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/CopyFieldTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
@@ -15,89 +15,139 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.copy;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
-import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
import java.lang.reflect.Array;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+@NoArgsConstructor
@AutoService(SeaTunnelTransform.class)
-public class CopyFieldTransform extends SingleFieldOutputTransform {
-
- public static final Option<String> SRC_FIELD =
- Options.key("src_field")
- .stringType()
- .noDefaultValue()
- .withDescription("Src field you want to copy");
-
- public static final Option<String> DEST_FIELD =
- Options.key("dest_field")
- .stringType()
- .noDefaultValue()
- .withDescription("Copy Src field to Dest field");
-
- private String srcField;
- private int srcFieldIndex;
- private SeaTunnelDataType srcFieldDataType;
- private String destField;
+public class CopyFieldTransform extends MultipleFieldOutputTransform {
+ public static final String PLUGIN_NAME = "Copy";
+
+ private CopyTransformConfig config;
+ private List<String> fieldNames;
+ private List<Integer> fieldOriginalIndexs;
+ private List<SeaTunnelDataType> fieldTypes;
+
+ public CopyFieldTransform(CopyTransformConfig copyTransformConfig, CatalogTable catalogTable) {
+ super(catalogTable);
+ this.config = copyTransformConfig;
+ SeaTunnelRowType seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
+ initOutputFields(seaTunnelRowType, config.getFields());
+ }
@Override
public String getPluginName() {
- return "Copy";
+ return PLUGIN_NAME;
}
@Override
protected void setConfig(Config pluginConfig) {
- CheckResult checkResult =
- CheckConfigUtil.checkAllExists(pluginConfig, SRC_FIELD.key(), DEST_FIELD.key());
- if (!checkResult.isSuccess()) {
- throw new IllegalArgumentException("Failed to check config! " + checkResult.getMsg());
- }
-
- this.srcField = pluginConfig.getString(SRC_FIELD.key());
- this.destField = pluginConfig.getString(DEST_FIELD.key());
+ ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
+ .validate(new CopyFieldTransformFactory().optionRule());
+ this.config = CopyTransformConfig.of(ReadonlyConfig.fromConfig(pluginConfig));
}
@Override
protected void setInputRowType(SeaTunnelRowType inputRowType) {
- srcFieldIndex = inputRowType.indexOf(srcField);
- if (srcFieldIndex == -1) {
- throw new IllegalArgumentException(
- "Cannot find [" + srcField + "] field in input row type");
+ initOutputFields(inputRowType, config.getFields());
+ }
+
+ private void initOutputFields(
+ SeaTunnelRowType inputRowType, LinkedHashMap<String, String> fields) {
+ List<String> fieldNames = new ArrayList<>();
+ List<Integer> fieldOriginalIndexs = new ArrayList<>();
+ List<SeaTunnelDataType> fieldsType = new ArrayList<>();
+ for (Map.Entry<String, String> field : fields.entrySet()) {
+ String srcField = field.getValue();
+ int srcFieldIndex = inputRowType.indexOf(srcField);
+ if (srcFieldIndex == -1) {
+ throw new IllegalArgumentException(
+ "Cannot find [" + srcField + "] field in input row type");
+ }
+ fieldNames.add(field.getKey());
+ fieldOriginalIndexs.add(srcFieldIndex);
+ fieldsType.add(inputRowType.getFieldType(srcFieldIndex));
}
- srcFieldDataType = inputRowType.getFieldType(srcFieldIndex);
+ this.fieldNames = fieldNames;
+ this.fieldOriginalIndexs = fieldOriginalIndexs;
+ this.fieldTypes = fieldsType;
}
@Override
- protected String getOutputFieldName() {
- return destField;
+ protected String[] getOutputFieldNames() {
+ return fieldNames.toArray(new String[0]);
}
@Override
- protected SeaTunnelDataType getOutputFieldDataType() {
- return srcFieldDataType;
+ protected SeaTunnelDataType[] getOutputFieldDataTypes() {
+ return fieldTypes.toArray(new SeaTunnelDataType[0]);
}
@Override
- protected Object getOutputFieldValue(SeaTunnelRowAccessor inputRow) {
- return clone(srcFieldDataType, inputRow.getField(srcFieldIndex));
+ protected Column[] getOutputColumns() {
+ if (inputCatalogTable == null) {
+ Column[] columns = new Column[fieldNames.size()];
+ for (int i = 0; i < fieldNames.size(); i++) {
+ columns[i] =
+ PhysicalColumn.of(fieldNames.get(i), fieldTypes.get(i), 200, true, "", "");
+ }
+ return columns;
+ }
+
+ Map<String, Column> catalogTableColumns =
+ inputCatalogTable.getTableSchema().getColumns().stream()
+ .collect(Collectors.toMap(column -> column.getName(), column -> column));
+
+ List<Column> columns = new ArrayList<>();
+ for (Map.Entry<String, String> copyField : config.getFields().entrySet()) {
+ Column srcColumn = catalogTableColumns.get(copyField.getValue());
+ PhysicalColumn destColumn =
+ PhysicalColumn.of(
+ copyField.getKey(),
+ srcColumn.getDataType(),
+ srcColumn.getColumnLength(),
+ srcColumn.isNullable(),
+ srcColumn.getDefaultValue(),
+ srcColumn.getComment());
+ columns.add(destColumn);
+ }
+ return columns.toArray(new Column[0]);
+ }
+
+ @Override
+ protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+ Object[] fieldValues = new Object[fieldNames.size()];
+ for (int i = 0; i < fieldOriginalIndexs.size(); i++) {
+ fieldValues[i] =
+ clone(fieldTypes.get(i), inputRow.getField(fieldOriginalIndexs.get(i)));
+ }
+ return fieldValues;
}
private Object clone(SeaTunnelDataType dataType, Object value) {
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/CopyFieldTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java
similarity index 59%
rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/CopyFieldTransformFactory.java
rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java
index d6819339d..2271fa641 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/CopyFieldTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java
@@ -15,26 +15,37 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.copy;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import com.google.auto.service.AutoService;
-
-import static org.apache.seatunnel.transform.CopyFieldTransform.DEST_FIELD;
-import static org.apache.seatunnel.transform.CopyFieldTransform.SRC_FIELD;
+import lombok.NonNull;
@AutoService(Factory.class)
public class CopyFieldTransformFactory implements TableTransformFactory {
@Override
public String factoryIdentifier() {
- return "Copy";
+ return CopyFieldTransform.PLUGIN_NAME;
}
@Override
public OptionRule optionRule() {
- return OptionRule.builder().required(SRC_FIELD, DEST_FIELD).build();
+ return OptionRule.builder()
+ .bundled(CopyTransformConfig.SRC_FIELD, CopyTransformConfig.DEST_FIELD)
+ .bundled(CopyTransformConfig.FIELDS)
+ .build();
+ }
+
+ @Override
+ public TableTransform createTransform(@NonNull TableFactoryContext context) {
+ CopyTransformConfig copyTransformConfig = CopyTransformConfig.of(context.getOptions());
+ CatalogTable catalogTable = context.getCatalogTable();
+ return () -> new CopyFieldTransform(copyTransformConfig, catalogTable);
}
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyTransformConfig.java
new file mode 100644
index 000000000..276424830
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyTransformConfig.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.copy;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+
+@Getter
+@Setter
+public class CopyTransformConfig implements Serializable {
+ @Deprecated
+ public static final Option<String> SRC_FIELD =
+ Options.key("src_field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Src field you want to copy");
+
+ @Deprecated
+ public static final Option<String> DEST_FIELD =
+ Options.key("dest_field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Copy Src field to Dest field");
+
+ public static final Option<Map<String, String>> FIELDS =
+ Options.key("fields")
+ .mapType()
+ .noDefaultValue()
+ .withDescription(
+ "Specify the field copy relationship between input and output");
+
+ private LinkedHashMap<String, String> fields;
+
+ public static CopyTransformConfig of(ReadonlyConfig config) {
+ LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+ Optional<Map<String, String>> optional = config.getOptional(FIELDS);
+ if (optional.isPresent()) {
+ fields.putAll(config.get(FIELDS));
+ } else {
+ fields.put(config.get(DEST_FIELD), config.get(SRC_FIELD));
+ }
+
+ CopyTransformConfig copyTransformConfig = new CopyTransformConfig();
+ copyTransformConfig.setFields(fields);
+ return copyTransformConfig;
+ }
+}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/CopyFieldTransformFactoryTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/CopyFieldTransformFactoryTest.java
index f1f905a80..9f22dbbbf 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/CopyFieldTransformFactoryTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/CopyFieldTransformFactoryTest.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.transform;
+import org.apache.seatunnel.transform.copy.CopyFieldTransformFactory;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;