You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/03/28 10:58:53 UTC

[incubator-seatunnel] branch dev updated: Add Catalog support for FilterRowKindTransform (#4420)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9ce220bf6 Add Catalog support for FilterRowKindTransform (#4420)
9ce220bf6 is described below

commit 9ce220bf6cf52f2b31ea76f84c59a2257444415d
Author: Eric <ga...@gmail.com>
AuthorDate: Tue Mar 28 18:58:45 2023 +0800

    Add Catalog support for FilterRowKindTransform (#4420)
---
 .../seatunnel/api/table/catalog/TableSchema.java   | 21 ++++----
 .../transform/common/FilterRowTransform.java       | 24 ++++++++-
 .../FilterRowKindTransform.java                    | 61 ++++++++++++----------
 .../FilterRowKindTransformFactory.java             | 22 +++++---
 .../FilterRowKinkTransformConfig.java              | 44 ++++++++++++++++
 .../seatunnel/transform/split/SplitTransform.java  |  6 +--
 .../FilterRowKindTransformFactoryTest.java         |  2 +
 7 files changed, 131 insertions(+), 49 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
index f2b33baa6..7fd277d2b 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
@@ -92,17 +92,16 @@ public final class TableSchema implements Serializable {
         public TableSchema build() {
             return new TableSchema(columns, primaryKey, constraintKeys);
         }
+    }
 
-        public TableSchema copy() {
-            List<Column> copyColumns =
-                    columns.stream().map(Column::copy).collect(Collectors.toList());
-            List<ConstraintKey> copyConstraintKeys =
-                    constraintKeys.stream().map(ConstraintKey::copy).collect(Collectors.toList());
-            return TableSchema.builder()
-                    .constraintKey(copyConstraintKeys)
-                    .columns(copyColumns)
-                    .primaryKey(primaryKey.copy())
-                    .build();
-        }
+    public TableSchema copy() {
+        List<Column> copyColumns = columns.stream().map(Column::copy).collect(Collectors.toList());
+        List<ConstraintKey> copyConstraintKeys =
+                constraintKeys.stream().map(ConstraintKey::copy).collect(Collectors.toList());
+        return TableSchema.builder()
+                .constraintKey(copyConstraintKeys)
+                .columns(copyColumns)
+                .primaryKey(primaryKey == null ? null : primaryKey.copy())
+                .build();
     }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java
index 939710fb1..d4aaa5ec4 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java
@@ -17,11 +17,33 @@
 
 package org.apache.seatunnel.transform.common;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
-public abstract class FilterRowTransform extends AbstractSeaTunnelTransform {
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+
+@NoArgsConstructor
+public abstract class FilterRowTransform extends AbstractCatalogSupportTransform {
+
+    public FilterRowTransform(@NonNull CatalogTable inputCatalogTable) {
+        super(inputCatalogTable);
+    }
+
     @Override
     protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
         return inputRowType;
     }
+
+    @Override
+    protected TableSchema transformTableSchema() {
+        return inputCatalogTable.getTableSchema().copy();
+    }
+
+    @Override
+    protected TableIdentifier transformTableIdentifier() {
+        return inputCatalogTable.getTableId().copy();
+    }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransform.java
similarity index 63%
rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransform.java
rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransform.java
index 34b0c8491..1959e2d3b 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransform.java
@@ -15,12 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.filterrowkind;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
@@ -29,44 +30,41 @@ import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.transform.common.FilterRowTransform;
 
 import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
 import lombok.ToString;
 
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 @ToString(of = {"includeKinds", "excludeKinds"})
 @AutoService(SeaTunnelTransform.class)
+@NoArgsConstructor
 public class FilterRowKindTransform extends FilterRowTransform {
-    public static final Option<List<RowKind>> INCLUDE_KINDS =
-            Options.key("include_kinds")
-                    .listType(RowKind.class)
-                    .noDefaultValue()
-                    .withDescription("the row kinds to include");
-    public static final Option<List<RowKind>> EXCLUDE_KINDS =
-            Options.key("exclude_kinds")
-                    .listType(RowKind.class)
-                    .noDefaultValue()
-                    .withDescription("the row kinds to exclude");
+    public static String PLUGIN_NAME = "FilterRowKind";
 
     private Set<RowKind> includeKinds = Collections.emptySet();
     private Set<RowKind> excludeKinds = Collections.emptySet();
 
+    public FilterRowKindTransform(
+            @NonNull ReadonlyConfig config, @NonNull CatalogTable inputCatalogTable) {
+        super(inputCatalogTable);
+        initConfig(config);
+    }
+
     @Override
     public String getPluginName() {
-        return "FilterRowKind";
+        return PLUGIN_NAME;
     }
 
-    @Override
-    protected void setConfig(Config pluginConfig) {
-        if (pluginConfig.hasPath(INCLUDE_KINDS.key())) {
-            includeKinds =
-                    new HashSet<>(pluginConfig.getEnumList(RowKind.class, INCLUDE_KINDS.key()));
-        }
-        if (pluginConfig.hasPath(EXCLUDE_KINDS.key())) {
+    private void initConfig(ReadonlyConfig config) {
+        if (config.get(FilterRowKinkTransformConfig.INCLUDE_KINDS) == null) {
             excludeKinds =
-                    new HashSet<>(pluginConfig.getEnumList(RowKind.class, EXCLUDE_KINDS.key()));
+                    new HashSet<RowKind>(config.get(FilterRowKinkTransformConfig.EXCLUDE_KINDS));
+        } else {
+            includeKinds =
+                    new HashSet<RowKind>(config.get(FilterRowKinkTransformConfig.INCLUDE_KINDS));
         }
         if ((includeKinds.isEmpty() && excludeKinds.isEmpty())
                 || (!includeKinds.isEmpty() && !excludeKinds.isEmpty())) {
@@ -74,16 +72,25 @@ public class FilterRowKindTransform extends FilterRowTransform {
                     CommonErrorCode.ILLEGAL_ARGUMENT,
                     String.format(
                             "These options(%s,%s) are mutually exclusive, allowing only one set of options to be configured.",
-                            INCLUDE_KINDS.key(), EXCLUDE_KINDS.key()));
+                            FilterRowKinkTransformConfig.INCLUDE_KINDS.key(),
+                            FilterRowKinkTransformConfig.EXCLUDE_KINDS.key()));
         }
     }
 
+    @Override
+    protected void setConfig(Config pluginConfig) {
+        ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
+                .validate(new FilterRowKindTransformFactory().optionRule());
+        initConfig(ReadonlyConfig.fromConfig(pluginConfig));
+    }
+
     @Override
     protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
-        if (!excludeKinds.isEmpty()) {
-            return excludeKinds.contains(inputRow.getRowKind()) ? null : inputRow;
+        if (!this.excludeKinds.isEmpty()) {
+            return this.excludeKinds.contains(inputRow.getRowKind()) ? null : inputRow;
         }
-        if (!includeKinds.isEmpty()) {
+        if (!this.includeKinds.isEmpty()) {
+            Set<RowKind> includeKinds = this.includeKinds;
             return includeKinds.contains(inputRow.getRowKind()) ? inputRow : null;
         }
         throw new SeaTunnelRuntimeException(
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java
similarity index 62%
rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransformFactory.java
rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java
index 5f4b68dc8..9e89ebe5e 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java
@@ -15,26 +15,36 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.filterrowkind;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableTransform;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
 
 import com.google.auto.service.AutoService;
 
-import static org.apache.seatunnel.transform.FilterRowKindTransform.EXCLUDE_KINDS;
-import static org.apache.seatunnel.transform.FilterRowKindTransform.INCLUDE_KINDS;
-
 @AutoService(Factory.class)
 public class FilterRowKindTransformFactory implements TableTransformFactory {
     @Override
     public String factoryIdentifier() {
-        return "FilterRowKind";
+        return FilterRowKindTransform.PLUGIN_NAME;
     }
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().exclusive(EXCLUDE_KINDS, INCLUDE_KINDS).build();
+        return OptionRule.builder()
+                .exclusive(
+                        FilterRowKinkTransformConfig.EXCLUDE_KINDS,
+                        FilterRowKinkTransformConfig.INCLUDE_KINDS)
+                .build();
+    }
+
+    @Override
+    public TableTransform createTransform(TableFactoryContext context) {
+        CatalogTable catalogTable = context.getCatalogTable();
+        return () -> new FilterRowKindTransform(context.getOptions(), catalogTable);
     }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKinkTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKinkTransformConfig.java
new file mode 100644
index 000000000..f31a56a0a
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKinkTransformConfig.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.filterrowkind;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.table.type.RowKind;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Getter
+@Setter
+public class FilterRowKinkTransformConfig implements Serializable {
+
+    public static final Option<List<RowKind>> INCLUDE_KINDS =
+            Options.key("include_kinds")
+                    .listType(RowKind.class)
+                    .noDefaultValue()
+                    .withDescription("the row kinds to include");
+    public static final Option<List<RowKind>> EXCLUDE_KINDS =
+            Options.key("exclude_kinds")
+                    .listType(RowKind.class)
+                    .noDefaultValue()
+                    .withDescription("the row kinds to exclude");
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
index 4d7b543fb..d77ef8435 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
@@ -32,6 +32,7 @@ import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
 import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
 
 import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
 import lombok.NonNull;
 
 import java.util.Arrays;
@@ -41,14 +42,11 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 @AutoService(SeaTunnelTransform.class)
+@NoArgsConstructor
 public class SplitTransform extends MultipleFieldOutputTransform {
     private SplitTransformConfig splitTransformConfig;
     private int splitFieldIndex;
 
-    public SplitTransform() {
-        super();
-    }
-
     public SplitTransform(
             @NonNull SplitTransformConfig splitTransformConfig,
             @NonNull CatalogTable catalogTable) {
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterRowKindTransformFactoryTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterRowKindTransformFactoryTest.java
index 0cdc9e67b..0637a8a70 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterRowKindTransformFactoryTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterRowKindTransformFactoryTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.transform;
 
+import org.apache.seatunnel.transform.filterrowkind.FilterRowKindTransformFactory;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;