You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/06/19 05:17:46 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector]add AssertSink connector (#2022)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 581b1e1d5 [Feature][Connector]add AssertSink connector (#2022)
581b1e1d5 is described below
commit 581b1e1d5bb56a8b09248b001801691570cd9946
Author: Jared Li <lh...@gmail.com>
AuthorDate: Sun Jun 19 13:17:40 2022 +0800
[Feature][Connector]add AssertSink connector (#2022)
* [Feature][Connector]add AssertSink AssertExecutor
---
docs/en/connector/sink/Assert.md | 105 +++++++++++++++++++
.../seatunnel-connectors-flink-dist/pom.xml | 6 ++
.../seatunnel-connectors-flink/pom.xml | 1 +
.../{ => seatunnel-connector-flink-assert}/pom.xml | 48 +++++----
.../seatunnel/flink/assertion/AssertExecutor.java | 105 +++++++++++++++++++
.../flink/assertion/rule/AssertFieldRule.java | 64 ++++++++++++
.../flink/assertion/rule/AssertRuleParser.java | 92 +++++++++++++++++
.../seatunnel/flink/assertion/sink/AssertSink.java | 113 +++++++++++++++++++++
.../flink/assertion/AssertExecutorTest.java | 108 ++++++++++++++++++++
.../flink/assertion/rule/AssertRuleParserTest.java | 83 +++++++++++++++
.../seatunnel/core/flink/config/FlinkRunMode.java | 5 +-
.../seatunnel-flink-examples/pom.xml | 5 +
12 files changed, 713 insertions(+), 22 deletions(-)
diff --git a/docs/en/connector/sink/Assert.md b/docs/en/connector/sink/Assert.md
new file mode 100644
index 000000000..74eee7bb7
--- /dev/null
+++ b/docs/en/connector/sink/Assert.md
@@ -0,0 +1,105 @@
+# Assert
+
+> # Sink plugin: Assert [Flink]
+
+## Description
+
+A flink sink plugin which can assert illegal data by user defined rules
+
+:::tip
+
+Engine Supported and plugin name
+
+* [ ] Spark
+* [x] Flink: AssertSink
+
+:::
+
+## Options
+
+| name | type | required | default value |
+| ----------------------- | -------- | -------- | ------------- |
+| rules | ConfigList | yes | - |
+|  field_name | `String` | yes | - |
+|  field_type | `String` | no | - |
+|  field_value | ConfigList | no | - |
+|   rule_type | `String` | no | - |
+|   rule_value | double | no | - |
+
+
+### rules
+
+Rule definition of user's available data. Each rule represents one field validation.
+
+### field_name
+
+field name(string)
+
+### field_type
+
+field type (string), e.g. `string,boolean,byte,short,int,long,float,double,char,void,BigInteger,BigDecimal,Instant`
+
+### field_value
+
+A list value rule define the data value validation
+
+### rule_type
+
+The following rules are supported for now
+`
+NOT_NULL,
+MIN,
+MAX,
+MIN_LENGTH,
+MAX_LENGTH
+`
+
+### rule_value
+
+the value related to rule type
+
+
+## Example
+the whole config obey with `hocon` style
+
+```hocon
+AssertSink {
+ rules =
+ [{
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN_LENGTH
+ rule_value = 3
+ },
+ {
+ rule_type = MAX_LENGTH
+ rule_value = 5
+ }
+ ]
+ },{
+ field_name = age
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 10
+ },
+ {
+ rule_type = MAX
+ rule_value = 20
+ }
+ ]
+ }
+ ]
+
+}
+
+```
diff --git a/seatunnel-connectors/seatunnel-connectors-flink-dist/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink-dist/pom.xml
index 0e3ca69d5..71c43622c 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink-dist/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink-dist/pom.xml
@@ -92,6 +92,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-flink-assert</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
</dependencies>
<build>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
index bb3495ddd..b05f09bff 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
@@ -44,6 +44,7 @@
<module>seatunnel-connector-flink-influxdb</module>
<module>seatunnel-connector-flink-clickhouse</module>
<module>seatunnel-connector-flink-http</module>
+ <module>seatunnel-connector-flink-assert</module>
</modules>
</project>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/pom.xml
similarity index 56%
copy from seatunnel-connectors/seatunnel-connectors-flink/pom.xml
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/pom.xml
index bb3495ddd..3ccb346d5 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/pom.xml
@@ -22,28 +22,34 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-connectors</artifactId>
+ <artifactId>seatunnel-connectors-flink</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>seatunnel-connectors-flink</artifactId>
- <packaging>pom</packaging>
-
- <modules>
- <module>seatunnel-connector-flink-console</module>
- <module>seatunnel-connector-flink-druid</module>
- <module>seatunnel-connector-flink-elasticsearch6</module>
- <module>seatunnel-connector-flink-elasticsearch7</module>
- <module>seatunnel-connector-flink-file</module>
- <module>seatunnel-connector-flink-jdbc</module>
- <module>seatunnel-connector-flink-kafka</module>
- <module>seatunnel-connector-flink-fake</module>
- <module>seatunnel-connector-flink-socket</module>
- <module>seatunnel-connector-flink-doris</module>
- <module>seatunnel-connector-flink-influxdb</module>
- <module>seatunnel-connector-flink-clickhouse</module>
- <module>seatunnel-connector-flink-http</module>
- </modules>
-
-</project>
+ <artifactId>seatunnel-connector-flink-assert</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api-flink</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/AssertExecutor.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/AssertExecutor.java
new file mode 100644
index 000000000..1e17f4c0f
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/AssertExecutor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.assertion;
+
+import org.apache.seatunnel.flink.assertion.rule.AssertFieldRule;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * AssertExecutor is used to determine whether a row data is available
+ * It can not only be used in AssertSink, but also other Sink plugin
+ * (stateless Object)
+ */
+public class AssertExecutor {
+ /**
+ * determine whether a row data is available
+ *
+ * @param row row data
+ * @param assertFieldRules definition of user's available data
+ * @return the first rule that can NOT pass, it will be null if pass through all rules
+ */
+ public Optional<AssertFieldRule> fail(Row row, List<AssertFieldRule> assertFieldRules) {
+ return assertFieldRules.stream()
+ .filter(assertFieldRule -> !pass(row, assertFieldRule))
+ .findFirst();
+ }
+
+ private boolean pass(Row row, AssertFieldRule assertFieldRule) {
+ Object value = row.getField(assertFieldRule.getFieldName());
+ if (Objects.isNull(value)) {
+ return Boolean.FALSE;
+ }
+ Boolean typeChecked = checkType(value, assertFieldRule.getFieldType());
+ if (Boolean.FALSE.equals(typeChecked)) {
+ return Boolean.FALSE;
+ }
+ Boolean valueChecked = checkValue(value, assertFieldRule.getFieldValueRules());
+ if (Boolean.FALSE.equals(valueChecked)) {
+ return Boolean.FALSE;
+ }
+ return Boolean.TRUE;
+ }
+
+ private Boolean checkValue(Object value, List<AssertFieldRule.AssertValueRule> fieldValueRules) {
+ if (CollectionUtils.isNotEmpty(fieldValueRules)) {
+ AssertFieldRule.AssertValueRule failValueRule = fieldValueRules.stream()
+ .filter(valueRule -> !pass(value, valueRule))
+ .findFirst()
+ .orElse(null);
+ if (Objects.nonNull(failValueRule)) {
+ return Boolean.FALSE;
+ }
+ }
+ return Boolean.TRUE;
+ }
+
+ private boolean pass(Object value, AssertFieldRule.AssertValueRule valueRule) {
+ if (AssertFieldRule.AssertValueRuleType.NOT_NULL.equals(valueRule.getFieldValueRuleType())) {
+ return Objects.nonNull(value);
+ }
+
+ if (value instanceof Number && AssertFieldRule.AssertValueRuleType.MAX.equals(valueRule.getFieldValueRuleType())) {
+ return ((Number) value).doubleValue() <= valueRule.getFieldValueRuleValue();
+ }
+ if (value instanceof Number && AssertFieldRule.AssertValueRuleType.MIN.equals(valueRule.getFieldValueRuleType())) {
+ return ((Number) value).doubleValue() >= valueRule.getFieldValueRuleValue();
+ }
+
+ String valueStr = Objects.isNull(value) ? StringUtils.EMPTY : String.valueOf(value);
+ if (AssertFieldRule.AssertValueRuleType.MAX_LENGTH.equals(valueRule.getFieldValueRuleType())) {
+ return valueStr.length() <= valueRule.getFieldValueRuleValue();
+ }
+
+ if (AssertFieldRule.AssertValueRuleType.MIN_LENGTH.equals(valueRule.getFieldValueRuleType())) {
+ return valueStr.length() >= valueRule.getFieldValueRuleValue();
+ }
+ return Boolean.TRUE;
+ }
+
+ private Boolean checkType(Object value, TypeInformation<?> fieldType) {
+ return value.getClass().equals(fieldType.getTypeClass());
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/rule/AssertFieldRule.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/rule/AssertFieldRule.java
new file mode 100644
index 000000000..3dec0858d
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/rule/AssertFieldRule.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.assertion.rule;
+
+import lombok.Data;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class AssertFieldRule implements Serializable {
+ private String fieldName;
+ private TypeInformation<?> fieldType;
+ private List<AssertValueRule> fieldValueRules;
+
+ @Data
+ public static class AssertValueRule implements Serializable {
+ private AssertValueRuleType fieldValueRuleType;
+ private Double fieldValueRuleValue;
+ }
+
+ /**
+ * Here is all supported value assert rule type,
+ * An exception will be thrown if a field value break the rule
+ */
+ public enum AssertValueRuleType {
+ /**
+ * value can't be null
+ */
+ NOT_NULL,
+ /**
+ * minimum value of the data
+ */
+ MIN,
+ /**
+ * maximum value of the data
+ */
+ MAX,
+ /**
+ * minimum string length of a string data
+ */
+ MIN_LENGTH,
+ /**
+ * maximum string length of a string data
+ */
+ MAX_LENGTH
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParser.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParser.java
new file mode 100644
index 000000000..55b74f024
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParser.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.assertion.rule;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Maps;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class AssertRuleParser {
+
+ public List<AssertFieldRule> parseRules(List<? extends Config> ruleConfigList) {
+ return ruleConfigList.stream()
+ .map(config -> {
+ AssertFieldRule fieldRule = new AssertFieldRule();
+ fieldRule.setFieldName(config.getString("field_name"));
+ fieldRule.setFieldType(getFieldType(config.getString("field_type")));
+ List<AssertFieldRule.AssertValueRule> fieldValueRules = assembleFieldValueRules(config.getConfigList("field_value"));
+ fieldRule.setFieldValueRules(fieldValueRules);
+ return fieldRule;
+ })
+ .collect(Collectors.toList());
+ }
+
+ private List<AssertFieldRule.AssertValueRule> assembleFieldValueRules(List<? extends Config> fieldValueConfigList) {
+ return fieldValueConfigList.stream()
+ .map(config -> {
+ AssertFieldRule.AssertValueRule valueRule = new AssertFieldRule.AssertValueRule();
+ if (config.hasPath("rule_type")) {
+ valueRule.setFieldValueRuleType(AssertFieldRule.AssertValueRuleType.valueOf(config.getString("rule_type")));
+ }
+ if (config.hasPath("rule_value")) {
+ valueRule.setFieldValueRuleValue(config.getDouble("rule_value"));
+ }
+ return valueRule;
+ })
+ .collect(Collectors.toList());
+ }
+
+ private TypeInformation<?> getFieldType(String fieldTypeStr) {
+ return TYPES.get(fieldTypeStr);
+ }
+
+ private static final Map<String, BasicTypeInfo<?>> TYPES = Maps.newHashMap();
+
+ static {
+ TYPES.put("string", BasicTypeInfo.STRING_TYPE_INFO);
+ TYPES.put("Boolean", BasicTypeInfo.BOOLEAN_TYPE_INFO);
+ TYPES.put("boolean", BasicTypeInfo.BOOLEAN_TYPE_INFO);
+ TYPES.put("Byte", BasicTypeInfo.BYTE_TYPE_INFO);
+ TYPES.put("byte", BasicTypeInfo.BYTE_TYPE_INFO);
+ TYPES.put("Short", BasicTypeInfo.SHORT_TYPE_INFO);
+ TYPES.put("short", BasicTypeInfo.SHORT_TYPE_INFO);
+ TYPES.put("Integer", BasicTypeInfo.INT_TYPE_INFO);
+ TYPES.put("Int", BasicTypeInfo.INT_TYPE_INFO);
+ TYPES.put("int", BasicTypeInfo.INT_TYPE_INFO);
+ TYPES.put("Long", BasicTypeInfo.LONG_TYPE_INFO);
+ TYPES.put("long", BasicTypeInfo.LONG_TYPE_INFO);
+ TYPES.put("Float", BasicTypeInfo.FLOAT_TYPE_INFO);
+ TYPES.put("float", BasicTypeInfo.FLOAT_TYPE_INFO);
+ TYPES.put("Double", BasicTypeInfo.DOUBLE_TYPE_INFO);
+ TYPES.put("double", BasicTypeInfo.DOUBLE_TYPE_INFO);
+ TYPES.put("Character", BasicTypeInfo.CHAR_TYPE_INFO);
+ TYPES.put("char", BasicTypeInfo.CHAR_TYPE_INFO);
+ TYPES.put("Date", BasicTypeInfo.DATE_TYPE_INFO);
+ TYPES.put("Void", BasicTypeInfo.VOID_TYPE_INFO);
+ TYPES.put("void", BasicTypeInfo.VOID_TYPE_INFO);
+ TYPES.put("BigInteger", BasicTypeInfo.BIG_INT_TYPE_INFO);
+ TYPES.put("BigDecimal", BasicTypeInfo.BIG_DEC_TYPE_INFO);
+ TYPES.put("Instant", BasicTypeInfo.INSTANT_TYPE_INFO);
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java
new file mode 100644
index 000000000..7de846245
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.assertion.sink;
+
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSink;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.assertion.AssertExecutor;
+import org.apache.seatunnel.flink.assertion.rule.AssertFieldRule;
+import org.apache.seatunnel.flink.assertion.rule.AssertRuleParser;
+import org.apache.seatunnel.flink.batch.FlinkBatchSink;
+import org.apache.seatunnel.flink.stream.FlinkStreamSink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+
+/**
+ * A flink sink plugin which can assert illegal data by user defined rules
+ * Refer to https://github.com/apache/incubator-seatunnel/issues/1912
+ */
+@AutoService(BaseFlinkSink.class)
+public class AssertSink implements FlinkBatchSink, FlinkStreamSink {
+ //The assertion executor
+ private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
+ //User defined rules used to assert illegal data
+ private List<AssertFieldRule> assertFieldRules;
+ private static final String RULES = "rules";
+ private Config config;
+ private List<? extends Config> configList;
+
+ @Override
+ public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
+ inDataSet.map(row -> {
+ ASSERT_EXECUTOR
+ .fail(row, assertFieldRules)
+ .ifPresent(failRule -> {
+ throw new IllegalStateException("row :" + row + " fail rule: " + failRule);
+ });
+ return null;
+ });
+ }
+
+ @Override
+ public void outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
+ dataStream.map(row -> {
+ ASSERT_EXECUTOR
+ .fail(row, assertFieldRules)
+ .ifPresent(failRule -> {
+ throw new IllegalStateException("row :" + row + "field name of the fail rule: " + failRule.getFieldName());
+ });
+ return null;
+ });
+ }
+
+ @Override
+ public void setConfig(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public Config getConfig() {
+ return config;
+ }
+
+ @Override
+ public CheckResult checkConfig() {
+ if (config.hasPath(RULES)) {
+ configList = this.config.getConfigList(RULES);
+ if (CollectionUtils.isNotEmpty(configList)) {
+ return CheckResult.success();
+ }
+ }
+ return CheckResult.error("There is no assert-rule defined in AssertSink plugin");
+ }
+
+ @Override
+ public void prepare(FlinkEnvironment env) {
+ AssertRuleParser assertRuleParser = new AssertRuleParser();
+ assertFieldRules = assertRuleParser.parseRules(configList);
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public String getPluginName() {
+ return "AssertSink";
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java
new file mode 100644
index 000000000..34c533cfc
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.assertion;
+
+import org.apache.seatunnel.flink.assertion.rule.AssertFieldRule;
+
+import junit.framework.TestCase;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+
+@SuppressWarnings("magicnumber")
+public class AssertExecutorTest extends TestCase {
+ Row row = Row.withNames();
+ AssertExecutor assertExecutor = new AssertExecutor();
+
+ @Override
+ protected void setUp() throws Exception {
+ row.setField("name", "jared");
+ row.setField("age", 17);
+ }
+
+ public void testFailWithType() {
+ List<AssertFieldRule> rules = Lists.newArrayList();
+ AssertFieldRule rule1 = new AssertFieldRule();
+ rule1.setFieldName("name");
+ rule1.setFieldType(Types.INT);
+ rules.add(rule1);
+
+ AssertFieldRule failRule = assertExecutor.fail(row, rules).orElse(null);
+ assertNotNull(failRule);
+ }
+
+ public void testFailWithValue() {
+ List<AssertFieldRule> rules = Lists.newArrayList();
+ AssertFieldRule rule1 = getFieldRule4Name();
+ AssertFieldRule rule2 = getFieldRule4Age();
+
+ rules.add(rule1);
+ rules.add(rule2);
+
+ AssertFieldRule failRule = assertExecutor.fail(row, rules).orElse(null);
+ assertNull(failRule);
+ }
+
+ private AssertFieldRule getFieldRule4Age() {
+ AssertFieldRule rule = new AssertFieldRule();
+ rule.setFieldName("age");
+ rule.setFieldType(Types.INT);
+
+ List<AssertFieldRule.AssertValueRule> valueRules = Lists.newArrayList();
+
+ AssertFieldRule.AssertValueRule valueRule = new AssertFieldRule.AssertValueRule();
+ valueRule.setFieldValueRuleType(AssertFieldRule.AssertValueRuleType.NOT_NULL);
+ AssertFieldRule.AssertValueRule valueRule1 = new AssertFieldRule.AssertValueRule();
+ valueRule1.setFieldValueRuleType(AssertFieldRule.AssertValueRuleType.MIN);
+ valueRule1.setFieldValueRuleValue(13.0);
+ AssertFieldRule.AssertValueRule valueRule2 = new AssertFieldRule.AssertValueRule();
+ valueRule2.setFieldValueRuleType(AssertFieldRule.AssertValueRuleType.MAX);
+ valueRule2.setFieldValueRuleValue(25.0);
+
+ valueRules.add(valueRule);
+ valueRules.add(valueRule1);
+ valueRules.add(valueRule2);
+ rule.setFieldValueRules(valueRules);
+ return rule;
+ }
+
+ private AssertFieldRule getFieldRule4Name() {
+ AssertFieldRule rule = new AssertFieldRule();
+ rule.setFieldName("name");
+ rule.setFieldType(Types.STRING);
+
+ List<AssertFieldRule.AssertValueRule> valueRules = Lists.newArrayList();
+
+ AssertFieldRule.AssertValueRule valueRule = new AssertFieldRule.AssertValueRule();
+ valueRule.setFieldValueRuleType(AssertFieldRule.AssertValueRuleType.NOT_NULL);
+ AssertFieldRule.AssertValueRule valueRule1 = new AssertFieldRule.AssertValueRule();
+ valueRule1.setFieldValueRuleType(AssertFieldRule.AssertValueRuleType.MIN_LENGTH);
+ valueRule1.setFieldValueRuleValue(3.0);
+ AssertFieldRule.AssertValueRule valueRule2 = new AssertFieldRule.AssertValueRule();
+ valueRule2.setFieldValueRuleType(AssertFieldRule.AssertValueRuleType.MAX_LENGTH);
+ valueRule2.setFieldValueRuleValue(5.0);
+
+ valueRules.add(valueRule);
+ valueRules.add(valueRule1);
+ valueRules.add(valueRule2);
+ rule.setFieldValueRules(valueRules);
+ return rule;
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java
new file mode 100644
index 000000000..fad5ed322
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.assertion.rule;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import junit.framework.TestCase;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+
+import java.util.List;
+
+@SuppressWarnings("magicnumber")
+public class AssertRuleParserTest extends TestCase {
+ AssertRuleParser parser = new AssertRuleParser();
+
+ public void testParseRules() {
+ List<? extends Config> ruleConfigList = assembleConfig();
+ List<AssertFieldRule> assertFieldRules = parser.parseRules(ruleConfigList);
+ assertEquals(assertFieldRules.size(), 2);
+ assertEquals(assertFieldRules.get(0).getFieldType(), BasicTypeInfo.STRING_TYPE_INFO);
+ }
+
+ private List<? extends Config> assembleConfig() {
+ String s = "AssertSink {\n" +
+ " rules = \n" +
+ " [{\n" +
+ " field_name = name\n" +
+ " field_type = string\n" +
+ " field_value = [\n" +
+ " {\n" +
+ " rule_type = NOT_NULL\n" +
+ " },\n" +
+ " {\n" +
+ " rule_type = MIN_LENGTH\n" +
+ " rule_value = 3\n" +
+ " },\n" +
+ " {\n" +
+ " rule_type = MAX_LENGTH\n" +
+ " rule_value = 5\n" +
+ " }\n" +
+ " ]\n" +
+ " },{\n" +
+ " field_name = age\n" +
+ " field_type = int\n" +
+ " field_value = [\n" +
+ " {\n" +
+ " rule_type = NOT_NULL\n" +
+ " },\n" +
+ " {\n" +
+ " rule_type = MIN\n" +
+ " rule_value = 10\n" +
+ " },\n" +
+ " {\n" +
+ " rule_type = MAX\n" +
+ " rule_value = 20\n" +
+ " }\n" +
+ " ]\n" +
+ " }\n" +
+ " ]\n" +
+ " \n" +
+ "}\n";
+ Config config = ConfigFactory.parseString(s);
+
+ return config.getConfig("AssertSink").getConfigList("rules");
+ }
+
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkRunMode.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkRunMode.java
index 4c3435ded..3e6869ccf 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkRunMode.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkRunMode.java
@@ -37,6 +37,9 @@ public enum FlinkRunMode {
}
public String toString() {
- return mode;
+
+ return "FlinkRunMode{" +
+ "mode='" + mode + '\'' +
+ '}';
}
}
diff --git a/seatunnel-examples/seatunnel-flink-examples/pom.xml b/seatunnel-examples/seatunnel-flink-examples/pom.xml
index d2dadebc3..1982cef7c 100644
--- a/seatunnel-examples/seatunnel-flink-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-examples/pom.xml
@@ -56,6 +56,11 @@
<artifactId>seatunnel-connector-flink-console</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-flink-assert</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- seatunnel connectors -->
<!--flink-->