You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/03/28 10:58:53 UTC
[incubator-seatunnel] branch dev updated: Add Catalog support for FilterRowKindTransform (#4420)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9ce220bf6 Add Catalog support for FilterRowKindTransform (#4420)
9ce220bf6 is described below
commit 9ce220bf6cf52f2b31ea76f84c59a2257444415d
Author: Eric <ga...@gmail.com>
AuthorDate: Tue Mar 28 18:58:45 2023 +0800
Add Catalog support for FilterRowKindTransform (#4420)
---
.../seatunnel/api/table/catalog/TableSchema.java | 21 ++++----
.../transform/common/FilterRowTransform.java | 24 ++++++++-
.../FilterRowKindTransform.java | 61 ++++++++++++----------
.../FilterRowKindTransformFactory.java | 22 +++++---
.../FilterRowKinkTransformConfig.java | 44 ++++++++++++++++
.../seatunnel/transform/split/SplitTransform.java | 6 +--
.../FilterRowKindTransformFactoryTest.java | 2 +
7 files changed, 131 insertions(+), 49 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
index f2b33baa6..7fd277d2b 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
@@ -92,17 +92,16 @@ public final class TableSchema implements Serializable {
public TableSchema build() {
return new TableSchema(columns, primaryKey, constraintKeys);
}
+ }
- public TableSchema copy() {
- List<Column> copyColumns =
- columns.stream().map(Column::copy).collect(Collectors.toList());
- List<ConstraintKey> copyConstraintKeys =
- constraintKeys.stream().map(ConstraintKey::copy).collect(Collectors.toList());
- return TableSchema.builder()
- .constraintKey(copyConstraintKeys)
- .columns(copyColumns)
- .primaryKey(primaryKey.copy())
- .build();
- }
+ public TableSchema copy() {
+ List<Column> copyColumns = columns.stream().map(Column::copy).collect(Collectors.toList());
+ List<ConstraintKey> copyConstraintKeys =
+ constraintKeys.stream().map(ConstraintKey::copy).collect(Collectors.toList());
+ return TableSchema.builder()
+ .constraintKey(copyConstraintKeys)
+ .columns(copyColumns)
+ .primaryKey(primaryKey == null ? null : primaryKey.copy())
+ .build();
}
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java
index 939710fb1..d4aaa5ec4 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java
@@ -17,11 +17,33 @@
package org.apache.seatunnel.transform.common;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-public abstract class FilterRowTransform extends AbstractSeaTunnelTransform {
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+
+@NoArgsConstructor
+public abstract class FilterRowTransform extends AbstractCatalogSupportTransform {
+
+ public FilterRowTransform(@NonNull CatalogTable inputCatalogTable) {
+ super(inputCatalogTable);
+ }
+
@Override
protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
return inputRowType;
}
+
+ @Override
+ protected TableSchema transformTableSchema() {
+ return inputCatalogTable.getTableSchema().copy();
+ }
+
+ @Override
+ protected TableIdentifier transformTableIdentifier() {
+ return inputCatalogTable.getTableId().copy();
+ }
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransform.java
similarity index 63%
rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransform.java
rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransform.java
index 34b0c8491..1959e2d3b 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransform.java
@@ -15,12 +15,13 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.filterrowkind;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
@@ -29,44 +30,41 @@ import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.transform.common.FilterRowTransform;
import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
import lombok.ToString;
import java.util.Collections;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
@ToString(of = {"includeKinds", "excludeKinds"})
@AutoService(SeaTunnelTransform.class)
+@NoArgsConstructor
public class FilterRowKindTransform extends FilterRowTransform {
- public static final Option<List<RowKind>> INCLUDE_KINDS =
- Options.key("include_kinds")
- .listType(RowKind.class)
- .noDefaultValue()
- .withDescription("the row kinds to include");
- public static final Option<List<RowKind>> EXCLUDE_KINDS =
- Options.key("exclude_kinds")
- .listType(RowKind.class)
- .noDefaultValue()
- .withDescription("the row kinds to exclude");
+ public static String PLUGIN_NAME = "FilterRowKind";
private Set<RowKind> includeKinds = Collections.emptySet();
private Set<RowKind> excludeKinds = Collections.emptySet();
+ public FilterRowKindTransform(
+ @NonNull ReadonlyConfig config, @NonNull CatalogTable inputCatalogTable) {
+ super(inputCatalogTable);
+ initConfig(config);
+ }
+
@Override
public String getPluginName() {
- return "FilterRowKind";
+ return PLUGIN_NAME;
}
- @Override
- protected void setConfig(Config pluginConfig) {
- if (pluginConfig.hasPath(INCLUDE_KINDS.key())) {
- includeKinds =
- new HashSet<>(pluginConfig.getEnumList(RowKind.class, INCLUDE_KINDS.key()));
- }
- if (pluginConfig.hasPath(EXCLUDE_KINDS.key())) {
+ private void initConfig(ReadonlyConfig config) {
+ if (config.get(FilterRowKinkTransformConfig.INCLUDE_KINDS) == null) {
excludeKinds =
- new HashSet<>(pluginConfig.getEnumList(RowKind.class, EXCLUDE_KINDS.key()));
+ new HashSet<RowKind>(config.get(FilterRowKinkTransformConfig.EXCLUDE_KINDS));
+ } else {
+ includeKinds =
+ new HashSet<RowKind>(config.get(FilterRowKinkTransformConfig.INCLUDE_KINDS));
}
if ((includeKinds.isEmpty() && excludeKinds.isEmpty())
|| (!includeKinds.isEmpty() && !excludeKinds.isEmpty())) {
@@ -74,16 +72,25 @@ public class FilterRowKindTransform extends FilterRowTransform {
CommonErrorCode.ILLEGAL_ARGUMENT,
String.format(
"These options(%s,%s) are mutually exclusive, allowing only one set of options to be configured.",
- INCLUDE_KINDS.key(), EXCLUDE_KINDS.key()));
+ FilterRowKinkTransformConfig.INCLUDE_KINDS.key(),
+ FilterRowKinkTransformConfig.EXCLUDE_KINDS.key()));
}
}
+ @Override
+ protected void setConfig(Config pluginConfig) {
+ ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
+ .validate(new FilterRowKindTransformFactory().optionRule());
+ initConfig(ReadonlyConfig.fromConfig(pluginConfig));
+ }
+
@Override
protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
- if (!excludeKinds.isEmpty()) {
- return excludeKinds.contains(inputRow.getRowKind()) ? null : inputRow;
+ if (!this.excludeKinds.isEmpty()) {
+ return this.excludeKinds.contains(inputRow.getRowKind()) ? null : inputRow;
}
- if (!includeKinds.isEmpty()) {
+ if (!this.includeKinds.isEmpty()) {
+ Set<RowKind> includeKinds = this.includeKinds;
return includeKinds.contains(inputRow.getRowKind()) ? inputRow : null;
}
throw new SeaTunnelRuntimeException(
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java
similarity index 62%
rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransformFactory.java
rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java
index 5f4b68dc8..9e89ebe5e 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java
@@ -15,26 +15,36 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.filterrowkind;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import com.google.auto.service.AutoService;
-import static org.apache.seatunnel.transform.FilterRowKindTransform.EXCLUDE_KINDS;
-import static org.apache.seatunnel.transform.FilterRowKindTransform.INCLUDE_KINDS;
-
@AutoService(Factory.class)
public class FilterRowKindTransformFactory implements TableTransformFactory {
@Override
public String factoryIdentifier() {
- return "FilterRowKind";
+ return FilterRowKindTransform.PLUGIN_NAME;
}
@Override
public OptionRule optionRule() {
- return OptionRule.builder().exclusive(EXCLUDE_KINDS, INCLUDE_KINDS).build();
+ return OptionRule.builder()
+ .exclusive(
+ FilterRowKinkTransformConfig.EXCLUDE_KINDS,
+ FilterRowKinkTransformConfig.INCLUDE_KINDS)
+ .build();
+ }
+
+ @Override
+ public TableTransform createTransform(TableFactoryContext context) {
+ CatalogTable catalogTable = context.getCatalogTable();
+ return () -> new FilterRowKindTransform(context.getOptions(), catalogTable);
}
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKinkTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKinkTransformConfig.java
new file mode 100644
index 000000000..f31a56a0a
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKinkTransformConfig.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.filterrowkind;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.table.type.RowKind;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Getter
+@Setter
+public class FilterRowKinkTransformConfig implements Serializable {
+
+ public static final Option<List<RowKind>> INCLUDE_KINDS =
+ Options.key("include_kinds")
+ .listType(RowKind.class)
+ .noDefaultValue()
+ .withDescription("the row kinds to include");
+ public static final Option<List<RowKind>> EXCLUDE_KINDS =
+ Options.key("exclude_kinds")
+ .listType(RowKind.class)
+ .noDefaultValue()
+ .withDescription("the row kinds to exclude");
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
index 4d7b543fb..d77ef8435 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
@@ -32,6 +32,7 @@ import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
import lombok.NonNull;
import java.util.Arrays;
@@ -41,14 +42,11 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
@AutoService(SeaTunnelTransform.class)
+@NoArgsConstructor
public class SplitTransform extends MultipleFieldOutputTransform {
private SplitTransformConfig splitTransformConfig;
private int splitFieldIndex;
- public SplitTransform() {
- super();
- }
-
public SplitTransform(
@NonNull SplitTransformConfig splitTransformConfig,
@NonNull CatalogTable catalogTable) {
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterRowKindTransformFactoryTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterRowKindTransformFactoryTest.java
index 0cdc9e67b..0637a8a70 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterRowKindTransformFactoryTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterRowKindTransformFactoryTest.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.transform;
+import org.apache.seatunnel.transform.filterrowkind.FilterRowKindTransformFactory;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;