You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/03/28 06:39:52 UTC

[incubator-seatunnel] branch dev updated: [Feature][Transform] Add CatalogTable support for ReplaceTransform (#4411)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 60cbeb08b [Feature][Transform] Add CatalogTable support for ReplaceTransform (#4411)
60cbeb08b is described below

commit 60cbeb08b65944219942c8984a19844947ec7f4a
Author: Eric <ga...@gmail.com>
AuthorDate: Tue Mar 28 14:39:46 2023 +0800

    [Feature][Transform] Add CatalogTable support for ReplaceTransform (#4411)
    
    * Make Transform Support CatalogTable And CatalogTable Evolution
    
    * revert example code
    
    * Update seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
    
    Co-authored-by: hailin0 <wa...@apache.org>
    
    * Fix CI problems
    
    * Fix CI problems
    
    * Make ReplaceTransform Support CatalogTable
    
    * Update seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
    
    Co-authored-by: hailin0 <wa...@apache.org>
    
    * Update seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
    
    Co-authored-by: hailin0 <wa...@apache.org>
    
    * fix review problems
    
    ---------
    
    Co-authored-by: hailin0 <wa...@apache.org>
---
 .../test/resources/iotdb/iotdb_source_to_sink.conf |   1 +
 .../src/test/resources/copy_transform.conf         |   3 -
 .../resources/filter_row_kind_exclude_delete.conf  |   3 -
 .../resources/filter_row_kind_exclude_insert.conf  |   3 -
 .../resources/filter_row_kind_include_insert.conf  |   3 -
 .../src/test/resources/filter_transform.conf       |   3 -
 .../src/test/resources/split_transform.conf        |   3 -
 .../src/test/resources/replace_transform.conf      |   1 +
 .../seatunnel/transform/ReplaceTransform.java      | 137 -------------------
 .../common/MultipleFieldOutputTransform.java       |  24 ++--
 .../common/SingleFieldOutputTransform.java         |  98 +++++++++++++-
 .../transform/replace/ReplaceTransform.java        | 145 +++++++++++++++++++++
 .../transform/replace/ReplaceTransformConfig.java  |  56 ++++++++
 .../{ => replace}/ReplaceTransformFactory.java     |  29 +++--
 .../transform/ReplaceTransformFactoryTest.java     |   2 +
 15 files changed, 336 insertions(+), 175 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf
index 012ece4ea..10e4eef92 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf
@@ -62,6 +62,7 @@ transform {
         pattern = "root.source_group"
         replacement = "root.sink_group"
         is_regex = false
+        replace_first = true
     }
 }
 
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
index 68c9e4e52..25ca4ce5f 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
@@ -54,9 +54,6 @@ transform {
 }
 
 sink {
-  Console {
-    source_table_name = "fake2"
-  }
   Assert {
     source_table_name = "fake2"
     rules =
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf
index 44ea90a52..f7fc0f6e0 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf
@@ -45,9 +45,6 @@ transform {
 }
 
 sink {
-  Console {
-    source_table_name = "fake1"
-  }
   Assert {
     source_table_name = "fake1"
     rules =
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf
index 0fe31c921..cc3641778 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf
@@ -45,9 +45,6 @@ transform {
 }
 
 sink {
-  Console {
-    source_table_name = "fake1"
-  }
   Assert {
     source_table_name = "fake1"
     rules =
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf
index 2ad1fec2b..d1fbf79be 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf
@@ -45,9 +45,6 @@ transform {
 }
 
 sink {
-  Console {
-    source_table_name = "fake1"
-  }
   Assert {
     source_table_name = "fake1"
     rules =
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf
index 889ebe76f..56439b441 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf
@@ -45,9 +45,6 @@ transform {
 }
 
 sink {
-  Console {
-    source_table_name = "fake1"
-  }
   Assert {
     source_table_name = "fake1"
     rules =
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf
index 4627e25f0..61e10f694 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf
@@ -47,9 +47,6 @@ transform {
 }
 
 sink {
-  Console {
-    source_table_name = "fake1"
-  }
   Assert {
     source_table_name = "fake1"
     rules =
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/replace_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/replace_transform.conf
index 4f5942c20..95150fb0d 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/replace_transform.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/replace_transform.conf
@@ -43,6 +43,7 @@ transform {
     pattern = ".+"
     replacement = "b"
     is_regex = true
+    replace_first = true
   }
 }
 
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransform.java
deleted file mode 100644
index b95d2b4fc..000000000
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransform.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.transform;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
-import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(SeaTunnelTransform.class)
-public class ReplaceTransform extends SingleFieldOutputTransform {
-
-    public static final Option<String> KEY_REPLACE_FIELD =
-            Options.key("replace_field")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The field you want to replace");
-
-    public static final Option<String> KEY_PATTERN =
-            Options.key("pattern")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The old string that will be replaced");
-
-    public static final Option<String> KEY_REPLACEMENT =
-            Options.key("replacement")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The new string for replace");
-
-    public static final Option<Boolean> KEY_IS_REGEX =
-            Options.key("is_regex")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription("Use regex for string match");
-
-    public static final Option<Boolean> KEY_REPLACE_FIRST =
-            Options.key("replace_first")
-                    .booleanType()
-                    .noDefaultValue()
-                    .withDescription("Replace the first match string");
-
-    private int inputFieldIndex;
-    private String replaceField;
-    private String pattern;
-    private String replacement;
-    private boolean isRegex;
-    private boolean replaceFirst;
-
-    @Override
-    public String getPluginName() {
-        return "Replace";
-    }
-
-    @Override
-    protected void setConfig(Config pluginConfig) {
-        CheckResult checkResult =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        KEY_REPLACE_FIELD.key(),
-                        KEY_PATTERN.key(),
-                        KEY_REPLACEMENT.key());
-        if (!checkResult.isSuccess()) {
-            throw new IllegalArgumentException("Failed to check config! " + checkResult.getMsg());
-        }
-
-        replaceField = pluginConfig.getString(KEY_REPLACE_FIELD.key());
-        pattern = pluginConfig.getString(KEY_PATTERN.key());
-        replacement = pluginConfig.getString(KEY_REPLACEMENT.key());
-        if (pluginConfig.hasPath(KEY_IS_REGEX.key())) {
-            isRegex = pluginConfig.getBoolean(KEY_IS_REGEX.key());
-        }
-        if (pluginConfig.hasPath(KEY_REPLACE_FIRST.key())) {
-            replaceFirst = pluginConfig.getBoolean(KEY_REPLACE_FIRST.key());
-        }
-    }
-
-    @Override
-    protected void setInputRowType(SeaTunnelRowType rowType) {
-        inputFieldIndex = rowType.indexOf(replaceField);
-        if (inputFieldIndex == -1) {
-            throw new IllegalArgumentException(
-                    "Cannot find [" + replaceField + "] field in input row type");
-        }
-    }
-
-    @Override
-    protected String getOutputFieldName() {
-        return replaceField;
-    }
-
-    @Override
-    protected SeaTunnelDataType getOutputFieldDataType() {
-        return BasicType.STRING_TYPE;
-    }
-
-    @Override
-    protected Object getOutputFieldValue(SeaTunnelRowAccessor inputRow) {
-        Object inputFieldValue = inputRow.getField(inputFieldIndex);
-        if (inputFieldValue == null) {
-            return null;
-        }
-
-        if (isRegex) {
-            if (replaceFirst) {
-                return inputFieldValue.toString().replaceFirst(pattern, replacement);
-            }
-            return inputFieldValue.toString().replaceAll(pattern, replacement);
-        }
-        return inputFieldValue.toString().replace(pattern, replacement);
-    }
-}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
index 2609ced1b..1287fff83 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
@@ -19,12 +19,14 @@ package org.apache.seatunnel.transform.common;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
+import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
@@ -36,6 +38,7 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 @Slf4j
+@NoArgsConstructor
 public abstract class MultipleFieldOutputTransform extends AbstractCatalogSupportTransform {
 
     private static final String[] TYPE_ARRAY_STRING = new String[0];
@@ -46,10 +49,6 @@ public abstract class MultipleFieldOutputTransform extends AbstractCatalogSuppor
     private int[] fieldsIndex;
     private SeaTunnelRowContainerGenerator rowContainerGenerator;
 
-    public MultipleFieldOutputTransform() {
-        super();
-    }
-
     public MultipleFieldOutputTransform(@NonNull CatalogTable inputCatalogTable) {
         super(inputCatalogTable);
     }
@@ -181,10 +180,17 @@ public abstract class MultipleFieldOutputTransform extends AbstractCatalogSuppor
                         .map(Column::getName)
                         .collect(Collectors.toList())
                         .toArray(TYPE_ARRAY_STRING);
-        TableSchema.Builder builder =
-                TableSchema.builder()
-                        .primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey())
-                        .constraintKey(inputCatalogTable.getTableSchema().getConstraintKeys());
+
+        List<ConstraintKey> copiedConstraintKeys =
+                inputCatalogTable.getTableSchema().getConstraintKeys().stream()
+                        .map(ConstraintKey::copy)
+                        .collect(Collectors.toList());
+
+        TableSchema.Builder builder = TableSchema.builder();
+        if (inputCatalogTable.getTableSchema().getPrimaryKey() != null) {
+            builder = builder.primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey().copy());
+        }
+        builder = builder.constraintKey(copiedConstraintKeys);
         List<Column> columns =
                 inputCatalogTable.getTableSchema().getColumns().stream()
                         .map(Column::copy)
@@ -252,7 +258,7 @@ public abstract class MultipleFieldOutputTransform extends AbstractCatalogSuppor
 
     @Override
     protected TableIdentifier transformTableIdentifier() {
-        return inputCatalogTable.getTableId();
+        return inputCatalogTable.getTableId().copy();
     }
 
     protected abstract Column[] getOutputColumns();
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
index dfc217546..76554e176 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
@@ -17,17 +17,28 @@
 
 package org.apache.seatunnel.transform.common;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.Arrays;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 @Slf4j
-public abstract class SingleFieldOutputTransform extends AbstractSeaTunnelTransform {
+@NoArgsConstructor
+public abstract class SingleFieldOutputTransform extends AbstractCatalogSupportTransform {
 
     private static final String[] TYPE_ARRAY_STRING = new String[0];
     private static final SeaTunnelDataType[] TYPE_ARRAY_SEATUNNEL_DATA_TYPE =
@@ -36,6 +47,10 @@ public abstract class SingleFieldOutputTransform extends AbstractSeaTunnelTransf
     private int fieldIndex;
     private SeaTunnelRowContainerGenerator rowContainerGenerator;
 
+    public SingleFieldOutputTransform(@NonNull CatalogTable inputCatalogTable) {
+        super(inputCatalogTable);
+    }
+
     @Override
     protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
         setInputRowType(inputRowType);
@@ -131,4 +146,85 @@ public abstract class SingleFieldOutputTransform extends AbstractSeaTunnelTransf
      * @return
      */
     protected abstract Object getOutputFieldValue(SeaTunnelRowAccessor inputRow);
+
+    @Override
+    protected TableSchema transformTableSchema() {
+        Column outputColumn = getOutputColumn();
+        List<ConstraintKey> copiedConstraintKeys =
+                inputCatalogTable.getTableSchema().getConstraintKeys().stream()
+                        .map(ConstraintKey::copy)
+                        .collect(Collectors.toList());
+
+        TableSchema.Builder builder = TableSchema.builder();
+        if (inputCatalogTable.getTableSchema().getPrimaryKey() != null) {
+            builder = builder.primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey().copy());
+        }
+        builder = builder.constraintKey(copiedConstraintKeys);
+        List<Column> columns =
+                inputCatalogTable.getTableSchema().getColumns().stream()
+                        .map(Column::copy)
+                        .collect(Collectors.toList());
+
+        int addFieldCount = 0;
+        Optional<Column> optional =
+                columns.stream()
+                        .filter(c -> c.getName().equals(outputColumn.getName()))
+                        .findFirst();
+        if (optional.isPresent()) {
+            Column originalColumn = optional.get();
+            int originalColumnIndex = columns.indexOf(originalColumn);
+            if (!originalColumn.getDataType().equals(outputColumn.getDataType())) {
+                columns.set(originalColumnIndex, originalColumn.copy(outputColumn.getDataType()));
+            }
+            this.fieldIndex = originalColumnIndex;
+        } else {
+            addFieldCount++;
+            columns.add(outputColumn);
+            this.fieldIndex = columns.indexOf(outputColumn);
+        }
+
+        TableSchema outputTableSchema = builder.columns(columns).build();
+        if (addFieldCount > 0) {
+            this.fieldIndex = outputTableSchema.getColumns().size() - 1;
+            int inputFieldLength =
+                    inputCatalogTable.getTableSchema().toPhysicalRowDataType().getTotalFields();
+            int outputFieldLength = outputTableSchema.getColumns().size();
+
+            rowContainerGenerator =
+                    new SeaTunnelRowContainerGenerator() {
+                        @Override
+                        public SeaTunnelRow apply(SeaTunnelRow inputRow) {
+                            // todo reuse array container
+                            Object[] outputFieldValues = new Object[outputFieldLength];
+                            System.arraycopy(
+                                    inputRow.getFields(),
+                                    0,
+                                    outputFieldValues,
+                                    0,
+                                    inputFieldLength);
+
+                            SeaTunnelRow outputRow = new SeaTunnelRow(outputFieldValues);
+                            outputRow.setTableId(inputRow.getTableId());
+                            outputRow.setRowKind(inputRow.getRowKind());
+                            return outputRow;
+                        }
+                    };
+        } else {
+            rowContainerGenerator = SeaTunnelRowContainerGenerator.REUSE_ROW;
+        }
+
+        log.info(
+                "Changed input table schema: {} to output table schema: {}",
+                inputCatalogTable.getTableSchema(),
+                outputTableSchema);
+
+        return outputTableSchema;
+    }
+
+    @Override
+    protected TableIdentifier transformTableIdentifier() {
+        return inputCatalogTable.getTableId().copy();
+    }
+
+    protected abstract Column getOutputColumn();
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
new file mode 100644
index 000000000..891b1bb20
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.replace;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+@AutoService(SeaTunnelTransform.class)
+@NoArgsConstructor
+public class ReplaceTransform extends SingleFieldOutputTransform {
+    private ReadonlyConfig config;
+    private int inputFieldIndex;
+
+    public ReplaceTransform(
+            @NonNull ReadonlyConfig config, @NonNull CatalogTable inputCatalogTable) {
+        super(inputCatalogTable);
+        this.config = config;
+        initOutputFields(
+                inputCatalogTable.getTableSchema().toPhysicalRowDataType(),
+                this.config.get(ReplaceTransformConfig.KEY_REPLACE_FIELD));
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Replace";
+    }
+
+    @Override
+    protected void setConfig(Config pluginConfig) {
+        ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
+                .validate(new ReplaceTransformFactory().optionRule());
+        this.config = ReadonlyConfig.fromConfig(pluginConfig);
+    }
+
+    @Override
+    protected void setInputRowType(SeaTunnelRowType rowType) {
+        initOutputFields(rowType, config.get(ReplaceTransformConfig.KEY_REPLACE_FIELD));
+    }
+
+    private void initOutputFields(SeaTunnelRowType inputRowType, String replaceField) {
+        inputFieldIndex = inputRowType.indexOf(replaceField);
+        if (inputFieldIndex == -1) {
+            throw new IllegalArgumentException(
+                    "Cannot find [" + replaceField + "] field in input row type");
+        }
+    }
+
+    @Override
+    protected String getOutputFieldName() {
+        return config.get(ReplaceTransformConfig.KEY_REPLACE_FIELD);
+    }
+
+    @Override
+    protected SeaTunnelDataType getOutputFieldDataType() {
+        return BasicType.STRING_TYPE;
+    }
+
+    @Override
+    protected Object getOutputFieldValue(SeaTunnelRowAccessor inputRow) {
+        Object inputFieldValue = inputRow.getField(inputFieldIndex);
+        if (inputFieldValue == null) {
+            return null;
+        }
+
+        boolean isRegex =
+                config.get(ReplaceTransformConfig.KEY_IS_REGEX) == null
+                        ? false
+                        : config.get(ReplaceTransformConfig.KEY_IS_REGEX);
+        if (isRegex) {
+            if (config.get(ReplaceTransformConfig.KEY_REPLACE_FIRST)) {
+                return inputFieldValue
+                        .toString()
+                        .replaceFirst(
+                                config.get(ReplaceTransformConfig.KEY_PATTERN),
+                                config.get(ReplaceTransformConfig.KEY_REPLACEMENT));
+            }
+            return inputFieldValue
+                    .toString()
+                    .replaceAll(
+                            config.get(ReplaceTransformConfig.KEY_PATTERN),
+                            config.get(ReplaceTransformConfig.KEY_REPLACEMENT));
+        }
+        return inputFieldValue
+                .toString()
+                .replace(
+                        config.get(ReplaceTransformConfig.KEY_PATTERN),
+                        config.get(ReplaceTransformConfig.KEY_REPLACEMENT));
+    }
+
+    @Override
+    protected Column getOutputColumn() {
+        List<Column> columns = inputCatalogTable.getTableSchema().getColumns();
+        List<Column> collect =
+                columns.stream()
+                        .filter(
+                                column ->
+                                        column.getName()
+                                                .equals(
+                                                        config.get(
+                                                                ReplaceTransformConfig
+                                                                        .KEY_REPLACE_FIELD)))
+                        .collect(Collectors.toList());
+        if (CollectionUtils.isEmpty(collect)) {
+            throw new IllegalArgumentException(
+                    "Cannot find ["
+                            + config.get(ReplaceTransformConfig.KEY_REPLACE_FIELD)
+                            + "] field in input catalog table");
+        }
+        return collect.get(0).copy();
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformConfig.java
new file mode 100644
index 000000000..97630080e
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformConfig.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.replace;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.io.Serializable;
+
+public class ReplaceTransformConfig implements Serializable {
+
+    public static final Option<String> KEY_REPLACE_FIELD =
+            Options.key("replace_field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The field you want to replace");
+
+    public static final Option<String> KEY_PATTERN =
+            Options.key("pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The old string that will be replaced");
+
+    public static final Option<String> KEY_REPLACEMENT =
+            Options.key("replacement")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The new string for replace");
+
+    public static final Option<Boolean> KEY_IS_REGEX =
+            Options.key("is_regex")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Use regex for string match");
+
+    public static final Option<Boolean> KEY_REPLACE_FIRST =
+            Options.key("replace_first")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription("Replace the first match string");
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java
similarity index 58%
rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransformFactory.java
rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java
index fa765156c..25696ba6e 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java
@@ -15,20 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.replace;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableTransform;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
 
 import com.google.auto.service.AutoService;
 
-import static org.apache.seatunnel.transform.ReplaceTransform.KEY_IS_REGEX;
-import static org.apache.seatunnel.transform.ReplaceTransform.KEY_PATTERN;
-import static org.apache.seatunnel.transform.ReplaceTransform.KEY_REPLACEMENT;
-import static org.apache.seatunnel.transform.ReplaceTransform.KEY_REPLACE_FIELD;
-import static org.apache.seatunnel.transform.ReplaceTransform.KEY_REPLACE_FIRST;
-
 @AutoService(Factory.class)
 public class ReplaceTransformFactory implements TableTransformFactory {
     @Override
@@ -39,9 +36,21 @@ public class ReplaceTransformFactory implements TableTransformFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(KEY_REPLACE_FIELD, KEY_PATTERN, KEY_REPLACEMENT)
-                .optional(KEY_IS_REGEX)
-                .conditional(KEY_IS_REGEX, true, KEY_REPLACE_FIRST)
+                .required(
+                        ReplaceTransformConfig.KEY_REPLACE_FIELD,
+                        ReplaceTransformConfig.KEY_PATTERN,
+                        ReplaceTransformConfig.KEY_REPLACEMENT)
+                .optional(ReplaceTransformConfig.KEY_IS_REGEX)
+                .conditional(
+                        ReplaceTransformConfig.KEY_IS_REGEX,
+                        true,
+                        ReplaceTransformConfig.KEY_REPLACE_FIRST)
                 .build();
     }
+
+    @Override
+    public TableTransform createTransform(TableFactoryContext context) {
+        CatalogTable catalogTable = context.getCatalogTable();
+        return () -> new ReplaceTransform(context.getOptions(), catalogTable);
+    }
 }
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/ReplaceTransformFactoryTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/ReplaceTransformFactoryTest.java
index 5bc9267c4..6bacfec8f 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/ReplaceTransformFactoryTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/ReplaceTransformFactoryTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.transform;
 
+import org.apache.seatunnel.transform.replace.ReplaceTransformFactory;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;