You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/04/01 09:37:45 UTC

[incubator-seatunnel] branch dev updated: [Feature][Transform-V2] Add support CatalogTable for FilterFieldTransform (#4422)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 211db4489 [Feature][Transform-V2] Add support CatalogTable for FilterFieldTransform (#4422)
211db4489 is described below

commit 211db4489deeedc6b6eac317565570cd999ee338
Author: Eric <ga...@gmail.com>
AuthorDate: Sat Apr 1 17:37:39 2023 +0800

    [Feature][Transform-V2] Add support CatalogTable for FilterFieldTransform (#4422)
---
 .../connector-v2/Error-Quick-Reference-Manual.md   |   6 +
 .../seatunnel/transform/FilterFieldTransform.java  |  92 ------------
 .../FilterFieldTransformErrorCode.java}            |  27 ++--
 .../transform/filter/FilterFieldTransform.java     | 163 +++++++++++++++++++++
 .../filter/FilterFieldTransformConfig.java}        |  26 ++--
 .../{ => filter}/FilterFieldTransformFactory.java  |  17 ++-
 .../transform/FilterFieldTransformFactoryTest.java |   2 +
 7 files changed, 216 insertions(+), 117 deletions(-)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index c5fec98a1..29bb1adbf 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -246,3 +246,9 @@ problems encountered by users.
 |---------------|---------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | S3RedShift-01 | Aggregate committer error | S3Redshift Sink Connector will write data to s3 and then move file to the target s3 path. And then use `Copy` action copy the data to Redshift. Please check the error log and find out the specific reason. |
 
+## FilterFieldTransform Error Codes
+
+|           code            |      description       |        solution         |
+|---------------------------|------------------------|-------------------------|
+| FILTER_FIELD_TRANSFORM-01 | filter field not found | filter field not found. |
+
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterFieldTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterFieldTransform.java
deleted file mode 100644
index 68bd60a37..000000000
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterFieldTransform.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.transform;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.transform.common.AbstractSeaTunnelTransform;
-
-import com.google.auto.service.AutoService;
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.List;
-
-@Slf4j
-@AutoService(SeaTunnelTransform.class)
-public class FilterFieldTransform extends AbstractSeaTunnelTransform {
-    public static final Option<List<String>> KEY_FIELDS =
-            Options.key("fields")
-                    .listType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "The list of fields that need to be kept. Fields not in the list will be deleted");
-
-    private String[] fields;
-    private int[] inputValueIndex;
-
-    @Override
-    public String getPluginName() {
-        return "Filter";
-    }
-
-    @Override
-    protected void setConfig(Config pluginConfig) {
-        if (!pluginConfig.hasPath(KEY_FIELDS.key())) {
-            throw new IllegalArgumentException("The configuration missing key: " + KEY_FIELDS);
-        }
-        this.fields = pluginConfig.getStringList(KEY_FIELDS.key()).toArray(new String[0]);
-    }
-
-    @Override
-    protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
-        int[] inputValueIndex = new int[fields.length];
-        SeaTunnelDataType[] fieldDataTypes = new SeaTunnelDataType[fields.length];
-        for (int i = 0; i < fields.length; i++) {
-            String field = fields[i];
-            int inputFieldIndex = inputRowType.indexOf(field);
-            if (inputFieldIndex == -1) {
-                throw new IllegalArgumentException(
-                        "Cannot find [" + field + "] field in input row type");
-            }
-
-            fieldDataTypes[i] = inputRowType.getFieldType(inputFieldIndex);
-            inputValueIndex[i] = inputFieldIndex;
-        }
-        SeaTunnelRowType outputRowType = new SeaTunnelRowType(fields, fieldDataTypes);
-        log.info("Changed input row type: {} to output row type: {}", inputRowType, outputRowType);
-
-        this.inputValueIndex = inputValueIndex;
-        return outputRowType;
-    }
-
-    @Override
-    protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
-        // todo reuse array container if not remove fields
-        Object[] values = new Object[fields.length];
-        for (int i = 0; i < fields.length; i++) {
-            values[i] = inputRow.getField(inputValueIndex[i]);
-        }
-        return new SeaTunnelRow(values);
-    }
-}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterFieldTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FilterFieldTransformErrorCode.java
similarity index 57%
copy from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterFieldTransformFactory.java
copy to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FilterFieldTransformErrorCode.java
index ee3a3272f..59d2d0708 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterFieldTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FilterFieldTransformErrorCode.java
@@ -15,25 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.exception;
 
-import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
 
-import com.google.auto.service.AutoService;
+public enum FilterFieldTransformErrorCode implements SeaTunnelErrorCode {
+    FILTER_FIELD_NOT_FOUND("FILTER_FIELD_TRANSFORM-01", "filter field not found");
 
-import static org.apache.seatunnel.transform.FilterFieldTransform.KEY_FIELDS;
+    private final String code;
+    private final String description;
+
+    FilterFieldTransformErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
 
-@AutoService(Factory.class)
-public class FilterFieldTransformFactory implements TableTransformFactory {
     @Override
-    public String factoryIdentifier() {
-        return "Filter";
+    public String getCode() {
+        return this.code;
     }
 
     @Override
-    public OptionRule optionRule() {
-        return OptionRule.builder().required(KEY_FIELDS).build();
+    public String getDescription() {
+        return this.description;
     }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
new file mode 100644
index 000000000..50c020872
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.filter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;
+import org.apache.seatunnel.transform.exception.FilterFieldTransformErrorCode;
+import org.apache.seatunnel.transform.exception.TransformException;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+@NoArgsConstructor
+@AutoService(SeaTunnelTransform.class)
+public class FilterFieldTransform extends AbstractCatalogSupportTransform {
+    public static final String PLUGIN_NAME = "Filter";
+    private int[] inputValueIndex;
+    private String[] fields;
+
+    public FilterFieldTransform(
+            @NonNull ReadonlyConfig config, @NonNull CatalogTable catalogTable) {
+        super(catalogTable);
+        SeaTunnelRowType seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
+        fields = config.get(FilterFieldTransformConfig.KEY_FIELDS).toArray(new String[0]);
+        List<String> canNotFoundFields =
+                Arrays.stream(fields)
+                        .filter(field -> seaTunnelRowType.indexOf(field) == -1)
+                        .collect(Collectors.toList());
+
+        if (!CollectionUtils.isEmpty(canNotFoundFields)) {
+            throw new TransformException(
+                    FilterFieldTransformErrorCode.FILTER_FIELD_NOT_FOUND,
+                    canNotFoundFields.toString());
+        }
+    }
+
+    @Override
+    public String getPluginName() {
+        return PLUGIN_NAME;
+    }
+
+    @Override
+    protected void setConfig(Config pluginConfig) {
+        ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
+                .validate(new FilterFieldTransformFactory().optionRule());
+        fields =
+                ReadonlyConfig.fromConfig(pluginConfig)
+                        .get(FilterFieldTransformConfig.KEY_FIELDS)
+                        .toArray(new String[0]);
+    }
+
+    @Override
+    protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
+        int[] inputValueIndex = new int[fields.length];
+        SeaTunnelDataType[] fieldDataTypes = new SeaTunnelDataType[fields.length];
+        for (int i = 0; i < fields.length; i++) {
+            String field = fields[i];
+            int inputFieldIndex = inputRowType.indexOf(field);
+            if (inputFieldIndex == -1) {
+                throw new IllegalArgumentException(
+                        "Cannot find [" + field + "] field in input row type");
+            }
+
+            fieldDataTypes[i] = inputRowType.getFieldType(inputFieldIndex);
+            inputValueIndex[i] = inputFieldIndex;
+        }
+        SeaTunnelRowType outputRowType = new SeaTunnelRowType(fields, fieldDataTypes);
+        log.info("Changed input row type: {} to output row type: {}", inputRowType, outputRowType);
+
+        this.inputValueIndex = inputValueIndex;
+        return outputRowType;
+    }
+
+    @Override
+    protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
+        // todo reuse array container if not remove fields
+        Object[] values = new Object[fields.length];
+        for (int i = 0; i < fields.length; i++) {
+            values[i] = inputRow.getField(inputValueIndex[i]);
+        }
+        return new SeaTunnelRow(values);
+    }
+
+    @Override
+    protected TableSchema transformTableSchema() {
+        List<String> filterFields = Arrays.asList(fields);
+        List<Column> outputColumns = new ArrayList<>();
+
+        SeaTunnelRowType seaTunnelRowType =
+                inputCatalogTable.getTableSchema().toPhysicalRowDataType();
+
+        inputValueIndex = new int[filterFields.size()];
+        for (int i = 0; i < filterFields.size(); i++) {
+            String field = filterFields.get(i);
+            int inputFieldIndex = seaTunnelRowType.indexOf(field);
+            if (inputFieldIndex == -1) {
+                throw new IllegalArgumentException(
+                        "Cannot find [" + field + "] field in input row type");
+            }
+            inputValueIndex[i] = inputFieldIndex;
+            outputColumns.add(
+                    inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).copy());
+        }
+
+        List<ConstraintKey> copyConstraintKeys =
+                inputCatalogTable.getTableSchema().getConstraintKeys().stream()
+                        .map(ConstraintKey::copy)
+                        .collect(Collectors.toList());
+
+        PrimaryKey copiedPrimaryKey =
+                inputCatalogTable.getTableSchema().getPrimaryKey() == null
+                        ? null
+                        : inputCatalogTable.getTableSchema().getPrimaryKey().copy();
+        return TableSchema.builder()
+                .columns(outputColumns)
+                .primaryKey(copiedPrimaryKey)
+                .constraintKey(copyConstraintKeys)
+                .build();
+    }
+
+    @Override
+    protected TableIdentifier transformTableIdentifier() {
+        return inputCatalogTable.getTableId().copy();
+    }
+}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterFieldTransformFactoryTest.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformConfig.java
similarity index 55%
copy from seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterFieldTransformFactoryTest.java
copy to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformConfig.java
index 712d23982..adf7cca80 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterFieldTransformFactoryTest.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformConfig.java
@@ -15,16 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.filter;
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
 
-public class FilterFieldTransformFactoryTest {
+import lombok.Getter;
+import lombok.Setter;
 
-    @Test
-    public void testOptionRule() throws Exception {
-        FilterFieldTransformFactory filterFieldTransformFactory = new FilterFieldTransformFactory();
-        Assertions.assertNotNull(filterFieldTransformFactory.optionRule());
-    }
+import java.io.Serializable;
+import java.util.List;
+
+@Getter
+@Setter
+public class FilterFieldTransformConfig implements Serializable {
+    public static final Option<List<String>> KEY_FIELDS =
+            Options.key("fields")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The list of fields that need to be kept. Fields not in the list will be deleted");
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterFieldTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
similarity index 64%
rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterFieldTransformFactory.java
rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
index ee3a3272f..14259a0a8 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterFieldTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
@@ -15,25 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.filter;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableTransform;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
 
 import com.google.auto.service.AutoService;
 
-import static org.apache.seatunnel.transform.FilterFieldTransform.KEY_FIELDS;
+import static org.apache.seatunnel.transform.filter.FilterFieldTransform.PLUGIN_NAME;
 
 @AutoService(Factory.class)
 public class FilterFieldTransformFactory implements TableTransformFactory {
     @Override
     public String factoryIdentifier() {
-        return "Filter";
+        return PLUGIN_NAME;
     }
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().required(KEY_FIELDS).build();
+        return OptionRule.builder().required(FilterFieldTransformConfig.KEY_FIELDS).build();
+    }
+
+    @Override
+    public TableTransform createTransform(TableFactoryContext context) {
+        CatalogTable catalogTable = context.getCatalogTable();
+        return () -> new FilterFieldTransform(context.getOptions(), catalogTable);
     }
 }
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterFieldTransformFactoryTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterFieldTransformFactoryTest.java
index 712d23982..b81620023 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterFieldTransformFactoryTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterFieldTransformFactoryTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.transform;
 
+import org.apache.seatunnel.transform.filter.FilterFieldTransformFactory;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;