You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/04/01 09:37:45 UTC
[incubator-seatunnel] branch dev updated: [Feature][Transform-V2] Add support CatalogTable for FilterFieldTransform (#4422)
This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 211db4489 [Feature][Transform-V2] Add support CatalogTable for FilterFieldTransform (#4422)
211db4489 is described below
commit 211db4489deeedc6b6eac317565570cd999ee338
Author: Eric <ga...@gmail.com>
AuthorDate: Sat Apr 1 17:37:39 2023 +0800
[Feature][Transform-V2] Add support CatalogTable for FilterFieldTransform (#4422)
---
.../connector-v2/Error-Quick-Reference-Manual.md | 6 +
.../seatunnel/transform/FilterFieldTransform.java | 92 ------------
.../FilterFieldTransformErrorCode.java} | 27 ++--
.../transform/filter/FilterFieldTransform.java | 163 +++++++++++++++++++++
.../filter/FilterFieldTransformConfig.java} | 26 ++--
.../{ => filter}/FilterFieldTransformFactory.java | 17 ++-
.../transform/FilterFieldTransformFactoryTest.java | 2 +
7 files changed, 216 insertions(+), 117 deletions(-)
diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index c5fec98a1..29bb1adbf 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -246,3 +246,9 @@ problems encountered by users.
|---------------|---------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| S3RedShift-01 | Aggregate committer error | S3Redshift Sink Connector will write data to s3 and then move file to the target s3 path. And then use `Copy` action copy the data to Redshift. Please check the error log and find out the specific reason. |
+## FilterFieldTransform Error Codes
+
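+| code                      | description            | solution                                                                              |
+|---------------------------|------------------------|----------------------------------------------------------------------------------------|
+| FILTER_FIELD_TRANSFORM-01 | filter field not found | Check that every field listed in the `fields` option exists in the input table schema. |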
+
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterFieldTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterFieldTransform.java
deleted file mode 100644
index 68bd60a37..000000000
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterFieldTransform.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.transform;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.transform.common.AbstractSeaTunnelTransform;
-
-import com.google.auto.service.AutoService;
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.List;
-
-@Slf4j
-@AutoService(SeaTunnelTransform.class)
-public class FilterFieldTransform extends AbstractSeaTunnelTransform {
- public static final Option<List<String>> KEY_FIELDS =
- Options.key("fields")
- .listType()
- .noDefaultValue()
- .withDescription(
- "The list of fields that need to be kept. Fields not in the list will be deleted");
-
- private String[] fields;
- private int[] inputValueIndex;
-
- @Override
- public String getPluginName() {
- return "Filter";
- }
-
- @Override
- protected void setConfig(Config pluginConfig) {
- if (!pluginConfig.hasPath(KEY_FIELDS.key())) {
- throw new IllegalArgumentException("The configuration missing key: " + KEY_FIELDS);
- }
- this.fields = pluginConfig.getStringList(KEY_FIELDS.key()).toArray(new String[0]);
- }
-
- @Override
- protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
- int[] inputValueIndex = new int[fields.length];
- SeaTunnelDataType[] fieldDataTypes = new SeaTunnelDataType[fields.length];
- for (int i = 0; i < fields.length; i++) {
- String field = fields[i];
- int inputFieldIndex = inputRowType.indexOf(field);
- if (inputFieldIndex == -1) {
- throw new IllegalArgumentException(
- "Cannot find [" + field + "] field in input row type");
- }
-
- fieldDataTypes[i] = inputRowType.getFieldType(inputFieldIndex);
- inputValueIndex[i] = inputFieldIndex;
- }
- SeaTunnelRowType outputRowType = new SeaTunnelRowType(fields, fieldDataTypes);
- log.info("Changed input row type: {} to output row type: {}", inputRowType, outputRowType);
-
- this.inputValueIndex = inputValueIndex;
- return outputRowType;
- }
-
- @Override
- protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
- // todo reuse array container if not remove fields
- Object[] values = new Object[fields.length];
- for (int i = 0; i < fields.length; i++) {
- values[i] = inputRow.getField(inputValueIndex[i]);
- }
- return new SeaTunnelRow(values);
- }
-}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterFieldTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FilterFieldTransformErrorCode.java
similarity index 57%
copy from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterFieldTransformFactory.java
copy to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FilterFieldTransformErrorCode.java
index ee3a3272f..59d2d0708 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterFieldTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FilterFieldTransformErrorCode.java
@@ -15,25 +15,28 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.exception;
-import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
-import com.google.auto.service.AutoService;
+public enum FilterFieldTransformErrorCode implements SeaTunnelErrorCode {
+ FILTER_FIELD_NOT_FOUND("FILTER_FIELD_TRANSFORM-01", "filter field not found");
-import static org.apache.seatunnel.transform.FilterFieldTransform.KEY_FIELDS;
+ private final String code;
+ private final String description;
+
+ FilterFieldTransformErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
-@AutoService(Factory.class)
-public class FilterFieldTransformFactory implements TableTransformFactory {
@Override
- public String factoryIdentifier() {
- return "Filter";
+ public String getCode() {
+ return this.code;
}
@Override
- public OptionRule optionRule() {
- return OptionRule.builder().required(KEY_FIELDS).build();
+ public String getDescription() {
+ return this.description;
}
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
new file mode 100644
index 000000000..50c020872
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.filter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;
+import org.apache.seatunnel.transform.exception.FilterFieldTransformErrorCode;
+import org.apache.seatunnel.transform.exception.TransformException;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+@NoArgsConstructor
+@AutoService(SeaTunnelTransform.class)
+public class FilterFieldTransform extends AbstractCatalogSupportTransform {
+ public static final String PLUGIN_NAME = "Filter";
+ private int[] inputValueIndex;
+ private String[] fields;
+
+ public FilterFieldTransform(
+ @NonNull ReadonlyConfig config, @NonNull CatalogTable catalogTable) {
+ super(catalogTable);
+ SeaTunnelRowType seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
+ fields = config.get(FilterFieldTransformConfig.KEY_FIELDS).toArray(new String[0]);
+ List<String> canNotFoundFields =
+ Arrays.stream(fields)
+ .filter(field -> seaTunnelRowType.indexOf(field) == -1)
+ .collect(Collectors.toList());
+
+ if (!CollectionUtils.isEmpty(canNotFoundFields)) {
+ throw new TransformException(
+ FilterFieldTransformErrorCode.FILTER_FIELD_NOT_FOUND,
+ canNotFoundFields.toString());
+ }
+ }
+
+ @Override
+ public String getPluginName() {
+ return PLUGIN_NAME;
+ }
+
+ @Override
+ protected void setConfig(Config pluginConfig) {
+ ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
+ .validate(new FilterFieldTransformFactory().optionRule());
+ fields =
+ ReadonlyConfig.fromConfig(pluginConfig)
+ .get(FilterFieldTransformConfig.KEY_FIELDS)
+ .toArray(new String[0]);
+ }
+
+ @Override
+ protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
+ int[] inputValueIndex = new int[fields.length];
+ SeaTunnelDataType[] fieldDataTypes = new SeaTunnelDataType[fields.length];
+ for (int i = 0; i < fields.length; i++) {
+ String field = fields[i];
+ int inputFieldIndex = inputRowType.indexOf(field);
+ if (inputFieldIndex == -1) {
+ throw new IllegalArgumentException(
+ "Cannot find [" + field + "] field in input row type");
+ }
+
+ fieldDataTypes[i] = inputRowType.getFieldType(inputFieldIndex);
+ inputValueIndex[i] = inputFieldIndex;
+ }
+ SeaTunnelRowType outputRowType = new SeaTunnelRowType(fields, fieldDataTypes);
+ log.info("Changed input row type: {} to output row type: {}", inputRowType, outputRowType);
+
+ this.inputValueIndex = inputValueIndex;
+ return outputRowType;
+ }
+
+ @Override
+ protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
+ // todo reuse array container if not remove fields
+ Object[] values = new Object[fields.length];
+ for (int i = 0; i < fields.length; i++) {
+ values[i] = inputRow.getField(inputValueIndex[i]);
+ }
+ return new SeaTunnelRow(values);
+ }
+
+ @Override
+ protected TableSchema transformTableSchema() {
+ List<String> filterFields = Arrays.asList(fields);
+ List<Column> outputColumns = new ArrayList<>();
+
+ SeaTunnelRowType seaTunnelRowType =
+ inputCatalogTable.getTableSchema().toPhysicalRowDataType();
+
+ inputValueIndex = new int[filterFields.size()];
+ for (int i = 0; i < filterFields.size(); i++) {
+ String field = filterFields.get(i);
+ int inputFieldIndex = seaTunnelRowType.indexOf(field);
+ if (inputFieldIndex == -1) {
+ throw new IllegalArgumentException(
+ "Cannot find [" + field + "] field in input row type");
+ }
+ inputValueIndex[i] = inputFieldIndex;
+ outputColumns.add(
+ inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).copy());
+ }
+
+ List<ConstraintKey> copyConstraintKeys =
+ inputCatalogTable.getTableSchema().getConstraintKeys().stream()
+ .map(ConstraintKey::copy)
+ .collect(Collectors.toList());
+
+ PrimaryKey copiedPrimaryKey =
+ inputCatalogTable.getTableSchema().getPrimaryKey() == null
+ ? null
+ : inputCatalogTable.getTableSchema().getPrimaryKey().copy();
+ return TableSchema.builder()
+ .columns(outputColumns)
+ .primaryKey(copiedPrimaryKey)
+ .constraintKey(copyConstraintKeys)
+ .build();
+ }
+
+ @Override
+ protected TableIdentifier transformTableIdentifier() {
+ return inputCatalogTable.getTableId().copy();
+ }
+}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterFieldTransformFactoryTest.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformConfig.java
similarity index 55%
copy from seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterFieldTransformFactoryTest.java
copy to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformConfig.java
index 712d23982..adf7cca80 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterFieldTransformFactoryTest.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformConfig.java
@@ -15,16 +15,24 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.filter;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
-public class FilterFieldTransformFactoryTest {
+import lombok.Getter;
+import lombok.Setter;
- @Test
- public void testOptionRule() throws Exception {
- FilterFieldTransformFactory filterFieldTransformFactory = new FilterFieldTransformFactory();
- Assertions.assertNotNull(filterFieldTransformFactory.optionRule());
- }
+import java.io.Serializable;
+import java.util.List;
+
+@Getter
+@Setter
+public class FilterFieldTransformConfig implements Serializable {
+ public static final Option<List<String>> KEY_FIELDS =
+ Options.key("fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription(
+ "The list of fields that need to be kept. Fields not in the list will be deleted");
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterFieldTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
similarity index 64%
rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterFieldTransformFactory.java
rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
index ee3a3272f..14259a0a8 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterFieldTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
@@ -15,25 +15,34 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform;
+package org.apache.seatunnel.transform.filter;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import com.google.auto.service.AutoService;
-import static org.apache.seatunnel.transform.FilterFieldTransform.KEY_FIELDS;
+import static org.apache.seatunnel.transform.filter.FilterFieldTransform.PLUGIN_NAME;
@AutoService(Factory.class)
public class FilterFieldTransformFactory implements TableTransformFactory {
@Override
public String factoryIdentifier() {
- return "Filter";
+ return PLUGIN_NAME;
}
@Override
public OptionRule optionRule() {
- return OptionRule.builder().required(KEY_FIELDS).build();
+ return OptionRule.builder().required(FilterFieldTransformConfig.KEY_FIELDS).build();
+ }
+
+ @Override
+ public TableTransform createTransform(TableFactoryContext context) {
+ CatalogTable catalogTable = context.getCatalogTable();
+ return () -> new FilterFieldTransform(context.getOptions(), catalogTable);
}
}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterFieldTransformFactoryTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterFieldTransformFactoryTest.java
index 712d23982..b81620023 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterFieldTransformFactoryTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterFieldTransformFactoryTest.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.transform;
+import org.apache.seatunnel.transform.filter.FilterFieldTransformFactory;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;