You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/03/24 10:12:32 UTC
[incubator-seatunnel] branch dev updated: [Feature][Transform V2 & Zeta] Make SplitTransform Support CatalogTable And CatalogTable Evolution (#4396)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cee6c1983 [Feature][Transform V2 & Zeta] Make SplitTransform Support CatalogTable And CatalogTable Evolution (#4396)
cee6c1983 is described below
commit cee6c198358b063c5fabe2d028620b38edba2251
Author: Eric <ga...@gmail.com>
AuthorDate: Fri Mar 24 18:12:24 2023 +0800
[Feature][Transform V2 & Zeta] Make SplitTransform Support CatalogTable And CatalogTable Evolution (#4396)
---
.../apache/seatunnel/api/table/catalog/Column.java | 3 +
.../api/table/catalog/MetadataColumn.java | 6 +
.../api/table/catalog/PhysicalColumn.java | 5 +
.../seatunnel/api/table/catalog/TableSchema.java | 5 +
.../seatunnel/api/table/factory/FactoryUtil.java | 14 +++
.../api/transform/SeaTunnelTransform.java | 8 ++
.../core/parse/MultipleTableJobConfigParser.java | 21 +++-
.../apache/seatunnel/transform/SplitTransform.java | 125 -------------------
.../common/AbstractCatalogSupportTransform.java | 78 ++++++++++++
.../common/AbstractSeaTunnelTransform.java | 9 ++
.../common/MultipleFieldOutputTransform.java | 90 +++++++++++++-
.../seatunnel/transform/split/SplitTransform.java | 137 +++++++++++++++++++++
.../transform/split/SplitTransformConfig.java | 65 ++++++++++
.../{ => split}/SplitTransformFactory.java | 22 +++-
.../transform/SplitTransformFactoryTest.java | 2 +
15 files changed, 457 insertions(+), 133 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
index 0b914fbc7..b528996a3 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
@@ -77,4 +77,7 @@ public abstract class Column implements Serializable {
/** Returns a copy of the column with a replaced {@link SeaTunnelDataType}. */
public abstract Column copy(SeaTunnelDataType<?> newType);
+
+ /**
+ * Returns a copy of the column. Unlike {@link #copy(SeaTunnelDataType)}, no replacement data
+ * type is supplied by the caller.
+ *
+ * @return the copied column
+ */
+ public abstract Column copy();
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
index d98f7d27e..5325dac79 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
@@ -64,4 +64,10 @@ public class MetadataColumn extends Column {
return MetadataColumn.of(
name, newType, columnLength, metadataKey, nullable, defaultValue, comment);
}
+
+    /**
+     * Builds a copy through {@code MetadataColumn.of(...)}, passing this column's name, data type,
+     * column length, metadata key, nullable flag, default value and comment unchanged.
+     *
+     * @return the column produced by {@code MetadataColumn.of(...)}
+     */
+    @Override
+    public Column copy() {
+        final Column duplicate =
+                MetadataColumn.of(
+                        name, dataType, columnLength, metadataKey, nullable, defaultValue, comment);
+        return duplicate;
+    }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
index b29d7a357..bc379e355 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
@@ -57,4 +57,9 @@ public class PhysicalColumn extends Column {
public Column copy(SeaTunnelDataType<?> newType) {
return PhysicalColumn.of(name, newType, columnLength, nullable, defaultValue, comment);
}
+
+    /**
+     * Builds a copy through {@code PhysicalColumn.of(...)}, passing this column's name, data type,
+     * column length, nullable flag, default value and comment unchanged.
+     *
+     * @return the column produced by {@code PhysicalColumn.of(...)}
+     */
+    @Override
+    public Column copy() {
+        final Column duplicate =
+                PhysicalColumn.of(name, dataType, columnLength, nullable, defaultValue, comment);
+        return duplicate;
+    }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
index 1f3d7e6b3..bd494bfd8 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
@@ -83,6 +83,11 @@ public final class TableSchema implements Serializable {
return this;
}
+ /**
+ * Appends every key in {@code constraintKeys} to the constraint keys already held by this
+ * builder; previously added keys are kept.
+ *
+ * @param constraintKeys keys to append
+ * @return this builder, so calls can be chained
+ */
+ public Builder constraintKey(List<ConstraintKey> constraintKeys) {
+ this.constraintKeys.addAll(constraintKeys);
+ return this;
+ }
+
public TableSchema build() {
return new TableSchema(columns, primaryKey, constraintKeys);
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index ec1ed90f2..a330b4e37 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -309,4 +310,17 @@ public final class FactoryUtil {
return sinkOptionRule;
}
+
+    /**
+     * Looks up the {@link TableTransformFactory} registered under {@code factoryIdentifier} and
+     * uses it to create a transform for a single input table.
+     *
+     * @param catalogTable the input table, wrapped in a singleton list for the factory context
+     * @param options the transform's configuration
+     * @param classLoader class loader used both for factory discovery and in the factory context
+     * @param factoryIdentifier identifier of the transform factory to discover
+     * @return the transform created by the discovered factory
+     */
+    public static SeaTunnelTransform<?> createAndPrepareTransform(
+            CatalogTable catalogTable,
+            ReadonlyConfig options,
+            ClassLoader classLoader,
+            String factoryIdentifier) {
+        final TableTransformFactory transformFactory =
+                discoverFactory(classLoader, TableTransformFactory.class, factoryIdentifier);
+        final List<CatalogTable> singleInputTable = Collections.singletonList(catalogTable);
+        final TableFactoryContext factoryContext =
+                new TableFactoryContext(singleInputTable, options, classLoader);
+        return transformFactory.createTransform(factoryContext).createTransform();
+    }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
index 292486f7d..54b0e390a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.api.transform;
import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
import org.apache.seatunnel.api.source.SeaTunnelJobAware;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import java.io.Serializable;
@@ -47,6 +48,13 @@ public interface SeaTunnelTransform<T>
*/
SeaTunnelDataType<T> getProducedType();
+ /**
+ * Get the catalog table output by this transform.
+ *
+ * @return the {@link CatalogTable} describing this transform's output
+ */
+ CatalogTable getProducedCatalogTable();
+
/**
* Transform input data to {@link this#getProducedType()} types data.
*
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index ad77a0a05..82ea1d62f 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -40,6 +40,7 @@ import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
@@ -325,6 +326,9 @@ public class MultipleTableJobConfigParser {
Config config = transforms.poll();
final ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
final String factoryId = getFactoryId(readonlyConfig);
+ // get factory urls
+ Set<URL> factoryUrls =
+ getFactoryUrls(readonlyConfig, classLoader, TableTransformFactory.class, factoryId);
final List<String> inputIds = getInputIds(readonlyConfig);
List<Tuple2<CatalogTable, Action>> inputs =
@@ -383,7 +387,22 @@ public class MultipleTableJobConfigParser {
return;
}
- // TODO: TableTransformFactory is not available.
+ CatalogTable catalogTable = inputs.get(0)._1();
+ SeaTunnelTransform<?> transform =
+ FactoryUtil.createAndPrepareTransform(
+ catalogTable, readonlyConfig, classLoader, factoryId);
+ long id = idGenerator.getNextId();
+ String actionName =
+ JobConfigParser.createTransformActionName(
+ 0, factoryId, JobConfigParser.getTableName(config));
+
+ TransformAction transformAction =
+ new TransformAction(
+ id, actionName, new ArrayList<>(inputActions), transform, factoryUrls);
+ tableWithActionMap.put(
+ tableId,
+ Collections.singletonList(
+ new Tuple2<>(transform.getProducedCatalogTable(), transformAction)));
return;
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/SplitTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/SplitTransform.java
deleted file mode 100644
index 6705f7a51..000000000
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/SplitTransform.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.transform;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
-
-import com.google.auto.service.AutoService;
-
-import java.util.List;
-import java.util.function.IntFunction;
-import java.util.stream.IntStream;
-
-@AutoService(SeaTunnelTransform.class)
-public class SplitTransform extends MultipleFieldOutputTransform {
-
- public static final Option<String> KEY_SEPARATOR =
- Options.key("separator")
- .stringType()
- .noDefaultValue()
- .withDescription("The separator to split the field");
-
- public static final Option<String> KEY_SPLIT_FIELD =
- Options.key("split_field")
- .stringType()
- .noDefaultValue()
- .withDescription("The field to be split");
-
- public static final Option<List<String>> KEY_OUTPUT_FIELDS =
- Options.key("output_fields")
- .listType()
- .noDefaultValue()
- .withDescription("The result fields after split");
- private String separator;
- private String splitField;
- private int splitFieldIndex;
- private String[] outputFields;
- private String[] emptySplits;
-
- @Override
- public String getPluginName() {
- return "Split";
- }
-
- @Override
- protected void setConfig(Config pluginConfig) {
- CheckResult checkResult =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- KEY_SEPARATOR.key(),
- KEY_SPLIT_FIELD.key(),
- KEY_OUTPUT_FIELDS.key());
- if (!checkResult.isSuccess()) {
- throw new IllegalArgumentException("Failed to check config! " + checkResult.getMsg());
- }
-
- separator = pluginConfig.getString(KEY_SEPARATOR.key());
- splitField = pluginConfig.getString(KEY_SPLIT_FIELD.key());
- outputFields = pluginConfig.getStringList(KEY_OUTPUT_FIELDS.key()).toArray(new String[0]);
- emptySplits = new String[outputFields.length];
- }
-
- @Override
- protected void setInputRowType(SeaTunnelRowType rowType) {
- splitFieldIndex = rowType.indexOf(splitField);
- if (splitFieldIndex == -1) {
- throw new IllegalArgumentException(
- "Cannot find [" + splitField + "] field in input row type");
- }
- }
-
- @Override
- protected String[] getOutputFieldNames() {
- return outputFields;
- }
-
- @Override
- protected SeaTunnelDataType[] getOutputFieldDataTypes() {
- return IntStream.range(0, outputFields.length)
- .mapToObj((IntFunction<SeaTunnelDataType>) value -> BasicType.STRING_TYPE)
- .toArray(value -> new SeaTunnelDataType[value]);
- }
-
- @Override
- protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) {
- Object splitFieldValue = inputRow.getField(splitFieldIndex);
- if (splitFieldValue == null) {
- return emptySplits;
- }
-
- String[] splitFieldValues =
- splitFieldValue.toString().split(separator, outputFields.length);
- if (splitFieldValues.length < outputFields.length) {
- String[] tmp = splitFieldValues;
- splitFieldValues = new String[outputFields.length];
- System.arraycopy(tmp, 0, splitFieldValues, 0, tmp.length);
- }
- return splitFieldValues;
- }
-}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
new file mode 100644
index 000000000..faf73ef72
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.common;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import lombok.NonNull;
+
+/**
+ * Base class for transforms that derive their output {@link CatalogTable} from the {@link
+ * CatalogTable} they receive as input.
+ *
+ * <p>Subclasses supply the output identifier and schema through {@link
+ * #transformTableIdentifier()} and {@link #transformTableSchema()}; options, partition keys and
+ * comment are taken over from the input table.
+ */
+public abstract class AbstractCatalogSupportTransform extends AbstractSeaTunnelTransform {
+    /** Input table; stays {@code null} when the no-arg constructor is used. */
+    protected CatalogTable inputCatalogTable;
+
+    /** Output table, computed on first request by {@link #getProducedCatalogTable()}. */
+    protected volatile CatalogTable outputCatalogTable;
+
+    /** Creates a transform without an input catalog table. */
+    public AbstractCatalogSupportTransform() {
+        super();
+    }
+
+    /**
+     * Creates a transform bound to the given input table.
+     *
+     * @param inputCatalogTable table consumed by this transform, must not be {@code null}
+     */
+    public AbstractCatalogSupportTransform(@NonNull CatalogTable inputCatalogTable) {
+        this.inputCatalogTable = inputCatalogTable;
+    }
+
+    /**
+     * Returns the output catalog table, computing and caching it on the first call.
+     *
+     * @return the cached output catalog table
+     * @throws UnsupportedOperationException raised by the superclass implementation when no input
+     *     catalog table has been set, since there is nothing to derive the output from
+     */
+    @Override
+    public CatalogTable getProducedCatalogTable() {
+        CatalogTable produced = outputCatalogTable;
+        if (produced != null) {
+            return produced;
+        }
+        if (inputCatalogTable == null) {
+            // Without an input table the derivation below could only fail with a bare
+            // NullPointerException; the superclass throws a descriptive exception instead.
+            return super.getProducedCatalogTable();
+        }
+        synchronized (this) {
+            // Re-check under the lock so concurrent first callers do not both compute the table.
+            if (outputCatalogTable == null) {
+                outputCatalogTable = transformCatalogTable();
+            }
+            return outputCatalogTable;
+        }
+    }
+
+    /** Combines the subclass-provided identifier and schema with the input table's metadata. */
+    private CatalogTable transformCatalogTable() {
+        TableIdentifier tableIdentifier = transformTableIdentifier();
+        TableSchema tableSchema = transformTableSchema();
+        return CatalogTable.of(
+                tableIdentifier,
+                tableSchema,
+                inputCatalogTable.getOptions(),
+                inputCatalogTable.getPartitionKeys(),
+                inputCatalogTable.getComment());
+    }
+
+    /**
+     * Produces the schema of the output table.
+     *
+     * @return the output table schema
+     */
+    protected abstract TableSchema transformTableSchema();
+
+    /**
+     * Produces the identifier of the output table.
+     *
+     * @return the output table identifier
+     */
+    protected abstract TableIdentifier transformTableIdentifier();
+
+    /**
+     * Returns {@code outputRowType} when it is already set; otherwise the physical row type of
+     * the produced catalog table's schema.
+     *
+     * @return the row type produced by this transform
+     */
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        if (outputRowType != null) {
+            return outputRowType;
+        }
+        return getProducedCatalogTable().getTableSchema().toPhysicalRowDataType();
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
index 272e79db1..f923b1c93 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -106,4 +107,12 @@ public abstract class AbstractSeaTunnelTransform implements SeaTunnelTransform<S
return new SeaTunnelRowType(fieldNames, fieldTypes);
}
+
+    /**
+     * Default implementation for transforms that do not provide a catalog table: always throws.
+     *
+     * @throws UnsupportedOperationException always; the message contains the value of {@code
+     *     getPluginName()}
+     */
+    @Override
+    public CatalogTable getProducedCatalogTable() {
+        final String message =
+                String.format(
+                        "Connector %s must implement TableTransformFactory.createTransform method",
+                        getPluginName());
+        throw new UnsupportedOperationException(message);
+    }
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
index 98121e5c2..438435df2 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
@@ -17,19 +17,25 @@
package org.apache.seatunnel.transform.common;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.stream.Collectors;
@Slf4j
-public abstract class MultipleFieldOutputTransform extends AbstractSeaTunnelTransform {
+public abstract class MultipleFieldOutputTransform extends AbstractCatalogSupportTransform {
private static final String[] TYPE_ARRAY_STRING = new String[0];
private static final SeaTunnelDataType[] TYPE_ARRAY_SEATUNNEL_DATA_TYPE =
@@ -39,6 +45,14 @@ public abstract class MultipleFieldOutputTransform extends AbstractSeaTunnelTran
private int[] fieldsIndex;
private SeaTunnelRowContainerGenerator rowContainerGenerator;
+    /** Creates a transform without an input catalog table (implicitly calls {@code super()}). */
+    public MultipleFieldOutputTransform() {}
+
+ /**
+ * Creates a transform for the given input table by passing it to the superclass constructor.
+ *
+ * @param inputCatalogTable the input table; annotated {@code @NonNull}
+ */
+ public MultipleFieldOutputTransform(@NonNull CatalogTable inputCatalogTable) {
+ super(inputCatalogTable);
+ }
+
@Override
protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
setInputRowType(inputRowType);
@@ -157,4 +171,78 @@ public abstract class MultipleFieldOutputTransform extends AbstractSeaTunnelTran
* @return
*/
protected abstract Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow);
+
+    /**
+     * Builds the output schema by merging the columns from {@link #getOutputColumns()} into a
+     * copy of the input table's columns.
+     *
+     * <p>An output column whose name equals an existing column replaces that column in place;
+     * any other output column is appended exactly once. The primary key and constraint keys of
+     * the input schema are carried over. As side effects this method sets {@code
+     * outputFieldNames}, {@code fieldsIndex} and {@code rowContainerGenerator}.
+     *
+     * @return the output table schema
+     */
+    @Override
+    protected TableSchema transformTableSchema() {
+        Column[] outputColumns = getOutputColumns();
+        outputFieldNames =
+                Arrays.stream(outputColumns).map(Column::getName).toArray(String[]::new);
+        TableSchema.Builder builder =
+                TableSchema.builder()
+                        .primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey())
+                        .constraintKey(inputCatalogTable.getTableSchema().getConstraintKeys());
+        List<Column> copyInputColumns =
+                inputCatalogTable.getTableSchema().getColumns().stream()
+                        .map(Column::copy)
+                        .collect(Collectors.toList());
+
+        int addFieldCount = 0;
+        this.fieldsIndex = new int[outputColumns.length];
+        for (int i = 0; i < outputColumns.length; i++) {
+            Column outputColumn = outputColumns[i];
+
+            // Search the whole list first; deciding per visited column (as a single nested loop
+            // would) appends the output column once for every non-matching input column.
+            int existingIndex = -1;
+            for (int j = 0; j < copyInputColumns.size(); j++) {
+                if (copyInputColumns.get(j).getName().equals(outputColumn.getName())) {
+                    existingIndex = j;
+                    break;
+                }
+            }
+
+            // fieldsIndex[i] records where output column i sits in the output schema.
+            // NOTE(review): fieldsIndex is read outside this method - presumably when the output
+            // values are written into the row; confirm against the row-transform code.
+            if (existingIndex >= 0) {
+                copyInputColumns.set(existingIndex, outputColumn);
+                fieldsIndex[i] = existingIndex;
+            } else {
+                addFieldCount++;
+                fieldsIndex[i] = copyInputColumns.size();
+                copyInputColumns.add(outputColumn);
+            }
+        }
+
+        TableSchema outputTableSchema = builder.columns(copyInputColumns).build();
+        if (addFieldCount > 0) {
+            int inputFieldLength =
+                    inputCatalogTable.getTableSchema().toPhysicalRowDataType().getTotalFields();
+            int outputFieldLength = copyInputColumns.size();
+
+            // Columns were appended, so each row needs a wider container: copy the input values
+            // into the leading slots and leave the appended slots empty for the output values.
+            rowContainerGenerator =
+                    new SeaTunnelRowContainerGenerator() {
+                        @Override
+                        public SeaTunnelRow apply(SeaTunnelRow inputRow) {
+                            // todo reuse array container
+                            Object[] outputFieldValues = new Object[outputFieldLength];
+                            System.arraycopy(
+                                    inputRow.getFields(),
+                                    0,
+                                    outputFieldValues,
+                                    0,
+                                    inputFieldLength);
+
+                            SeaTunnelRow outputRow = new SeaTunnelRow(outputFieldValues);
+                            outputRow.setTableId(inputRow.getTableId());
+                            outputRow.setRowKind(inputRow.getRowKind());
+                            return outputRow;
+                        }
+                    };
+        } else {
+            // Every output column overwrote an existing one, so the row width is unchanged.
+            rowContainerGenerator = SeaTunnelRowContainerGenerator.REUSE_ROW;
+        }
+
+        log.info(
+                "Changed input table schema: {} to output table schema: {}",
+                inputCatalogTable.getTableSchema(),
+                outputTableSchema);
+
+        return outputTableSchema;
+    }
+
+    /**
+     * Passes the table identifier through unchanged.
+     *
+     * @return the value of {@code inputCatalogTable.getTableId()}
+     */
+    @Override
+    protected TableIdentifier transformTableIdentifier() {
+        final TableIdentifier inputTableId = inputCatalogTable.getTableId();
+        return inputTableId;
+    }
+
+ /**
+ * Returns the columns this transform outputs. {@code transformTableSchema()} reads their names
+ * into {@code outputFieldNames} and merges them into the input table's columns.
+ *
+ * @return the output columns
+ */
+ protected abstract Column[] getOutputColumns();
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
new file mode 100644
index 000000000..4d7b543fb
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.split;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
+import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+
+import com.google.auto.service.AutoService;
+import lombok.NonNull;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Transform that splits the string form of one input field around the configured separator and
+ * emits the pieces as the configured output fields, all typed as strings.
+ */
+@AutoService(SeaTunnelTransform.class)
+public class SplitTransform extends MultipleFieldOutputTransform {
+    private SplitTransformConfig splitTransformConfig;
+    private int splitFieldIndex;
+
+    /** Creates an unconfigured transform; the configuration arrives through {@code setConfig}. */
+    public SplitTransform() {
+        super();
+    }
+
+    /**
+     * Creates a configured transform for the given input table and eagerly computes the output
+     * catalog table.
+     *
+     * @param splitTransformConfig separator, field to split and output field names
+     * @param catalogTable the input table
+     * @throws IllegalArgumentException if the field to split is not in the input table's row type
+     */
+    public SplitTransform(
+            @NonNull SplitTransformConfig splitTransformConfig,
+            @NonNull CatalogTable catalogTable) {
+        super(catalogTable);
+        this.splitTransformConfig = splitTransformConfig;
+        locateSplitField(catalogTable.getTableSchema().toPhysicalRowDataType());
+        this.outputCatalogTable = getProducedCatalogTable();
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Split";
+    }
+
+    /** Validates the plugin config against the factory's option rule, then parses it. */
+    @Override
+    protected void setConfig(Config pluginConfig) {
+        ReadonlyConfig configToValidate = ReadonlyConfig.fromConfig(pluginConfig);
+        ConfigValidator.of(configToValidate).validate(new SplitTransformFactory().optionRule());
+        this.splitTransformConfig =
+                SplitTransformConfig.of(ReadonlyConfig.fromConfig(pluginConfig));
+    }
+
+    @Override
+    protected void setInputRowType(SeaTunnelRowType rowType) {
+        locateSplitField(rowType);
+    }
+
+    /**
+     * Stores the position of the configured split field within {@code rowType}.
+     *
+     * @throws IllegalArgumentException if {@code rowType} has no such field
+     */
+    private void locateSplitField(SeaTunnelRowType rowType) {
+        splitFieldIndex = rowType.indexOf(splitTransformConfig.getSplitField());
+        if (splitFieldIndex != -1) {
+            return;
+        }
+        throw new IllegalArgumentException(
+                "Cannot find ["
+                        + splitTransformConfig.getSplitField()
+                        + "] field in input row type");
+    }
+
+    @Override
+    protected String[] getOutputFieldNames() {
+        return splitTransformConfig.getOutputFields();
+    }
+
+    /** Returns one {@code STRING} type per configured output field. */
+    @Override
+    protected SeaTunnelDataType[] getOutputFieldDataTypes() {
+        int outputFieldCount = splitTransformConfig.getOutputFields().length;
+        IntFunction<SeaTunnelDataType> stringTypeAtAnyPosition =
+                position -> BasicType.STRING_TYPE;
+        return IntStream.range(0, outputFieldCount)
+                .mapToObj(stringTypeAtAnyPosition)
+                .toArray(SeaTunnelDataType[]::new);
+    }
+
+    /**
+     * Splits the configured field of {@code inputRow}.
+     *
+     * @return the config's empty-splits array when the field is {@code null}; otherwise the split
+     *     pieces, padded with {@code null} up to the number of output fields
+     */
+    @Override
+    protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+        Object rawValue = inputRow.getField(splitFieldIndex);
+        if (rawValue == null) {
+            return splitTransformConfig.getEmptySplits();
+        }
+
+        String separator = splitTransformConfig.getSeparator();
+        int outputFieldCount = splitTransformConfig.getOutputFields().length;
+        // The count is passed as split limit, so at most outputFieldCount pieces come back.
+        String[] pieces = rawValue.toString().split(separator, outputFieldCount);
+        if (pieces.length >= outputFieldCount) {
+            return pieces;
+        }
+        // Fewer pieces than output fields: the remaining slots are left null.
+        return Arrays.copyOf(pieces, outputFieldCount);
+    }
+
+    /** Describes every output field as a physical {@code STRING} column. */
+    @Override
+    protected Column[] getOutputColumns() {
+        List<PhysicalColumn> stringColumns =
+                Arrays.stream(splitTransformConfig.getOutputFields())
+                        .map(
+                                fieldName ->
+                                        PhysicalColumn.of(
+                                                fieldName, BasicType.STRING_TYPE, 200, true, "", ""))
+                        .collect(Collectors.toList());
+        return stringColumns.toArray(new Column[0]);
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformConfig.java
new file mode 100644
index 000000000..c645ce700
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformConfig.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.split;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Serializable holder for the Split transform's settings, together with the {@link Option}
+ * definitions they are read from.
+ */
+@Getter
+@Setter
+public class SplitTransformConfig implements Serializable {
+    public static final Option<String> KEY_SEPARATOR =
+            Options.key("separator")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The separator to split the field");
+
+    public static final Option<String> KEY_SPLIT_FIELD =
+            Options.key("split_field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The field to be split");
+
+    public static final Option<List<String>> KEY_OUTPUT_FIELDS =
+            Options.key("output_fields")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription("The result fields after split");
+
+    private String separator;
+    private String splitField;
+    private String[] outputFields;
+    /** Array with one (null) slot per output field. */
+    private String[] emptySplits;
+
+    /**
+     * Reads the separator, split field and output fields from {@code config}.
+     *
+     * @param config configuration holding the three options declared above
+     * @return a populated config whose {@code emptySplits} has the same length as {@code
+     *     outputFields}
+     */
+    public static SplitTransformConfig of(ReadonlyConfig config) {
+        SplitTransformConfig parsed = new SplitTransformConfig();
+        parsed.separator = config.get(KEY_SEPARATOR);
+        parsed.splitField = config.get(KEY_SPLIT_FIELD);
+        parsed.outputFields = config.get(KEY_OUTPUT_FIELDS).toArray(new String[0]);
+        parsed.emptySplits = new String[parsed.outputFields.length];
+        return parsed;
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/SplitTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java
similarity index 62%
rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/SplitTransformFactory.java
rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java
index 8c71f50df..91281251e 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/SplitTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java
@@ -15,17 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.split;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import com.google.auto.service.AutoService;
-
-import static org.apache.seatunnel.transform.SplitTransform.KEY_OUTPUT_FIELDS;
-import static org.apache.seatunnel.transform.SplitTransform.KEY_SEPARATOR;
-import static org.apache.seatunnel.transform.SplitTransform.KEY_SPLIT_FIELD;
+import lombok.NonNull;
@AutoService(Factory.class)
public class SplitTransformFactory implements TableTransformFactory {
@@ -37,7 +37,17 @@ public class SplitTransformFactory implements TableTransformFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(KEY_SEPARATOR, KEY_SPLIT_FIELD, KEY_OUTPUT_FIELDS)
+ .required(
+ SplitTransformConfig.KEY_SEPARATOR,
+ SplitTransformConfig.KEY_SPLIT_FIELD,
+ SplitTransformConfig.KEY_OUTPUT_FIELDS)
.build();
}
+
+    /**
+     * Parses the Split settings from the context's options and returns a {@link TableTransform}
+     * that creates a {@link SplitTransform} for the context's catalog table.
+     *
+     * @param context factory context supplying the options and the catalog table
+     * @return a {@link TableTransform} that instantiates the transform when invoked
+     */
+    @Override
+    public TableTransform createTransform(@NonNull TableFactoryContext context) {
+        final SplitTransformConfig parsedConfig = SplitTransformConfig.of(context.getOptions());
+        final CatalogTable inputTable = context.getCatalogTable();
+        return () -> new SplitTransform(parsedConfig, inputTable);
+    }
}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/SplitTransformFactoryTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/SplitTransformFactoryTest.java
index 0648d3067..80264b21d 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/SplitTransformFactoryTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/SplitTransformFactoryTest.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.transform;
+import org.apache.seatunnel.transform.split.SplitTransformFactory;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;