You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/03/24 10:12:32 UTC

[incubator-seatunnel] branch dev updated: [Feature][Transform V2 & Zeta] Make SplitTransform Support CatalogTable And CatalogTable Evolution (#4396)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new cee6c1983 [Feature][Transform V2 & Zeta] Make SplitTransform Support CatalogTable And CatalogTable Evolution (#4396)
cee6c1983 is described below

commit cee6c198358b063c5fabe2d028620b38edba2251
Author: Eric <ga...@gmail.com>
AuthorDate: Fri Mar 24 18:12:24 2023 +0800

    [Feature][Transform V2 & Zeta] Make SplitTransform Support CatalogTable And CatalogTable Evolution (#4396)
---
 .../apache/seatunnel/api/table/catalog/Column.java |   3 +
 .../api/table/catalog/MetadataColumn.java          |   6 +
 .../api/table/catalog/PhysicalColumn.java          |   5 +
 .../seatunnel/api/table/catalog/TableSchema.java   |   5 +
 .../seatunnel/api/table/factory/FactoryUtil.java   |  14 +++
 .../api/transform/SeaTunnelTransform.java          |   8 ++
 .../core/parse/MultipleTableJobConfigParser.java   |  21 +++-
 .../apache/seatunnel/transform/SplitTransform.java | 125 -------------------
 .../common/AbstractCatalogSupportTransform.java    |  78 ++++++++++++
 .../common/AbstractSeaTunnelTransform.java         |   9 ++
 .../common/MultipleFieldOutputTransform.java       |  90 +++++++++++++-
 .../seatunnel/transform/split/SplitTransform.java  | 137 +++++++++++++++++++++
 .../transform/split/SplitTransformConfig.java      |  65 ++++++++++
 .../{ => split}/SplitTransformFactory.java         |  22 +++-
 .../transform/SplitTransformFactoryTest.java       |   2 +
 15 files changed, 457 insertions(+), 133 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
index 0b914fbc7..b528996a3 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
@@ -77,4 +77,7 @@ public abstract class Column implements Serializable {
 
     /** Returns a copy of the column with a replaced {@link SeaTunnelDataType}. */
     public abstract Column copy(SeaTunnelDataType<?> newType);
+
+    /** Returns a copy of the column that keeps its current {@link SeaTunnelDataType}. */
+    public abstract Column copy();
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
index d98f7d27e..5325dac79 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
@@ -64,4 +64,10 @@ public class MetadataColumn extends Column {
         return MetadataColumn.of(
                 name, newType, columnLength, metadataKey, nullable, defaultValue, comment);
     }
+
+    /**
+     * Returns the column produced by {@code MetadataColumn.of(...)} from this column's current
+     * name, data type, column length, metadata key, nullable flag, default value and comment.
+     */
+    @Override
+    public Column copy() {
+        return MetadataColumn.of(
+                name, dataType, columnLength, metadataKey, nullable, defaultValue, comment);
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
index b29d7a357..bc379e355 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
@@ -57,4 +57,9 @@ public class PhysicalColumn extends Column {
     public Column copy(SeaTunnelDataType<?> newType) {
         return PhysicalColumn.of(name, newType, columnLength, nullable, defaultValue, comment);
     }
+
+    /**
+     * Returns the column produced by {@code PhysicalColumn.of(...)} from this column's current
+     * name, data type, column length, nullable flag, default value and comment.
+     */
+    @Override
+    public Column copy() {
+        return PhysicalColumn.of(name, dataType, columnLength, nullable, defaultValue, comment);
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
index 1f3d7e6b3..bd494bfd8 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
@@ -83,6 +83,11 @@ public final class TableSchema implements Serializable {
             return this;
         }
 
+        /**
+         * Appends the given constraint keys to those already held by this builder.
+         *
+         * @param constraintKeys the constraint keys to add
+         * @return this builder
+         */
+        public Builder constraintKey(List<ConstraintKey> constraintKeys) {
+            this.constraintKeys.addAll(constraintKeys);
+            return this;
+        }
+
         public TableSchema build() {
             return new TableSchema(columns, primaryKey, constraintKeys);
         }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index ec1ed90f2..a330b4e37 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -309,4 +310,17 @@ public final class FactoryUtil {
 
         return sinkOptionRule;
     }
+
+    /**
+     * Discovers the {@link TableTransformFactory} registered under {@code factoryIdentifier} and
+     * lets it create a transform for a single input table.
+     *
+     * @param catalogTable the only table placed into the factory context
+     * @param options the options handed to the factory context
+     * @param classLoader class loader used for factory discovery and passed to the context
+     * @param factoryIdentifier identifier used to look up the transform factory
+     * @return the transform created through the discovered factory
+     */
+    public static SeaTunnelTransform<?> createAndPrepareTransform(
+            CatalogTable catalogTable,
+            ReadonlyConfig options,
+            ClassLoader classLoader,
+            String factoryIdentifier) {
+        final TableTransformFactory transformFactory =
+                discoverFactory(classLoader, TableTransformFactory.class, factoryIdentifier);
+        // The factory context always carries exactly one input table.
+        return transformFactory
+                .createTransform(
+                        new TableFactoryContext(
+                                Collections.singletonList(catalogTable), options, classLoader))
+                .createTransform();
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
index 292486f7d..54b0e390a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.api.transform;
 import org.apache.seatunnel.api.common.PluginIdentifierInterface;
 import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
 import org.apache.seatunnel.api.source.SeaTunnelJobAware;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 
 import java.io.Serializable;
@@ -47,6 +48,13 @@ public interface SeaTunnelTransform<T>
      */
     SeaTunnelDataType<T> getProducedType();
 
+    /**
+     * Get the catalog table output by this transform.
+     *
+     * @return the {@link CatalogTable} produced by this transform
+     */
+    CatalogTable getProducedCatalogTable();
+
     /**
      * Transform input data to {@link this#getProducedType()} types data.
      *
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index ad77a0a05..82ea1d62f 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -40,6 +40,7 @@ import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
@@ -325,6 +326,9 @@ public class MultipleTableJobConfigParser {
         Config config = transforms.poll();
         final ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
         final String factoryId = getFactoryId(readonlyConfig);
+        // get factory urls
+        Set<URL> factoryUrls =
+                getFactoryUrls(readonlyConfig, classLoader, TableTransformFactory.class, factoryId);
         final List<String> inputIds = getInputIds(readonlyConfig);
 
         List<Tuple2<CatalogTable, Action>> inputs =
@@ -383,7 +387,22 @@ public class MultipleTableJobConfigParser {
             return;
         }
 
-        // TODO: TableTransformFactory is not available.
+        CatalogTable catalogTable = inputs.get(0)._1();
+        SeaTunnelTransform<?> transform =
+                FactoryUtil.createAndPrepareTransform(
+                        catalogTable, readonlyConfig, classLoader, factoryId);
+        long id = idGenerator.getNextId();
+        String actionName =
+                JobConfigParser.createTransformActionName(
+                        0, factoryId, JobConfigParser.getTableName(config));
+
+        TransformAction transformAction =
+                new TransformAction(
+                        id, actionName, new ArrayList<>(inputActions), transform, factoryUrls);
+        tableWithActionMap.put(
+                tableId,
+                Collections.singletonList(
+                        new Tuple2<>(transform.getProducedCatalogTable(), transformAction)));
         return;
     }
 
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/SplitTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/SplitTransform.java
deleted file mode 100644
index 6705f7a51..000000000
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/SplitTransform.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.transform;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
-
-import com.google.auto.service.AutoService;
-
-import java.util.List;
-import java.util.function.IntFunction;
-import java.util.stream.IntStream;
-
-@AutoService(SeaTunnelTransform.class)
-public class SplitTransform extends MultipleFieldOutputTransform {
-
-    public static final Option<String> KEY_SEPARATOR =
-            Options.key("separator")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The separator to split the field");
-
-    public static final Option<String> KEY_SPLIT_FIELD =
-            Options.key("split_field")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The field to be split");
-
-    public static final Option<List<String>> KEY_OUTPUT_FIELDS =
-            Options.key("output_fields")
-                    .listType()
-                    .noDefaultValue()
-                    .withDescription("The result fields after split");
-    private String separator;
-    private String splitField;
-    private int splitFieldIndex;
-    private String[] outputFields;
-    private String[] emptySplits;
-
-    @Override
-    public String getPluginName() {
-        return "Split";
-    }
-
-    @Override
-    protected void setConfig(Config pluginConfig) {
-        CheckResult checkResult =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        KEY_SEPARATOR.key(),
-                        KEY_SPLIT_FIELD.key(),
-                        KEY_OUTPUT_FIELDS.key());
-        if (!checkResult.isSuccess()) {
-            throw new IllegalArgumentException("Failed to check config! " + checkResult.getMsg());
-        }
-
-        separator = pluginConfig.getString(KEY_SEPARATOR.key());
-        splitField = pluginConfig.getString(KEY_SPLIT_FIELD.key());
-        outputFields = pluginConfig.getStringList(KEY_OUTPUT_FIELDS.key()).toArray(new String[0]);
-        emptySplits = new String[outputFields.length];
-    }
-
-    @Override
-    protected void setInputRowType(SeaTunnelRowType rowType) {
-        splitFieldIndex = rowType.indexOf(splitField);
-        if (splitFieldIndex == -1) {
-            throw new IllegalArgumentException(
-                    "Cannot find [" + splitField + "] field in input row type");
-        }
-    }
-
-    @Override
-    protected String[] getOutputFieldNames() {
-        return outputFields;
-    }
-
-    @Override
-    protected SeaTunnelDataType[] getOutputFieldDataTypes() {
-        return IntStream.range(0, outputFields.length)
-                .mapToObj((IntFunction<SeaTunnelDataType>) value -> BasicType.STRING_TYPE)
-                .toArray(value -> new SeaTunnelDataType[value]);
-    }
-
-    @Override
-    protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) {
-        Object splitFieldValue = inputRow.getField(splitFieldIndex);
-        if (splitFieldValue == null) {
-            return emptySplits;
-        }
-
-        String[] splitFieldValues =
-                splitFieldValue.toString().split(separator, outputFields.length);
-        if (splitFieldValues.length < outputFields.length) {
-            String[] tmp = splitFieldValues;
-            splitFieldValues = new String[outputFields.length];
-            System.arraycopy(tmp, 0, splitFieldValues, 0, tmp.length);
-        }
-        return splitFieldValues;
-    }
-}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
new file mode 100644
index 000000000..faf73ef72
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.common;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import lombok.NonNull;
+
+/**
+ * Base class for transforms that derive their output {@link CatalogTable} from an input {@link
+ * CatalogTable}. Subclasses supply the output schema and table identifier; the options, partition
+ * keys and comment are taken from the input table.
+ */
+public abstract class AbstractCatalogSupportTransform extends AbstractSeaTunnelTransform {
+    protected CatalogTable inputCatalogTable;
+
+    protected volatile CatalogTable outputCatalogTable;
+
+    /** Creates a transform without an input catalog table. */
+    public AbstractCatalogSupportTransform() {}
+
+    /**
+     * Creates a transform for the given input table.
+     *
+     * @param inputCatalogTable the table this transform reads; must not be null
+     */
+    public AbstractCatalogSupportTransform(@NonNull CatalogTable inputCatalogTable) {
+        this.inputCatalogTable = inputCatalogTable;
+    }
+
+    /**
+     * Returns the output catalog table, building it on first use and keeping the result in
+     * {@code outputCatalogTable} for later calls.
+     *
+     * @return the catalog table produced by this transform
+     */
+    @Override
+    public CatalogTable getProducedCatalogTable() {
+        if (outputCatalogTable != null) {
+            return outputCatalogTable;
+        }
+
+        synchronized (this) {
+            // Checked again under the lock: another thread may have built the table meanwhile.
+            if (outputCatalogTable == null) {
+                outputCatalogTable = transformCatalogTable();
+            }
+            return outputCatalogTable;
+        }
+    }
+
+    /**
+     * Combines the identifier and schema supplied by the subclass with the options, partition
+     * keys and comment of the input table.
+     */
+    private CatalogTable transformCatalogTable() {
+        TableIdentifier outputIdentifier = transformTableIdentifier();
+        TableSchema outputSchema = transformTableSchema();
+        return CatalogTable.of(
+                outputIdentifier,
+                outputSchema,
+                inputCatalogTable.getOptions(),
+                inputCatalogTable.getPartitionKeys(),
+                inputCatalogTable.getComment());
+    }
+
+    /** Returns the schema of the table produced by this transform. */
+    protected abstract TableSchema transformTableSchema();
+
+    /** Returns the identifier of the table produced by this transform. */
+    protected abstract TableIdentifier transformTableIdentifier();
+
+    /**
+     * Returns {@code outputRowType} when it is set; otherwise the physical row type of the
+     * produced catalog table's schema.
+     */
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        if (outputRowType == null) {
+            return getProducedCatalogTable().getTableSchema().toPhysicalRowDataType();
+        }
+        return outputRowType;
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
index 272e79db1..f923b1c93 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -106,4 +107,12 @@ public abstract class AbstractSeaTunnelTransform implements SeaTunnelTransform<S
 
         return new SeaTunnelRowType(fieldNames, fieldTypes);
     }
+
+    /**
+     * Default implementation that always throws; the message includes this transform's plugin
+     * name.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public CatalogTable getProducedCatalogTable() {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Connector %s must implement TableTransformFactory.createTransform method",
+                        getPluginName()));
+    }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
index 98121e5c2..438435df2 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
@@ -17,19 +17,25 @@
 
 package org.apache.seatunnel.transform.common;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
+import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.stream.Collectors;
 
 @Slf4j
-public abstract class MultipleFieldOutputTransform extends AbstractSeaTunnelTransform {
+public abstract class MultipleFieldOutputTransform extends AbstractCatalogSupportTransform {
 
     private static final String[] TYPE_ARRAY_STRING = new String[0];
     private static final SeaTunnelDataType[] TYPE_ARRAY_SEATUNNEL_DATA_TYPE =
@@ -39,6 +45,14 @@ public abstract class MultipleFieldOutputTransform extends AbstractSeaTunnelTran
     private int[] fieldsIndex;
     private SeaTunnelRowContainerGenerator rowContainerGenerator;
 
+    /** Creates a transform without an input catalog table. */
+    public MultipleFieldOutputTransform() {
+        super();
+    }
+
+    /**
+     * Creates a transform for the given input table.
+     *
+     * @param inputCatalogTable the input table, passed on to the superclass; must not be null
+     */
+    public MultipleFieldOutputTransform(@NonNull CatalogTable inputCatalogTable) {
+        super(inputCatalogTable);
+    }
+
     @Override
     protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
         setInputRowType(inputRowType);
@@ -157,4 +171,78 @@ public abstract class MultipleFieldOutputTransform extends AbstractSeaTunnelTran
      * @return
      */
     protected abstract Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow);
+
+    /**
+     * Builds the output {@link TableSchema} from the input table's schema and the columns returned
+     * by {@link #getOutputColumns()}.
+     *
+     * <p>Each output column replaces the input column with the same name, or is appended exactly
+     * once when no input column has that name. The primary key and constraint keys of the input
+     * schema are carried over.
+     *
+     * <p>Side effects: assigns {@code outputFieldNames}, {@code fieldsIndex} and {@code
+     * rowContainerGenerator}.
+     *
+     * @return the schema of the table produced by this transform
+     */
+    @Override
+    protected TableSchema transformTableSchema() {
+        Column[] outputColumns = getOutputColumns();
+        outputFieldNames =
+                Arrays.stream(outputColumns)
+                        .map(Column::getName)
+                        .collect(Collectors.toList())
+                        .toArray(TYPE_ARRAY_STRING);
+        TableSchema.Builder builder =
+                TableSchema.builder()
+                        .primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey())
+                        .constraintKey(inputCatalogTable.getTableSchema().getConstraintKeys());
+        List<Column> copyInputColumns =
+                inputCatalogTable.getTableSchema().getColumns().stream()
+                        .map(Column::copy)
+                        .collect(Collectors.toList());
+
+        int addFieldCount = 0;
+        this.fieldsIndex = new int[outputColumns.length];
+        for (int i = 0; i < outputColumns.length; i++) {
+            Column outputColumn = outputColumns[i];
+            // Finish the name lookup before touching the list, so that a missing column is
+            // appended once instead of once per non-matching input column.
+            int targetIndex = -1;
+            for (int j = 0; j < copyInputColumns.size(); j++) {
+                if (copyInputColumns.get(j).getName().equals(outputColumn.getName())) {
+                    targetIndex = j;
+                    break;
+                }
+            }
+
+            if (targetIndex >= 0) {
+                copyInputColumns.set(targetIndex, outputColumn);
+            } else {
+                addFieldCount++;
+                copyInputColumns.add(outputColumn);
+                targetIndex = copyInputColumns.size() - 1;
+            }
+            // NOTE(review): fieldsIndex is presumably the output-row position of each output
+            // field, read by the row transform outside this method -- confirm against its usage.
+            fieldsIndex[i] = targetIndex;
+        }
+
+        TableSchema outputTableSchema = builder.columns(copyInputColumns).build();
+        if (addFieldCount > 0) {
+            int inputFieldLength =
+                    inputCatalogTable.getTableSchema().toPhysicalRowDataType().getTotalFields();
+            int outputFieldLength = copyInputColumns.size();
+
+            // Columns were appended: every row needs a wider field array, with the input values
+            // copied into its leading positions.
+            rowContainerGenerator =
+                    new SeaTunnelRowContainerGenerator() {
+                        @Override
+                        public SeaTunnelRow apply(SeaTunnelRow inputRow) {
+                            // todo reuse array container
+                            Object[] outputFieldValues = new Object[outputFieldLength];
+                            System.arraycopy(
+                                    inputRow.getFields(),
+                                    0,
+                                    outputFieldValues,
+                                    0,
+                                    inputFieldLength);
+
+                            SeaTunnelRow outputRow = new SeaTunnelRow(outputFieldValues);
+                            outputRow.setTableId(inputRow.getTableId());
+                            outputRow.setRowKind(inputRow.getRowKind());
+                            return outputRow;
+                        }
+                    };
+        } else {
+            // Only existing columns were replaced, so the row width is unchanged.
+            rowContainerGenerator = SeaTunnelRowContainerGenerator.REUSE_ROW;
+        }
+
+        log.info(
+                "Changed input table schema: {} to output table schema: {}",
+                inputCatalogTable.getTableSchema(),
+                outputTableSchema);
+
+        return outputTableSchema;
+    }
+
+    /** Returns the input table's identifier unchanged. */
+    @Override
+    protected TableIdentifier transformTableIdentifier() {
+        return inputCatalogTable.getTableId();
+    }
+
+    /**
+     * Returns the columns this transform outputs; {@link #transformTableSchema()} merges them
+     * into the input table's columns by name.
+     */
+    protected abstract Column[] getOutputColumns();
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
new file mode 100644
index 000000000..4d7b543fb
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.split;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
+import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+
+import com.google.auto.service.AutoService;
+import lombok.NonNull;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Transform that splits the string form of one input field by a separator and writes the pieces
+ * to the configured output fields, all of which are typed as strings.
+ */
+@AutoService(SeaTunnelTransform.class)
+public class SplitTransform extends MultipleFieldOutputTransform {
+    private SplitTransformConfig splitTransformConfig;
+    private int splitFieldIndex;
+
+    /** Creates an unconfigured transform; the configuration arrives via {@link #setConfig}. */
+    public SplitTransform() {
+        super();
+    }
+
+    /**
+     * Creates a transform bound to the given configuration and input table, and builds the
+     * produced catalog table right away.
+     *
+     * @param splitTransformConfig the split configuration; must not be null
+     * @param catalogTable the input table; must not be null
+     * @throws IllegalArgumentException if the split field index resolves to -1
+     */
+    public SplitTransform(
+            @NonNull SplitTransformConfig splitTransformConfig,
+            @NonNull CatalogTable catalogTable) {
+        super(catalogTable);
+        this.splitTransformConfig = splitTransformConfig;
+        bindSplitField(catalogTable.getTableSchema().toPhysicalRowDataType());
+        this.outputCatalogTable = getProducedCatalogTable();
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Split";
+    }
+
+    /** Validates the plugin config against the factory's option rule, then parses it. */
+    @Override
+    protected void setConfig(Config pluginConfig) {
+        ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
+                .validate(new SplitTransformFactory().optionRule());
+        this.splitTransformConfig =
+                SplitTransformConfig.of(ReadonlyConfig.fromConfig(pluginConfig));
+    }
+
+    /**
+     * Resolves the position of the split field in the given row type.
+     *
+     * @throws IllegalArgumentException if the split field index resolves to -1
+     */
+    @Override
+    protected void setInputRowType(SeaTunnelRowType rowType) {
+        bindSplitField(rowType);
+    }
+
+    /** Stores the index of the configured split field, rejecting an index of -1. */
+    private void bindSplitField(SeaTunnelRowType rowType) {
+        splitFieldIndex = rowType.indexOf(splitTransformConfig.getSplitField());
+        if (splitFieldIndex != -1) {
+            return;
+        }
+        throw new IllegalArgumentException(
+                "Cannot find ["
+                        + splitTransformConfig.getSplitField()
+                        + "] field in input row type");
+    }
+
+    @Override
+    protected String[] getOutputFieldNames() {
+        return splitTransformConfig.getOutputFields();
+    }
+
+    /** Returns one string type per configured output field. */
+    @Override
+    protected SeaTunnelDataType[] getOutputFieldDataTypes() {
+        IntFunction<SeaTunnelDataType> alwaysString = ignored -> BasicType.STRING_TYPE;
+        int outputFieldCount = splitTransformConfig.getOutputFields().length;
+        return IntStream.range(0, outputFieldCount)
+                .mapToObj(alwaysString)
+                .toArray(SeaTunnelDataType[]::new);
+    }
+
+    /**
+     * Splits the string form of the split field into at most as many parts as there are output
+     * fields. Missing trailing parts are left {@code null}; a {@code null} input value yields the
+     * configuration's empty-splits array.
+     *
+     * <p>The separator is passed to {@link String#split(String, int)} and is therefore treated
+     * as a regular expression.
+     */
+    @Override
+    protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+        Object rawValue = inputRow.getField(splitFieldIndex);
+        if (rawValue == null) {
+            return splitTransformConfig.getEmptySplits();
+        }
+
+        int outputFieldCount = splitTransformConfig.getOutputFields().length;
+        String[] parts =
+                rawValue.toString().split(splitTransformConfig.getSeparator(), outputFieldCount);
+        // Arrays.copyOf pads the tail with nulls when fewer parts than output fields were found.
+        return parts.length < outputFieldCount ? Arrays.copyOf(parts, outputFieldCount) : parts;
+    }
+
+    /**
+     * Describes every output field as a string column with column length 200, nullable, and an
+     * empty default value and comment.
+     */
+    @Override
+    protected Column[] getOutputColumns() {
+        List<Column> stringColumns =
+                Arrays.stream(splitTransformConfig.getOutputFields())
+                        .<Column>map(
+                                outputField ->
+                                        PhysicalColumn.of(
+                                                outputField,
+                                                BasicType.STRING_TYPE,
+                                                200,
+                                                true,
+                                                "",
+                                                ""))
+                        .collect(Collectors.toList());
+        return stringColumns.toArray(new Column[0]);
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformConfig.java
new file mode 100644
index 000000000..c645ce700
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformConfig.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.split;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** Option keys of the Split transform plus the values parsed from them by {@link #of}. */
+@Getter
+@Setter
+public class SplitTransformConfig implements Serializable {
+    public static final Option<String> KEY_SEPARATOR =
+            Options.key("separator")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The separator to split the field");
+
+    public static final Option<String> KEY_SPLIT_FIELD =
+            Options.key("split_field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The field to be split");
+
+    public static final Option<List<String>> KEY_OUTPUT_FIELDS =
+            Options.key("output_fields")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription("The result fields after split");
+
+    private String separator;
+    private String splitField;
+    private String[] outputFields;
+    private String[] emptySplits; // of() fills this with one null slot per output field
+    /** Reads the three options from {@code config} and sizes {@code emptySplits} to match. */
+    public static SplitTransformConfig of(ReadonlyConfig config) {
+        SplitTransformConfig parsed = new SplitTransformConfig();
+        parsed.separator = config.get(KEY_SEPARATOR);
+        parsed.splitField = config.get(KEY_SPLIT_FIELD);
+        parsed.outputFields = config.get(KEY_OUTPUT_FIELDS).toArray(new String[0]);
+        parsed.emptySplits = new String[parsed.outputFields.length];
+        return parsed;
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/SplitTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java
similarity index 62%
rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/SplitTransformFactory.java
rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java
index 8c71f50df..91281251e 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/SplitTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java
@@ -15,17 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.split;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableTransform;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
 
 import com.google.auto.service.AutoService;
-
-import static org.apache.seatunnel.transform.SplitTransform.KEY_OUTPUT_FIELDS;
-import static org.apache.seatunnel.transform.SplitTransform.KEY_SEPARATOR;
-import static org.apache.seatunnel.transform.SplitTransform.KEY_SPLIT_FIELD;
+import lombok.NonNull;
 
 @AutoService(Factory.class)
 public class SplitTransformFactory implements TableTransformFactory {
@@ -37,7 +37,17 @@ public class SplitTransformFactory implements TableTransformFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(KEY_SEPARATOR, KEY_SPLIT_FIELD, KEY_OUTPUT_FIELDS)
+                .required( // separator, split field and output fields are all mandatory options
+                        SplitTransformConfig.KEY_SEPARATOR,
+                        SplitTransformConfig.KEY_SPLIT_FIELD,
+                        SplitTransformConfig.KEY_OUTPUT_FIELDS)
                 .build();
     }
+
+    @Override
+    public TableTransform createTransform(@NonNull TableFactoryContext context) {
+        SplitTransformConfig parsedConfig = SplitTransformConfig.of(context.getOptions());
+        CatalogTable inputTable = context.getCatalogTable();
+        return () -> new SplitTransform(parsedConfig, inputTable); // options parsed now, transform built on demand
+    }
 }
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/SplitTransformFactoryTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/SplitTransformFactoryTest.java
index 0648d3067..80264b21d 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/SplitTransformFactoryTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/SplitTransformFactoryTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.transform;
 
+import org.apache.seatunnel.transform.split.SplitTransformFactory;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;