You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/06/19 05:17:46 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector]add AssertSink connector (#2022)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 581b1e1d5 [Feature][Connector]add AssertSink connector (#2022)
581b1e1d5 is described below

commit 581b1e1d5bb56a8b09248b001801691570cd9946
Author: Jared Li <lh...@gmail.com>
AuthorDate: Sun Jun 19 13:17:40 2022 +0800

    [Feature][Connector]add AssertSink connector (#2022)
    
    * [Feature][Connector]add AssertSink AssertExecutor
---
 docs/en/connector/sink/Assert.md                   | 105 +++++++++++++++++++
 .../seatunnel-connectors-flink-dist/pom.xml        |   6 ++
 .../seatunnel-connectors-flink/pom.xml             |   1 +
 .../{ => seatunnel-connector-flink-assert}/pom.xml |  48 +++++----
 .../seatunnel/flink/assertion/AssertExecutor.java  | 105 +++++++++++++++++++
 .../flink/assertion/rule/AssertFieldRule.java      |  64 ++++++++++++
 .../flink/assertion/rule/AssertRuleParser.java     |  92 +++++++++++++++++
 .../seatunnel/flink/assertion/sink/AssertSink.java | 113 +++++++++++++++++++++
 .../flink/assertion/AssertExecutorTest.java        | 108 ++++++++++++++++++++
 .../flink/assertion/rule/AssertRuleParserTest.java |  83 +++++++++++++++
 .../seatunnel/core/flink/config/FlinkRunMode.java  |   5 +-
 .../seatunnel-flink-examples/pom.xml               |   5 +
 12 files changed, 713 insertions(+), 22 deletions(-)

diff --git a/docs/en/connector/sink/Assert.md b/docs/en/connector/sink/Assert.md
new file mode 100644
index 000000000..74eee7bb7
--- /dev/null
+++ b/docs/en/connector/sink/Assert.md
@@ -0,0 +1,105 @@
+# Assert
+
+> # Sink plugin: Assert [Flink]
+
+## Description
+
+A flink sink plugin which can assert illegal data by user defined rules
+
+:::tip
+
+Engine Supported and plugin name
+
+* [ ] Spark
+* [x] Flink: AssertSink
+
+:::
+
+## Options
+
+| name                    | type     | required | default value |
+| ----------------------- | -------- | -------- | ------------- |
+| rules         		  | ConfigList | yes      | -             |
+| &ensp;field_name        | `String` | yes       | -     |
+| &ensp;field_type        | `String` | no       | -          |
+| &ensp;field_value | ConfigList | no       | -             |
+| &ensp;&ensp;rule_type         | `String`    | no       | -             |
+| &ensp;&ensp;rule_value         | double    | no       | -             |
+
+
+### rules
+
+Rule definition of user's available data.  Each rule represents one field validation.
+
+### field_name
+
+field name(string)
+
+### field_type
+
+field type (string),  e.g. `string,boolean,byte,short,int,long,float,double,char,void,BigInteger,BigDecimal,Instant`
+
+### field_value
+
+A list value rule define the data value validation
+
+### rule_type
+
+The following rules are supported for now
+`
+NOT_NULL,  
+MIN,  
+MAX,  
+MIN_LENGTH,  
+MAX_LENGTH
+`
+
+### rule_value
+
+the value related to rule type
+
+
+## Example
+the whole config obey with `hocon` style
+
+```hocon
+AssertSink {
+    rules = 
+        [{
+            field_name = name
+            field_type = string
+            field_value = [
+                {
+                    rule_type = NOT_NULL
+                },
+                {
+                    rule_type = MIN_LENGTH
+                    rule_value = 3
+                },
+                {
+                     rule_type = MAX_LENGTH
+                     rule_value = 5
+                }
+            ]
+        },{
+            field_name = age
+            field_type = int
+            field_value = [
+                {
+                    rule_type = NOT_NULL
+                },
+                {
+                    rule_type = MIN
+                    rule_value = 10
+                },
+                {
+                     rule_type = MAX
+                     rule_value = 20
+                }
+            ]
+        }
+        ]
+    
+}
+
+```
diff --git a/seatunnel-connectors/seatunnel-connectors-flink-dist/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink-dist/pom.xml
index 0e3ca69d5..71c43622c 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink-dist/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink-dist/pom.xml
@@ -92,6 +92,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-connector-flink-assert</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
index bb3495ddd..b05f09bff 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
@@ -44,6 +44,7 @@
         <module>seatunnel-connector-flink-influxdb</module>
         <module>seatunnel-connector-flink-clickhouse</module>
         <module>seatunnel-connector-flink-http</module>
+        <module>seatunnel-connector-flink-assert</module>
     </modules>
 
 </project>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/pom.xml
similarity index 56%
copy from seatunnel-connectors/seatunnel-connectors-flink/pom.xml
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/pom.xml
index bb3495ddd..3ccb346d5 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/pom.xml
@@ -22,28 +22,34 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-connectors</artifactId>
+        <artifactId>seatunnel-connectors-flink</artifactId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>seatunnel-connectors-flink</artifactId>
-    <packaging>pom</packaging>
-
-    <modules>
-        <module>seatunnel-connector-flink-console</module>
-        <module>seatunnel-connector-flink-druid</module>
-        <module>seatunnel-connector-flink-elasticsearch6</module>
-        <module>seatunnel-connector-flink-elasticsearch7</module>
-        <module>seatunnel-connector-flink-file</module>
-        <module>seatunnel-connector-flink-jdbc</module>
-        <module>seatunnel-connector-flink-kafka</module>
-        <module>seatunnel-connector-flink-fake</module>
-        <module>seatunnel-connector-flink-socket</module>
-        <module>seatunnel-connector-flink-doris</module>
-        <module>seatunnel-connector-flink-influxdb</module>
-        <module>seatunnel-connector-flink-clickhouse</module>
-        <module>seatunnel-connector-flink-http</module>
-    </modules>
-
-</project>
+    <artifactId>seatunnel-connector-flink-assert</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api-flink</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/AssertExecutor.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/AssertExecutor.java
new file mode 100644
index 000000000..1e17f4c0f
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/AssertExecutor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.assertion;
+
+import org.apache.seatunnel.flink.assertion.rule.AssertFieldRule;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * AssertExecutor is used to determine whether a row data is available
+ * It can not only be used in AssertSink, but also other Sink plugin
+ * (stateless Object)
+ */
+public class AssertExecutor {
+    /**
+     * determine whether a row data is available
+     *
+     * @param row              row data
+     * @param assertFieldRules definition of user's available data
+     * @return the first rule that can NOT pass, it will be null if pass through all rules
+     */
+    public Optional<AssertFieldRule> fail(Row row, List<AssertFieldRule> assertFieldRules) {
+        return assertFieldRules.stream()
+            .filter(assertFieldRule -> !pass(row, assertFieldRule))
+            .findFirst();
+    }
+
+    private boolean pass(Row row, AssertFieldRule assertFieldRule) {
+        Object value = row.getField(assertFieldRule.getFieldName());
+        if (Objects.isNull(value)) {
+            return Boolean.FALSE;
+        }
+        Boolean typeChecked = checkType(value, assertFieldRule.getFieldType());
+        if (Boolean.FALSE.equals(typeChecked)) {
+            return Boolean.FALSE;
+        }
+        Boolean valueChecked = checkValue(value, assertFieldRule.getFieldValueRules());
+        if (Boolean.FALSE.equals(valueChecked)) {
+            return Boolean.FALSE;
+        }
+        return Boolean.TRUE;
+    }
+
+    private Boolean checkValue(Object value, List<AssertFieldRule.AssertValueRule> fieldValueRules) {
+        if (CollectionUtils.isNotEmpty(fieldValueRules)) {
+            AssertFieldRule.AssertValueRule failValueRule = fieldValueRules.stream()
+                .filter(valueRule -> !pass(value, valueRule))
+                .findFirst()
+                .orElse(null);
+            if (Objects.nonNull(failValueRule)) {
+                return Boolean.FALSE;
+            }
+        }
+        return Boolean.TRUE;
+    }
+
+    private boolean pass(Object value, AssertFieldRule.AssertValueRule valueRule) {
+        if (AssertFieldRule.AssertValueRuleType.NOT_NULL.equals(valueRule.getFieldValueRuleType())) {
+            return Objects.nonNull(value);
+        }
+
+        if (value instanceof Number && AssertFieldRule.AssertValueRuleType.MAX.equals(valueRule.getFieldValueRuleType())) {
+            return ((Number) value).doubleValue() <= valueRule.getFieldValueRuleValue();
+        }
+        if (value instanceof Number && AssertFieldRule.AssertValueRuleType.MIN.equals(valueRule.getFieldValueRuleType())) {
+            return ((Number) value).doubleValue() >= valueRule.getFieldValueRuleValue();
+        }
+
+        String valueStr = Objects.isNull(value) ? StringUtils.EMPTY : String.valueOf(value);
+        if (AssertFieldRule.AssertValueRuleType.MAX_LENGTH.equals(valueRule.getFieldValueRuleType())) {
+            return valueStr.length() <= valueRule.getFieldValueRuleValue();
+        }
+
+        if (AssertFieldRule.AssertValueRuleType.MIN_LENGTH.equals(valueRule.getFieldValueRuleType())) {
+            return valueStr.length() >= valueRule.getFieldValueRuleValue();
+        }
+        return Boolean.TRUE;
+    }
+
+    private Boolean checkType(Object value, TypeInformation<?> fieldType) {
+        return value.getClass().equals(fieldType.getTypeClass());
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/rule/AssertFieldRule.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/rule/AssertFieldRule.java
new file mode 100644
index 000000000..3dec0858d
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/rule/AssertFieldRule.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.assertion.rule;
+
+import lombok.Data;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class AssertFieldRule implements Serializable {
+    private String fieldName;
+    private TypeInformation<?> fieldType;
+    private List<AssertValueRule> fieldValueRules;
+
+    @Data
+    public static class AssertValueRule implements Serializable {
+        private AssertValueRuleType fieldValueRuleType;
+        private Double fieldValueRuleValue;
+    }
+
+    /**
+     * Here is all supported value assert rule type,
+     * An exception will be thrown if a field value break the rule
+     */
+    public enum AssertValueRuleType {
+        /**
+         * value can't be null
+         */
+        NOT_NULL,
+        /**
+         * minimum value of the data
+         */
+        MIN,
+        /**
+         * maximum value of the data
+         */
+        MAX,
+        /**
+         * minimum string length of a string data
+         */
+        MIN_LENGTH,
+        /**
+         * maximum string length of a string data
+         */
+        MAX_LENGTH
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParser.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParser.java
new file mode 100644
index 000000000..55b74f024
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParser.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.assertion.rule;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Maps;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class AssertRuleParser {
+
+    public List<AssertFieldRule> parseRules(List<? extends Config> ruleConfigList) {
+        return ruleConfigList.stream()
+            .map(config -> {
+                AssertFieldRule fieldRule = new AssertFieldRule();
+                fieldRule.setFieldName(config.getString("field_name"));
+                fieldRule.setFieldType(getFieldType(config.getString("field_type")));
+                List<AssertFieldRule.AssertValueRule> fieldValueRules = assembleFieldValueRules(config.getConfigList("field_value"));
+                fieldRule.setFieldValueRules(fieldValueRules);
+                return fieldRule;
+            })
+            .collect(Collectors.toList());
+    }
+
+    private List<AssertFieldRule.AssertValueRule> assembleFieldValueRules(List<? extends Config> fieldValueConfigList) {
+        return fieldValueConfigList.stream()
+            .map(config -> {
+                AssertFieldRule.AssertValueRule valueRule = new AssertFieldRule.AssertValueRule();
+                if (config.hasPath("rule_type")) {
+                    valueRule.setFieldValueRuleType(AssertFieldRule.AssertValueRuleType.valueOf(config.getString("rule_type")));
+                }
+                if (config.hasPath("rule_value")) {
+                    valueRule.setFieldValueRuleValue(config.getDouble("rule_value"));
+                }
+                return valueRule;
+            })
+            .collect(Collectors.toList());
+    }
+
+    private TypeInformation<?> getFieldType(String fieldTypeStr) {
+        return TYPES.get(fieldTypeStr);
+    }
+
+    private static final Map<String, BasicTypeInfo<?>> TYPES = Maps.newHashMap();
+
+    static {
+        TYPES.put("string", BasicTypeInfo.STRING_TYPE_INFO);
+        TYPES.put("Boolean", BasicTypeInfo.BOOLEAN_TYPE_INFO);
+        TYPES.put("boolean", BasicTypeInfo.BOOLEAN_TYPE_INFO);
+        TYPES.put("Byte", BasicTypeInfo.BYTE_TYPE_INFO);
+        TYPES.put("byte", BasicTypeInfo.BYTE_TYPE_INFO);
+        TYPES.put("Short", BasicTypeInfo.SHORT_TYPE_INFO);
+        TYPES.put("short", BasicTypeInfo.SHORT_TYPE_INFO);
+        TYPES.put("Integer", BasicTypeInfo.INT_TYPE_INFO);
+        TYPES.put("Int", BasicTypeInfo.INT_TYPE_INFO);
+        TYPES.put("int", BasicTypeInfo.INT_TYPE_INFO);
+        TYPES.put("Long", BasicTypeInfo.LONG_TYPE_INFO);
+        TYPES.put("long", BasicTypeInfo.LONG_TYPE_INFO);
+        TYPES.put("Float", BasicTypeInfo.FLOAT_TYPE_INFO);
+        TYPES.put("float", BasicTypeInfo.FLOAT_TYPE_INFO);
+        TYPES.put("Double", BasicTypeInfo.DOUBLE_TYPE_INFO);
+        TYPES.put("double", BasicTypeInfo.DOUBLE_TYPE_INFO);
+        TYPES.put("Character", BasicTypeInfo.CHAR_TYPE_INFO);
+        TYPES.put("char", BasicTypeInfo.CHAR_TYPE_INFO);
+        TYPES.put("Date", BasicTypeInfo.DATE_TYPE_INFO);
+        TYPES.put("Void", BasicTypeInfo.VOID_TYPE_INFO);
+        TYPES.put("void", BasicTypeInfo.VOID_TYPE_INFO);
+        TYPES.put("BigInteger", BasicTypeInfo.BIG_INT_TYPE_INFO);
+        TYPES.put("BigDecimal", BasicTypeInfo.BIG_DEC_TYPE_INFO);
+        TYPES.put("Instant", BasicTypeInfo.INSTANT_TYPE_INFO);
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java
new file mode 100644
index 000000000..7de846245
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.assertion.sink;
+
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSink;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.assertion.AssertExecutor;
+import org.apache.seatunnel.flink.assertion.rule.AssertFieldRule;
+import org.apache.seatunnel.flink.assertion.rule.AssertRuleParser;
+import org.apache.seatunnel.flink.batch.FlinkBatchSink;
+import org.apache.seatunnel.flink.stream.FlinkStreamSink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+
+/**
+ * A flink sink plugin which can assert illegal data by user defined rules
+ * Refer to https://github.com/apache/incubator-seatunnel/issues/1912
+ */
+@AutoService(BaseFlinkSink.class)
+public class AssertSink implements FlinkBatchSink, FlinkStreamSink {
+    //The assertion executor
+    private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
+    //User defined rules used to assert illegal data
+    private List<AssertFieldRule> assertFieldRules;
+    private static final String RULES = "rules";
+    private Config config;
+    private List<? extends Config> configList;
+
+    @Override
+    public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
+        inDataSet.map(row -> {
+            ASSERT_EXECUTOR
+                .fail(row, assertFieldRules)
+                .ifPresent(failRule -> {
+                    throw new IllegalStateException("row :" + row + " fail rule: " + failRule);
+                });
+            return null;
+        });
+    }
+
+    @Override
+    public void outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
+        dataStream.map(row -> {
+            ASSERT_EXECUTOR
+                .fail(row, assertFieldRules)
+                .ifPresent(failRule -> {
+                    throw new IllegalStateException("row :" + row + "field name of the fail rule: " + failRule.getFieldName());
+                });
+            return null;
+        });
+    }
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        if (config.hasPath(RULES)) {
+            configList = this.config.getConfigList(RULES);
+            if (CollectionUtils.isNotEmpty(configList)) {
+                return CheckResult.success();
+            }
+        }
+        return CheckResult.error("There is no assert-rule defined in AssertSink plugin");
+    }
+
+    @Override
+    public void prepare(FlinkEnvironment env) {
+        AssertRuleParser assertRuleParser = new AssertRuleParser();
+        assertFieldRules = assertRuleParser.parseRules(configList);
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public String getPluginName() {
+        return "AssertSink";
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java
new file mode 100644
index 000000000..34c533cfc
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.assertion;
+
+import org.apache.seatunnel.flink.assertion.rule.AssertFieldRule;
+
+import junit.framework.TestCase;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+
+@SuppressWarnings("magicnumber")
+public class AssertExecutorTest extends TestCase {
+    Row row = Row.withNames();
+    AssertExecutor assertExecutor = new AssertExecutor();
+
+    @Override
+    protected void setUp() throws Exception {
+        row.setField("name", "jared");
+        row.setField("age", 17);
+    }
+
+    public void testFailWithType() {
+        List<AssertFieldRule> rules = Lists.newArrayList();
+        AssertFieldRule rule1 = new AssertFieldRule();
+        rule1.setFieldName("name");
+        rule1.setFieldType(Types.INT);
+        rules.add(rule1);
+
+        AssertFieldRule failRule = assertExecutor.fail(row, rules).orElse(null);
+        assertNotNull(failRule);
+    }
+
+    public void testFailWithValue() {
+        List<AssertFieldRule> rules = Lists.newArrayList();
+        AssertFieldRule rule1 = getFieldRule4Name();
+        AssertFieldRule rule2 = getFieldRule4Age();
+
+        rules.add(rule1);
+        rules.add(rule2);
+
+        AssertFieldRule failRule = assertExecutor.fail(row, rules).orElse(null);
+        assertNull(failRule);
+    }
+
+    private AssertFieldRule getFieldRule4Age() {
+        AssertFieldRule rule = new AssertFieldRule();
+        rule.setFieldName("age");
+        rule.setFieldType(Types.INT);
+
+        List<AssertFieldRule.AssertValueRule> valueRules = Lists.newArrayList();
+
+        AssertFieldRule.AssertValueRule valueRule = new AssertFieldRule.AssertValueRule();
+        valueRule.setFieldValueRuleType(AssertFieldRule.AssertValueRuleType.NOT_NULL);
+        AssertFieldRule.AssertValueRule valueRule1 = new AssertFieldRule.AssertValueRule();
+        valueRule1.setFieldValueRuleType(AssertFieldRule.AssertValueRuleType.MIN);
+        valueRule1.setFieldValueRuleValue(13.0);
+        AssertFieldRule.AssertValueRule valueRule2 = new AssertFieldRule.AssertValueRule();
+        valueRule2.setFieldValueRuleType(AssertFieldRule.AssertValueRuleType.MAX);
+        valueRule2.setFieldValueRuleValue(25.0);
+
+        valueRules.add(valueRule);
+        valueRules.add(valueRule1);
+        valueRules.add(valueRule2);
+        rule.setFieldValueRules(valueRules);
+        return rule;
+    }
+
+    private AssertFieldRule getFieldRule4Name() {
+        AssertFieldRule rule = new AssertFieldRule();
+        rule.setFieldName("name");
+        rule.setFieldType(Types.STRING);
+
+        List<AssertFieldRule.AssertValueRule> valueRules = Lists.newArrayList();
+
+        AssertFieldRule.AssertValueRule valueRule = new AssertFieldRule.AssertValueRule();
+        valueRule.setFieldValueRuleType(AssertFieldRule.AssertValueRuleType.NOT_NULL);
+        AssertFieldRule.AssertValueRule valueRule1 = new AssertFieldRule.AssertValueRule();
+        valueRule1.setFieldValueRuleType(AssertFieldRule.AssertValueRuleType.MIN_LENGTH);
+        valueRule1.setFieldValueRuleValue(3.0);
+        AssertFieldRule.AssertValueRule valueRule2 = new AssertFieldRule.AssertValueRule();
+        valueRule2.setFieldValueRuleType(AssertFieldRule.AssertValueRuleType.MAX_LENGTH);
+        valueRule2.setFieldValueRuleValue(5.0);
+
+        valueRules.add(valueRule);
+        valueRules.add(valueRule1);
+        valueRules.add(valueRule2);
+        rule.setFieldValueRules(valueRules);
+        return rule;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java
new file mode 100644
index 000000000..fad5ed322
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.assertion.rule;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import junit.framework.TestCase;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+
+import java.util.List;
+
+@SuppressWarnings("magicnumber")
+public class AssertRuleParserTest extends TestCase {
+    AssertRuleParser parser = new AssertRuleParser();
+
+    public void testParseRules() {
+        List<? extends Config> ruleConfigList = assembleConfig();
+        List<AssertFieldRule> assertFieldRules = parser.parseRules(ruleConfigList);
+        assertEquals(assertFieldRules.size(), 2);
+        assertEquals(assertFieldRules.get(0).getFieldType(), BasicTypeInfo.STRING_TYPE_INFO);
+    }
+
+    private List<? extends Config> assembleConfig() {
+        String s = "AssertSink {\n" +
+            "    rules = \n" +
+            "        [{\n" +
+            "            field_name = name\n" +
+            "            field_type = string\n" +
+            "            field_value = [\n" +
+            "                {\n" +
+            "                    rule_type = NOT_NULL\n" +
+            "                },\n" +
+            "                {\n" +
+            "                    rule_type = MIN_LENGTH\n" +
+            "                    rule_value = 3\n" +
+            "                },\n" +
+            "                {\n" +
+            "                     rule_type = MAX_LENGTH\n" +
+            "                     rule_value = 5\n" +
+            "                }\n" +
+            "            ]\n" +
+            "        },{\n" +
+            "            field_name = age\n" +
+            "            field_type = int\n" +
+            "            field_value = [\n" +
+            "                {\n" +
+            "                    rule_type = NOT_NULL\n" +
+            "                },\n" +
+            "                {\n" +
+            "                    rule_type = MIN\n" +
+            "                    rule_value = 10\n" +
+            "                },\n" +
+            "                {\n" +
+            "                     rule_type = MAX\n" +
+            "                     rule_value = 20\n" +
+            "                }\n" +
+            "            ]\n" +
+            "        }\n" +
+            "        ]\n" +
+            "    \n" +
+            "}\n";
+        Config config = ConfigFactory.parseString(s);
+
+        return config.getConfig("AssertSink").getConfigList("rules");
+    }
+
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkRunMode.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkRunMode.java
index 4c3435ded..3e6869ccf 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkRunMode.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkRunMode.java
@@ -37,6 +37,9 @@ public enum FlinkRunMode {
     }
 
     public String toString() {
-        return mode;
+
+        return "FlinkRunMode{" +
+            "mode='" + mode + '\'' +
+            '}';
     }
 }
diff --git a/seatunnel-examples/seatunnel-flink-examples/pom.xml b/seatunnel-examples/seatunnel-flink-examples/pom.xml
index d2dadebc3..1982cef7c 100644
--- a/seatunnel-examples/seatunnel-flink-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-examples/pom.xml
@@ -56,6 +56,11 @@
             <artifactId>seatunnel-connector-flink-console</artifactId>
             <version>${project.version}</version>
         </dependency>
+       <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-connector-flink-assert</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!--   seatunnel connectors   -->
 
         <!--flink-->