You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/03/28 06:39:52 UTC
[incubator-seatunnel] branch dev updated: [Feature][Transform] Add CatalogTable support for ReplaceTransform (#4411)
This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 60cbeb08b [Feature][Transform] Add CatalogTable support for ReplaceTransform (#4411)
60cbeb08b is described below
commit 60cbeb08b65944219942c8984a19844947ec7f4a
Author: Eric <ga...@gmail.com>
AuthorDate: Tue Mar 28 14:39:46 2023 +0800
[Feature][Transform] Add CatalogTable support for ReplaceTransform (#4411)
* Make Transform Support CatalogTable And CatalogTable Evolution
* revert example code
* Update seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
Co-authored-by: hailin0 <wa...@apache.org>
* Fix CI problems
* Fix CI problems
* Make ReplaceTransform Support CatalogTable
* Update seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
Co-authored-by: hailin0 <wa...@apache.org>
* Update seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
Co-authored-by: hailin0 <wa...@apache.org>
* fix review problems
---------
Co-authored-by: hailin0 <wa...@apache.org>
---
.../test/resources/iotdb/iotdb_source_to_sink.conf | 1 +
.../src/test/resources/copy_transform.conf | 3 -
.../resources/filter_row_kind_exclude_delete.conf | 3 -
.../resources/filter_row_kind_exclude_insert.conf | 3 -
.../resources/filter_row_kind_include_insert.conf | 3 -
.../src/test/resources/filter_transform.conf | 3 -
.../src/test/resources/split_transform.conf | 3 -
.../src/test/resources/replace_transform.conf | 1 +
.../seatunnel/transform/ReplaceTransform.java | 137 -------------------
.../common/MultipleFieldOutputTransform.java | 24 ++--
.../common/SingleFieldOutputTransform.java | 98 +++++++++++++-
.../transform/replace/ReplaceTransform.java | 145 +++++++++++++++++++++
.../transform/replace/ReplaceTransformConfig.java | 56 ++++++++
.../{ => replace}/ReplaceTransformFactory.java | 29 +++--
.../transform/ReplaceTransformFactoryTest.java | 2 +
15 files changed, 336 insertions(+), 175 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf
index 012ece4ea..10e4eef92 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf
@@ -62,6 +62,7 @@ transform {
pattern = "root.source_group"
replacement = "root.sink_group"
is_regex = false
+ replace_first = true
}
}
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
index 68c9e4e52..25ca4ce5f 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
@@ -54,9 +54,6 @@ transform {
}
sink {
- Console {
- source_table_name = "fake2"
- }
Assert {
source_table_name = "fake2"
rules =
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf
index 44ea90a52..f7fc0f6e0 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf
@@ -45,9 +45,6 @@ transform {
}
sink {
- Console {
- source_table_name = "fake1"
- }
Assert {
source_table_name = "fake1"
rules =
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf
index 0fe31c921..cc3641778 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf
@@ -45,9 +45,6 @@ transform {
}
sink {
- Console {
- source_table_name = "fake1"
- }
Assert {
source_table_name = "fake1"
rules =
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf
index 2ad1fec2b..d1fbf79be 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf
@@ -45,9 +45,6 @@ transform {
}
sink {
- Console {
- source_table_name = "fake1"
- }
Assert {
source_table_name = "fake1"
rules =
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf
index 889ebe76f..56439b441 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf
@@ -45,9 +45,6 @@ transform {
}
sink {
- Console {
- source_table_name = "fake1"
- }
Assert {
source_table_name = "fake1"
rules =
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf
index 4627e25f0..61e10f694 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf
@@ -47,9 +47,6 @@ transform {
}
sink {
- Console {
- source_table_name = "fake1"
- }
Assert {
source_table_name = "fake1"
rules =
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/replace_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/replace_transform.conf
index 4f5942c20..95150fb0d 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/replace_transform.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/replace_transform.conf
@@ -43,6 +43,7 @@ transform {
pattern = ".+"
replacement = "b"
is_regex = true
+ replace_first = true
}
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransform.java
deleted file mode 100644
index b95d2b4fc..000000000
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransform.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.transform;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
-import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(SeaTunnelTransform.class)
-public class ReplaceTransform extends SingleFieldOutputTransform {
-
- public static final Option<String> KEY_REPLACE_FIELD =
- Options.key("replace_field")
- .stringType()
- .noDefaultValue()
- .withDescription("The field you want to replace");
-
- public static final Option<String> KEY_PATTERN =
- Options.key("pattern")
- .stringType()
- .noDefaultValue()
- .withDescription("The old string that will be replaced");
-
- public static final Option<String> KEY_REPLACEMENT =
- Options.key("replacement")
- .stringType()
- .noDefaultValue()
- .withDescription("The new string for replace");
-
- public static final Option<Boolean> KEY_IS_REGEX =
- Options.key("is_regex")
- .booleanType()
- .defaultValue(false)
- .withDescription("Use regex for string match");
-
- public static final Option<Boolean> KEY_REPLACE_FIRST =
- Options.key("replace_first")
- .booleanType()
- .noDefaultValue()
- .withDescription("Replace the first match string");
-
- private int inputFieldIndex;
- private String replaceField;
- private String pattern;
- private String replacement;
- private boolean isRegex;
- private boolean replaceFirst;
-
- @Override
- public String getPluginName() {
- return "Replace";
- }
-
- @Override
- protected void setConfig(Config pluginConfig) {
- CheckResult checkResult =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- KEY_REPLACE_FIELD.key(),
- KEY_PATTERN.key(),
- KEY_REPLACEMENT.key());
- if (!checkResult.isSuccess()) {
- throw new IllegalArgumentException("Failed to check config! " + checkResult.getMsg());
- }
-
- replaceField = pluginConfig.getString(KEY_REPLACE_FIELD.key());
- pattern = pluginConfig.getString(KEY_PATTERN.key());
- replacement = pluginConfig.getString(KEY_REPLACEMENT.key());
- if (pluginConfig.hasPath(KEY_IS_REGEX.key())) {
- isRegex = pluginConfig.getBoolean(KEY_IS_REGEX.key());
- }
- if (pluginConfig.hasPath(KEY_REPLACE_FIRST.key())) {
- replaceFirst = pluginConfig.getBoolean(KEY_REPLACE_FIRST.key());
- }
- }
-
- @Override
- protected void setInputRowType(SeaTunnelRowType rowType) {
- inputFieldIndex = rowType.indexOf(replaceField);
- if (inputFieldIndex == -1) {
- throw new IllegalArgumentException(
- "Cannot find [" + replaceField + "] field in input row type");
- }
- }
-
- @Override
- protected String getOutputFieldName() {
- return replaceField;
- }
-
- @Override
- protected SeaTunnelDataType getOutputFieldDataType() {
- return BasicType.STRING_TYPE;
- }
-
- @Override
- protected Object getOutputFieldValue(SeaTunnelRowAccessor inputRow) {
- Object inputFieldValue = inputRow.getField(inputFieldIndex);
- if (inputFieldValue == null) {
- return null;
- }
-
- if (isRegex) {
- if (replaceFirst) {
- return inputFieldValue.toString().replaceFirst(pattern, replacement);
- }
- return inputFieldValue.toString().replaceAll(pattern, replacement);
- }
- return inputFieldValue.toString().replace(pattern, replacement);
- }
-}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
index 2609ced1b..1287fff83 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
@@ -19,12 +19,14 @@ package org.apache.seatunnel.transform.common;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -36,6 +38,7 @@ import java.util.Optional;
import java.util.stream.Collectors;
@Slf4j
+@NoArgsConstructor
public abstract class MultipleFieldOutputTransform extends AbstractCatalogSupportTransform {
private static final String[] TYPE_ARRAY_STRING = new String[0];
@@ -46,10 +49,6 @@ public abstract class MultipleFieldOutputTransform extends AbstractCatalogSuppor
private int[] fieldsIndex;
private SeaTunnelRowContainerGenerator rowContainerGenerator;
- public MultipleFieldOutputTransform() {
- super();
- }
-
public MultipleFieldOutputTransform(@NonNull CatalogTable inputCatalogTable) {
super(inputCatalogTable);
}
@@ -181,10 +180,17 @@ public abstract class MultipleFieldOutputTransform extends AbstractCatalogSuppor
.map(Column::getName)
.collect(Collectors.toList())
.toArray(TYPE_ARRAY_STRING);
- TableSchema.Builder builder =
- TableSchema.builder()
- .primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey())
- .constraintKey(inputCatalogTable.getTableSchema().getConstraintKeys());
+
+ List<ConstraintKey> copiedConstraintKeys =
+ inputCatalogTable.getTableSchema().getConstraintKeys().stream()
+ .map(ConstraintKey::copy)
+ .collect(Collectors.toList());
+
+ TableSchema.Builder builder = TableSchema.builder();
+ if (inputCatalogTable.getTableSchema().getPrimaryKey() != null) {
+ builder = builder.primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey().copy());
+ }
+ builder = builder.constraintKey(copiedConstraintKeys);
List<Column> columns =
inputCatalogTable.getTableSchema().getColumns().stream()
.map(Column::copy)
@@ -252,7 +258,7 @@ public abstract class MultipleFieldOutputTransform extends AbstractCatalogSuppor
@Override
protected TableIdentifier transformTableIdentifier() {
- return inputCatalogTable.getTableId();
+ return inputCatalogTable.getTableId().copy();
}
protected abstract Column[] getOutputColumns();
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
index dfc217546..76554e176 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
@@ -17,17 +17,28 @@
package org.apache.seatunnel.transform.common;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
@Slf4j
-public abstract class SingleFieldOutputTransform extends AbstractSeaTunnelTransform {
+@NoArgsConstructor
+public abstract class SingleFieldOutputTransform extends AbstractCatalogSupportTransform {
private static final String[] TYPE_ARRAY_STRING = new String[0];
private static final SeaTunnelDataType[] TYPE_ARRAY_SEATUNNEL_DATA_TYPE =
@@ -36,6 +47,10 @@ public abstract class SingleFieldOutputTransform extends AbstractSeaTunnelTransf
private int fieldIndex;
private SeaTunnelRowContainerGenerator rowContainerGenerator;
+ public SingleFieldOutputTransform(@NonNull CatalogTable inputCatalogTable) {
+ super(inputCatalogTable);
+ }
+
@Override
protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
setInputRowType(inputRowType);
@@ -131,4 +146,85 @@ public abstract class SingleFieldOutputTransform extends AbstractSeaTunnelTransf
* @return
*/
protected abstract Object getOutputFieldValue(SeaTunnelRowAccessor inputRow);
+
+    /**
+     * Builds the output table schema from the input catalog table.
+     *
+     * <p>Constraint keys, the primary key (when present) and all columns are copied from the
+     * input schema. When a column with the same name as {@link #getOutputColumn()} already
+     * exists, its data type is replaced if it differs from the output column's data type and
+     * {@code SeaTunnelRowContainerGenerator.REUSE_ROW} is used. Otherwise the output column is
+     * appended as the last column and a generator is installed that copies every input row into
+     * a wider field array.
+     *
+     * @return the schema of the transformed table
+     */
+    @Override
+    protected TableSchema transformTableSchema() {
+        Column targetColumn = getOutputColumn();
+        TableSchema inputSchema = inputCatalogTable.getTableSchema();
+
+        List<ConstraintKey> constraintKeyCopies =
+                inputSchema.getConstraintKeys().stream()
+                        .map(ConstraintKey::copy)
+                        .collect(Collectors.toList());
+
+        TableSchema.Builder schemaBuilder = TableSchema.builder();
+        if (inputSchema.getPrimaryKey() != null) {
+            schemaBuilder = schemaBuilder.primaryKey(inputSchema.getPrimaryKey().copy());
+        }
+        schemaBuilder = schemaBuilder.constraintKey(constraintKeyCopies);
+
+        List<Column> outputColumns =
+                inputSchema.getColumns().stream()
+                        .map(Column::copy)
+                        .collect(Collectors.toList());
+
+        // Look for an existing column that the output column replaces in place.
+        Optional<Column> sameNameColumn =
+                outputColumns.stream()
+                        .filter(column -> column.getName().equals(targetColumn.getName()))
+                        .findFirst();
+        boolean columnAppended = !sameNameColumn.isPresent();
+        if (columnAppended) {
+            outputColumns.add(targetColumn);
+            this.fieldIndex = outputColumns.indexOf(targetColumn);
+        } else {
+            Column existingColumn = sameNameColumn.get();
+            int existingIndex = outputColumns.indexOf(existingColumn);
+            if (!existingColumn.getDataType().equals(targetColumn.getDataType())) {
+                outputColumns.set(
+                        existingIndex, existingColumn.copy(targetColumn.getDataType()));
+            }
+            this.fieldIndex = existingIndex;
+        }
+
+        TableSchema outputTableSchema = schemaBuilder.columns(outputColumns).build();
+        if (!columnAppended) {
+            rowContainerGenerator = SeaTunnelRowContainerGenerator.REUSE_ROW;
+        } else {
+            // The appended column always occupies the last slot of the output row.
+            this.fieldIndex = outputTableSchema.getColumns().size() - 1;
+            int inputFieldLength = inputSchema.toPhysicalRowDataType().getTotalFields();
+            int outputFieldLength = outputTableSchema.getColumns().size();
+
+            rowContainerGenerator =
+                    new SeaTunnelRowContainerGenerator() {
+                        @Override
+                        public SeaTunnelRow apply(SeaTunnelRow inputRow) {
+                            // todo reuse array container
+                            Object[] widenedFields = new Object[outputFieldLength];
+                            System.arraycopy(
+                                    inputRow.getFields(), 0, widenedFields, 0, inputFieldLength);
+
+                            SeaTunnelRow widenedRow = new SeaTunnelRow(widenedFields);
+                            widenedRow.setTableId(inputRow.getTableId());
+                            widenedRow.setRowKind(inputRow.getRowKind());
+                            return widenedRow;
+                        }
+                    };
+        }
+
+        log.info(
+                "Changed input table schema: {} to output table schema: {}",
+                inputSchema,
+                outputTableSchema);
+
+        return outputTableSchema;
+    }
+
+    /**
+     * Returns the identifier of the output table, which is a copy of the input table's
+     * identifier.
+     */
+    @Override
+    protected TableIdentifier transformTableIdentifier() {
+        TableIdentifier inputTableId = inputCatalogTable.getTableId();
+        return inputTableId.copy();
+    }
+
+ protected abstract Column getOutputColumn();
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
new file mode 100644
index 000000000..891b1bb20
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.replace;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+@AutoService(SeaTunnelTransform.class)
+@NoArgsConstructor
+public class ReplaceTransform extends SingleFieldOutputTransform {
+ private ReadonlyConfig config;
+ private int inputFieldIndex;
+
+ public ReplaceTransform(
+ @NonNull ReadonlyConfig config, @NonNull CatalogTable inputCatalogTable) {
+ super(inputCatalogTable);
+ this.config = config;
+ initOutputFields(
+ inputCatalogTable.getTableSchema().toPhysicalRowDataType(),
+ this.config.get(ReplaceTransformConfig.KEY_REPLACE_FIELD));
+ }
+
+ @Override
+ public String getPluginName() {
+ return "Replace";
+ }
+
+ @Override
+ protected void setConfig(Config pluginConfig) {
+ ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
+ .validate(new ReplaceTransformFactory().optionRule());
+ this.config = ReadonlyConfig.fromConfig(pluginConfig);
+ }
+
+ @Override
+ protected void setInputRowType(SeaTunnelRowType rowType) {
+ initOutputFields(rowType, config.get(ReplaceTransformConfig.KEY_REPLACE_FIELD));
+ }
+
+ private void initOutputFields(SeaTunnelRowType inputRowType, String replaceField) {
+ inputFieldIndex = inputRowType.indexOf(replaceField);
+ if (inputFieldIndex == -1) {
+ throw new IllegalArgumentException(
+ "Cannot find [" + replaceField + "] field in input row type");
+ }
+ }
+
+ @Override
+ protected String getOutputFieldName() {
+ return config.get(ReplaceTransformConfig.KEY_REPLACE_FIELD);
+ }
+
+ @Override
+ protected SeaTunnelDataType getOutputFieldDataType() {
+ return BasicType.STRING_TYPE;
+ }
+
+ @Override
+ protected Object getOutputFieldValue(SeaTunnelRowAccessor inputRow) {
+ Object inputFieldValue = inputRow.getField(inputFieldIndex);
+ if (inputFieldValue == null) {
+ return null;
+ }
+
+ boolean isRegex =
+ config.get(ReplaceTransformConfig.KEY_IS_REGEX) == null
+ ? false
+ : config.get(ReplaceTransformConfig.KEY_IS_REGEX);
+ if (isRegex) {
+ if (config.get(ReplaceTransformConfig.KEY_REPLACE_FIRST)) {
+ return inputFieldValue
+ .toString()
+ .replaceFirst(
+ config.get(ReplaceTransformConfig.KEY_PATTERN),
+ config.get(ReplaceTransformConfig.KEY_REPLACEMENT));
+ }
+ return inputFieldValue
+ .toString()
+ .replaceAll(
+ config.get(ReplaceTransformConfig.KEY_PATTERN),
+ config.get(ReplaceTransformConfig.KEY_REPLACEMENT));
+ }
+ return inputFieldValue
+ .toString()
+ .replace(
+ config.get(ReplaceTransformConfig.KEY_PATTERN),
+ config.get(ReplaceTransformConfig.KEY_REPLACEMENT));
+ }
+
+ @Override
+ protected Column getOutputColumn() {
+ List<Column> columns = inputCatalogTable.getTableSchema().getColumns();
+ List<Column> collect =
+ columns.stream()
+ .filter(
+ column ->
+ column.getName()
+ .equals(
+ config.get(
+ ReplaceTransformConfig
+ .KEY_REPLACE_FIELD)))
+ .collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(collect)) {
+ throw new IllegalArgumentException(
+ "Cannot find ["
+ + config.get(ReplaceTransformConfig.KEY_REPLACE_FIELD)
+ + "] field in input catalog table");
+ }
+ return collect.get(0).copy();
+ }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformConfig.java
new file mode 100644
index 000000000..97630080e
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformConfig.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.replace;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.io.Serializable;
+
+/** Option definitions used by the {@code Replace} transform and its factory. */
+public class ReplaceTransformConfig implements Serializable {
+
+    /** {@code replace_field}: string option without a default value. */
+    public static final Option<String> KEY_REPLACE_FIELD =
+            Options.key("replace_field").stringType().noDefaultValue()
+                    .withDescription("The field you want to replace");
+
+    /** {@code pattern}: string option without a default value. */
+    public static final Option<String> KEY_PATTERN =
+            Options.key("pattern").stringType().noDefaultValue()
+                    .withDescription("The old string that will be replaced");
+
+    /** {@code replacement}: string option without a default value. */
+    public static final Option<String> KEY_REPLACEMENT =
+            Options.key("replacement").stringType().noDefaultValue()
+                    .withDescription("The new string for replace");
+
+    /** {@code is_regex}: boolean option that defaults to {@code false}. */
+    public static final Option<Boolean> KEY_IS_REGEX =
+            Options.key("is_regex").booleanType().defaultValue(false)
+                    .withDescription("Use regex for string match");
+
+    /** {@code replace_first}: boolean option without a default value. */
+    public static final Option<Boolean> KEY_REPLACE_FIRST =
+            Options.key("replace_first").booleanType().noDefaultValue()
+                    .withDescription("Replace the first match string");
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java
similarity index 58%
rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransformFactory.java
rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java
index fa765156c..25696ba6e 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java
@@ -15,20 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.replace;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import com.google.auto.service.AutoService;
-import static org.apache.seatunnel.transform.ReplaceTransform.KEY_IS_REGEX;
-import static org.apache.seatunnel.transform.ReplaceTransform.KEY_PATTERN;
-import static org.apache.seatunnel.transform.ReplaceTransform.KEY_REPLACEMENT;
-import static org.apache.seatunnel.transform.ReplaceTransform.KEY_REPLACE_FIELD;
-import static org.apache.seatunnel.transform.ReplaceTransform.KEY_REPLACE_FIRST;
-
@AutoService(Factory.class)
public class ReplaceTransformFactory implements TableTransformFactory {
@Override
@@ -39,9 +36,21 @@ public class ReplaceTransformFactory implements TableTransformFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(KEY_REPLACE_FIELD, KEY_PATTERN, KEY_REPLACEMENT)
- .optional(KEY_IS_REGEX)
- .conditional(KEY_IS_REGEX, true, KEY_REPLACE_FIRST)
+ .required(
+ ReplaceTransformConfig.KEY_REPLACE_FIELD,
+ ReplaceTransformConfig.KEY_PATTERN,
+ ReplaceTransformConfig.KEY_REPLACEMENT)
+ .optional(ReplaceTransformConfig.KEY_IS_REGEX)
+ .conditional(
+ ReplaceTransformConfig.KEY_IS_REGEX,
+ true,
+ ReplaceTransformConfig.KEY_REPLACE_FIRST)
.build();
}
+
+    /**
+     * Creates the table transform for the catalog table carried by the factory context.
+     *
+     * <p>The catalog table is read from the context immediately; the options are read from the
+     * context when the returned {@link TableTransform} is invoked.
+     *
+     * @param context factory context providing the catalog table and the options
+     * @return a {@link TableTransform} that builds a new {@link ReplaceTransform}
+     */
+    @Override
+    public TableTransform createTransform(TableFactoryContext context) {
+        CatalogTable inputTable = context.getCatalogTable();
+        return () -> new ReplaceTransform(context.getOptions(), inputTable);
+    }
}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/ReplaceTransformFactoryTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/ReplaceTransformFactoryTest.java
index 5bc9267c4..6bacfec8f 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/ReplaceTransformFactoryTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/ReplaceTransformFactoryTest.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.transform;
+import org.apache.seatunnel.transform.replace.ReplaceTransformFactory;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;