Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/06/16 14:30:24 UTC

[GitHub] [incubator-seatunnel] lhyundeadsoul opened a new pull request, #2022: [Feature][Connector]add AssertSink connector

lhyundeadsoul opened a new pull request, #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022

   ## Purpose of this pull request
   Introduce an AssertSink plugin. It is similar to the ConsoleSink plugin, but instead of printing data to the console, it validates the data against user-defined rules, e.g. row count, data type, data value, and so on.
   
   https://github.com/apache/incubator-seatunnel/issues/1912
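   A minimal, dependency-free Java sketch of the rule-based check described above. It does not use the plugin's real classes: `FieldRule`, `fail`, `exampleRules`, `row` and the map-based row are hypothetical stand-ins for `AssertFieldRule`, `AssertExecutor.fail` and Flink's `Row`, chosen only to show the shape of a field rule (expected type plus value checks) and a fail-fast "first violated rule" result.

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Optional;
   import java.util.function.Predicate;

   public class AssertRuleSketch {

       // Hypothetical rule: a field name, an expected Java type, and value checks.
       static final class FieldRule {
           final String fieldName;
           final Class<?> fieldType;
           final List<Predicate<Object>> valueChecks;

           FieldRule(String fieldName, Class<?> fieldType, List<Predicate<Object>> valueChecks) {
               this.fieldName = fieldName;
               this.fieldType = fieldType;
               this.valueChecks = valueChecks;
           }

           @Override
           public String toString() {
               return fieldName;
           }
       }

       // Returns the first rule the row violates, or empty when every rule passes.
       static Optional<FieldRule> fail(Map<String, Object> row, List<FieldRule> rules) {
           for (FieldRule rule : rules) {
               Object value = row.get(rule.fieldName);
               boolean typeOk = value == null || rule.fieldType.isInstance(value);
               boolean valuesOk = rule.valueChecks.stream().allMatch(check -> check.test(value));
               if (!typeOk || !valuesOk) {
                   return Optional.of(rule);
               }
           }
           return Optional.empty();
       }

       // Example rules: name is a non-null String of at most 5 chars; age is a non-null Integer.
       static List<FieldRule> exampleRules() {
           Predicate<Object> notNull = v -> v != null;
           Predicate<Object> maxLength5 = v -> v == null || v.toString().length() <= 5;
           return Arrays.asList(
                   new FieldRule("name", String.class, Arrays.asList(notNull, maxLength5)),
                   new FieldRule("age", Integer.class, Collections.singletonList(notNull)));
       }

       // Builds a row as a field-name -> value map (stand-in for Flink's Row).
       static Map<String, Object> row(String name, Object age) {
           Map<String, Object> row = new HashMap<>();
           row.put("name", name);
           row.put("age", age);
           return row;
       }

       public static void main(String[] args) {
           List<FieldRule> rules = exampleRules();
           System.out.println(fail(row("tom", 18), rules));      // Optional.empty
           System.out.println(fail(row("jonathan", 20), rules)); // Optional[name]
           System.out.println(fail(row("amy", "20"), rules));    // Optional[age]
       }
   }
   ```

   In the plugin, a present result would be turned into an exception, so a job that finishes without error means every row passed.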
   
   ## Check list
   
   * [x] Code changes are covered with tests, or do not need tests for the following reason:
   * [x] If any new Jar binary package is added in your PR, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [x] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#discussion_r899725353


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.assertion.sink;
+
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSink;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.assertion.AssertExecutor;
+import org.apache.seatunnel.flink.assertion.rule.AssertFieldRule;
+import org.apache.seatunnel.flink.assertion.rule.AssertRuleParser;
+import org.apache.seatunnel.flink.batch.FlinkBatchSink;
+import org.apache.seatunnel.flink.stream.FlinkStreamSink;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.List;
+
+/**
+ * A flink sink plugin which can assert illegal data by user defined rules
+ * Refer to https://github.com/apache/incubator-seatunnel/issues/1912
+ */
+@AutoService(BaseFlinkSink.class)
+public class AssertSink implements FlinkBatchSink, FlinkStreamSink {
+    //The assertion executor
+    private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
+    //User defined rules used to assert illegal data
+    private List<AssertFieldRule> assertFieldRules;
+    private static final String RULES = "rules";
+    private Config config;
+    private List<? extends Config> configList;
+
+    @Override
+    public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
+        inDataSet.map(row -> {

Review Comment:
   Here is the `reduce` definition in `org.apache.flink.api.common.functions.ReduceFunction`:
   `T reduce(T value1, T value2) throws Exception;`
   
   The code would look like this:
   ```java
   @Override
   public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
       inDataSet.reduce((row1, row2) -> {
           ASSERT_EXECUTOR
                   .fail(row1, assertFieldRules)
                   .ifPresent(failRule -> {
                       throw new IllegalStateException("row :" + row1 + " fail rule: " + failRule);
                   });
           ASSERT_EXECUTOR
                   .fail(row2, assertFieldRules)
                   .ifPresent(failRule -> {
                       throw new IllegalStateException("row :" + row2 + " fail rule: " + failRule);
                   });
           // Return one of the inputs: a reduce function should not produce null.
           return row2;
       });
   }
   ```
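   A plain-Java illustration of one property of the reduce-based approach, using `java.util.stream.Stream.reduce` as a stand-in for Flink's `ReduceFunction` (an assumption: the Flink runtime is not exercised here). Because a reduce function combines two values, it is never invoked for an input with a single element, so a lone invalid row passes unchecked; a per-row check has no such gap. `Stream.reduce` also throws `NullPointerException` when the reduction result is null, which is why the sketch returns a real row. `fail`, `check`, `validateWithReduce` and `validateEachRow` are hypothetical helpers.

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.Optional;
   import java.util.function.Predicate;

   public class ReduceValidationSketch {

       // Hypothetical stand-in for ASSERT_EXECUTOR.fail(row, rules): the name of
       // the violated rule, or empty when the row is valid.
       static Optional<String> fail(String row, Predicate<String> rule) {
           return rule.test(row) ? Optional.empty() : Optional.of("not-empty");
       }

       static void check(String row, Predicate<String> rule) {
           fail(row, rule).ifPresent(failRule -> {
               throw new IllegalStateException("row: " + row + " fail rule: " + failRule);
           });
       }

       // Reduce-based validation: the operator only runs when two rows are combined.
       static void validateWithReduce(List<String> rows, Predicate<String> rule) {
           rows.stream().reduce((row1, row2) -> {
               check(row1, rule);
               check(row2, rule);
               return row2; // a real row, never null
           });
       }

       // Per-row validation: every row is checked exactly once.
       static void validateEachRow(List<String> rows, Predicate<String> rule) {
           rows.forEach(row -> check(row, rule));
       }

       public static void main(String[] args) {
           Predicate<String> notEmpty = s -> !s.isEmpty();
           List<String> single = Arrays.asList(""); // one invalid row

           validateWithReduce(single, notEmpty); // passes: the operator is never called
           try {
               validateEachRow(single, notEmpty);
           } catch (IllegalStateException e) {
               System.out.println("per-row check caught it: " + e.getMessage());
           }
       }
   }
   ```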





[GitHub] [incubator-seatunnel] ruanwenjun commented on a diff in pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on code in PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#discussion_r899723254


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java:
##########
@@ -0,0 +1,113 @@
+    @Override
+    public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
+        inDataSet.map(row -> {
+            System.out.println(row);
+            ASSERT_EXECUTOR
+                    .fail(row, assertFieldRules)
+                    .ifPresent(failRule -> {
+                        throw new IllegalStateException("row :" + row + " fail rule: " + failRule);
+                    });
+            return null;
+        });
+    }
+
+    @Override
+    public void outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
+        dataStream.map(row -> {
+            System.out.println(row);

Review Comment:
   Using `System.out.println` is not reliable; it just prints the data to the console. If you want to log the data here, you can use a LOGGER, but I don't think we need to log the data at all: if no error is thrown, the validation has passed.





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#discussion_r899713701


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java:
##########
@@ -0,0 +1,113 @@
+    @Override
+    public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
+        inDataSet.map(row -> {
+            System.out.println(row);
+            ASSERT_EXECUTOR
+                    .fail(row, assertFieldRules)
+                    .ifPresent(failRule -> {
+                        throw new IllegalStateException("row :" + row + " fail rule: " + failRule);
+                    });
+            return null;
+        });
+    }
+
+    @Override
+    public void outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
+        dataStream.map(row -> {
+            System.out.println(row);

Review Comment:
   I didn't want to use `System.out` here either at first, but I found that if nothing is output when all the data is valid, you can't tell how the job is progressing. Maybe we can use a LOGGER here?
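   One possible middle ground, sketched in plain Java: validate every row but log progress only once every N rows, instead of printing each row. This uses `java.util.logging` only to stay dependency-free; a real connector would probably use the project's logging facade instead. `ProgressLoggingValidator` and `logEvery` are hypothetical names, and in Flink each parallel instance of the map function would keep its own counter.

   ```java
   import java.util.Arrays;
   import java.util.function.Predicate;
   import java.util.logging.Logger;

   public class ProgressLoggingValidator {
       private static final Logger LOGGER =
               Logger.getLogger(ProgressLoggingValidator.class.getName());

       private final Predicate<String> rule;
       private final long logEvery; // hypothetical knob: log once per N valid rows
       private long validated;      // rows that passed so far in this instance

       public ProgressLoggingValidator(Predicate<String> rule, long logEvery) {
           if (logEvery <= 0) {
               throw new IllegalArgumentException("logEvery must be positive");
           }
           this.rule = rule;
           this.logEvery = logEvery;
       }

       /** Validates one row: throws on failure, logs progress periodically, passes the row through. */
       public String map(String row) {
           if (!rule.test(row)) {
               throw new IllegalStateException("row: " + row + " failed validation");
           }
           validated++;
           if (validated % logEvery == 0) {
               LOGGER.info("AssertSink progress: " + validated + " rows passed");
           }
           return row;
       }

       public long validatedCount() {
           return validated;
       }

       public static void main(String[] args) {
           ProgressLoggingValidator v = new ProgressLoggingValidator(s -> !s.isEmpty(), 2);
           for (String row : Arrays.asList("a", "b", "c")) {
               v.map(row);
           }
           System.out.println(v.validatedCount()); // 3
       }
   }
   ```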





[GitHub] [incubator-seatunnel] lhyundeadsoul closed pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul closed pull request #2022: [Feature][Connector]add AssertSink connector
URL: https://github.com/apache/incubator-seatunnel/pull/2022




[GitHub] [incubator-seatunnel] ruanwenjun commented on a diff in pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on code in PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#discussion_r899879835


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java:
##########
@@ -0,0 +1,113 @@
+    @Override
+    public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
+        inDataSet.map(row -> {

Review Comment:
   OK, this is not an important issue, so I'll close this conversation.





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#issuecomment-1157734531

   @ruanwenjun https://github.com/apache/incubator-seatunnel/issues/1912 has been done. Could you help me review the code? Thanks!




[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#discussion_r899714941


##########
seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf:
##########
@@ -48,6 +48,44 @@ transform {
 sink {
   ConsoleSink {}
 
+  #AssertSink {

Review Comment:
   Can you tell me where I can add documentation for this plugin?





[GitHub] [incubator-seatunnel] ruanwenjun commented on a diff in pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on code in PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#discussion_r899723518


##########
seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf:
##########
@@ -48,6 +48,44 @@ transform {
 sink {
   ConsoleSink {}
 
+  #AssertSink {

Review Comment:
   You can add the documentation under: https://github.com/apache/incubator-seatunnel/tree/dev/docs/en/connector/sink





[GitHub] [incubator-seatunnel] ruanwenjun merged pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
ruanwenjun merged PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022




[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#discussion_r899874693


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java:
##########
@@ -0,0 +1,113 @@
+    @Override
+    public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
+        inDataSet.map(row -> {

Review Comment:
   The code would look like this:
   ![ima](https://user-images.githubusercontent.com/5850103/174255498-d71b2362-35a1-4503-9a96-6a80dabaa7be.png)
   





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#issuecomment-1159738611

   > LGTM, you can add IT to test this plugin.
   
   Thanks very much for reviewing. I hope this helps the community develop more efficiently!




[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#discussion_r899727455


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java:
##########
@@ -0,0 +1,113 @@
+    @Override
+    public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
+        inDataSet.map(row -> {
+            System.out.println(row);
+            ASSERT_EXECUTOR
+                    .fail(row, assertFieldRules)
+                    .ifPresent(failRule -> {
+                        throw new IllegalStateException("row :" + row + " fail rule: " + failRule);
+                    });
+            return null;
+        });
+    }
+
+    @Override
+    public void outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
+        dataStream.map(row -> {
+            System.out.println(row);

Review Comment:
   OK





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#discussion_r899713701


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java:
##########
@@ -0,0 +1,113 @@
+    @Override
+    public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
+        inDataSet.map(row -> {
+            System.out.println(row);
+            ASSERT_EXECUTOR
+                    .fail(row, assertFieldRules)
+                    .ifPresent(failRule -> {
+                        throw new IllegalStateException("row :" + row + " fail rule: " + failRule);
+                    });
+            return null;
+        });
+    }
+
+    @Override
+    public void outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
+        dataStream.map(row -> {
+            System.out.println(row);

Review Comment:
   I didn't want to use `System.out` here at first either, but I found that if nothing is printed while all the data is valid, you can't tell whether the job is making progress. Maybe we could use a LOGGER here?
   
   There is no `reduce` function on a plain `DataStream` in the DataStream API; to use `reduce`, we must call `keyBy()` first.
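
A minimal plain-Java sketch of the LOGGER idea, assuming rows are modelled as `Object[]` and rules as named `Predicate`s (stand-ins, not SeaTunnel's `Row` or `AssertFieldRule` types). It logs progress through `java.util.logging` every `logEvery` rows and throws `IllegalStateException` on the first failing rule, mirroring the message format in the diff above. `LoggingAssertSketch` and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.logging.Logger;

final class LoggingAssertSketch {
    // Hypothetical logger for the sketch; java.util.logging keeps it dependency-free.
    private static final Logger LOGGER = Logger.getLogger(LoggingAssertSketch.class.getName());

    /** Returns the name of the first rule the row violates, if any (plays the role of AssertExecutor#fail). */
    static Optional<String> firstFailure(Object[] row, Map<String, Predicate<Object[]>> rules) {
        for (Map.Entry<String, Predicate<Object[]>> rule : rules.entrySet()) {
            if (!rule.getValue().test(row)) {
                return Optional.of(rule.getKey());
            }
        }
        return Optional.empty();
    }

    /** Validates every row; logs progress so a run where all data is valid is still visible. */
    static long assertRows(List<Object[]> rows, Map<String, Predicate<Object[]>> rules, long logEvery) {
        long checked = 0;
        for (Object[] row : rows) {
            Optional<String> failed = firstFailure(row, rules);
            if (failed.isPresent()) {
                throw new IllegalStateException("row: " + Arrays.toString(row) + " fail rule: " + failed.get());
            }
            checked++;
            if (logEvery > 0 && checked % logEvery == 0) {
                LOGGER.info("assert progress: " + checked + " rows passed");
            }
        }
        LOGGER.info("assert finished: all " + checked + " rows passed");
        return checked;
    }

    public static void main(String[] args) {
        Map<String, Predicate<Object[]>> rules = new LinkedHashMap<>();
        rules.put("name NOT_NULL", row -> row[0] != null);  // hypothetical rule on column 0
        assertRows(List.of(new Object[] {"a"}, new Object[] {"b"}), rules, 1);
    }
}
```

With a logger, the progress output can be silenced or redirected through logging configuration instead of always going to stdout.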



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#discussion_r899727455


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java:
##########
@@ -0,0 +1,113 @@
+/**
+ * A flink sink plugin which can assert illegal data by user defined rules
+ * Refer to https://github.com/apache/incubator-seatunnel/issues/1912
+ */
+@AutoService(BaseFlinkSink.class)
+public class AssertSink implements FlinkBatchSink, FlinkStreamSink {
+    //The assertion executor
+    private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
+    //User defined rules used to assert illegal data
+    private List<AssertFieldRule> assertFieldRules;
+    private static final String RULES = "rules";
+    private Config config;
+    private List<? extends Config> configList;
+
+    @Override
+    public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
+        inDataSet.map(row -> {
+            System.out.println(row);
+            ASSERT_EXECUTOR
+                    .fail(row, assertFieldRules)
+                    .ifPresent(failRule -> {
+                        throw new IllegalStateException("row :" + row + " fail rule: " + failRule);
+                    });
+            return null;
+        });
+    }
+
+    @Override
+    public void outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
+        dataStream.map(row -> {
+            System.out.println(row);

Review Comment:
   OK, I won't log here.
   
   Please take a look at this:
   There is no `reduce` function on a plain `DataStream` in the DataStream API; to use `reduce`, we must call `keyBy()` first.
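
For context on why `keyBy()` is needed: Flink's `KeyedStream#reduce` is a rolling reduce that, for each incoming element, combines it with the last reduced value for the same key and emits the result. The plain-Java model below (no Flink dependency; `RollingKeyedReduceSketch` is a hypothetical name) shows that behaviour: every input element still produces one output, so a keyed reduce would not save work compared with a per-record `map` in an assertion-only sink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

final class RollingKeyedReduceSketch {
    /**
     * Models a rolling reduce on a keyed stream: each element is combined with the
     * last reduced value for its key (if any), and the new value is emitted.
     */
    static <T, K> List<T> rollingReduce(List<T> stream, Function<T, K> keySelector, BinaryOperator<T> reducer) {
        Map<K, T> lastReduced = new HashMap<>();  // one accumulator per key, like keyed state
        List<T> emitted = new ArrayList<>();
        for (T element : stream) {
            K key = keySelector.apply(element);
            T previous = lastReduced.get(key);
            T next = (previous == null) ? element : reducer.apply(previous, element);
            lastReduced.put(key, next);
            emitted.add(next);  // one output per input element
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Key = parity, reducer = sum. Prints [1, 2, 4, 6].
        System.out.println(rollingReduce(List.of(1, 2, 3, 4), n -> n % 2, Integer::sum));
    }
}
```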





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#discussion_r899725353


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java:
##########
@@ -0,0 +1,113 @@
+/**
+ * A flink sink plugin which can assert illegal data by user defined rules
+ * Refer to https://github.com/apache/incubator-seatunnel/issues/1912
+ */
+@AutoService(BaseFlinkSink.class)
+public class AssertSink implements FlinkBatchSink, FlinkStreamSink {
+    //The assertion executor
+    private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
+    //User defined rules used to assert illegal data
+    private List<AssertFieldRule> assertFieldRules;
+    private static final String RULES = "rules";
+    private Config config;
+    private List<? extends Config> configList;
+
+    @Override
+    public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
+        inDataSet.map(row -> {

Review Comment:
   Here is the definition of `reduce` in `org.apache.flink.api.common.functions.ReduceFunction`:
   `T reduce(T value1, T value2) throws Exception;`
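
As that signature shows, `reduce` takes two values of the same type and must return one. The sketch below re-declares that shape locally (so it runs without Flink) and folds a list the way a non-grouped reduce collapses a data set to a single value; `ReduceSemanticsSketch` and `reduceAll` are hypothetical names, and returning an empty `Optional` for empty input is this sketch's own choice. Two things follow: the function is called N - 1 times for N elements rather than once per row, and it can never "return nothing". In Flink the pairing order is not guaranteed (partial reduces may run in parallel), which this sequential sketch does not model.

```java
import java.util.List;
import java.util.Optional;

final class ReduceSemanticsSketch {
    /** Local copy of the shape of Flink's ReduceFunction<T>, declared here so the sketch runs without Flink. */
    @FunctionalInterface
    interface ReduceFunction<T> {
        T reduce(T value1, T value2) throws Exception;
    }

    /** Folds the list left to right; returns Optional.empty() for an empty list. */
    static <T> Optional<T> reduceAll(List<T> values, ReduceFunction<T> fn) {
        if (values.isEmpty()) {
            return Optional.empty();
        }
        T acc = values.get(0);
        for (int i = 1; i < values.size(); i++) {
            try {
                acc = fn.reduce(acc, values.get(i));  // must return a T; there is no "no result" option
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new IllegalStateException("reduce failed", e);
            }
        }
        return Optional.of(acc);
    }

    public static void main(String[] args) {
        System.out.println(reduceAll(List.of(1, 2, 3), (a, b) -> a + b).orElse(0));  // prints 6
    }
}
```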
   





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#issuecomment-1158569407

   There are many [ImportOrder] checkstyle errors, and all of them look tricky. How can I fix the import order in my code automatically?




[GitHub] [incubator-seatunnel] ruanwenjun commented on a diff in pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on code in PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#discussion_r899199866


##########
seatunnel-connectors/seatunnel-connectors-flink/pom.xml:
##########
@@ -43,7 +43,7 @@
         <module>seatunnel-connector-flink-doris</module>
         <module>seatunnel-connector-flink-influxdb</module>
         <module>seatunnel-connector-flink-clickhouse</module>
-        <module>seatunnel-connector-flink-http</module>
+        <module>seatunnel-connector-flink-assert</module>

Review Comment:
   Please add a new module rather than replacing the http module with assert.



##########
docs/sidebars.js:
##########
@@ -128,6 +128,9 @@ const sidebars = {
         'transform/sql',
         'transform/split',
         'transform/json',
+        'transform/replace',
+        'transform/udf',
+        'transform/uuid',

Review Comment:
   As far as I know, the sidebars are auto-generated (though I am not sure), but either way you don't need to make this change in this PR.



##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java:
##########
@@ -0,0 +1,113 @@
+/**
+ * A flink sink plugin which can assert illegal data by user defined rules
+ * Refer to https://github.com/apache/incubator-seatunnel/issues/1912
+ */
+@AutoService(BaseFlinkSink.class)
+public class AssertSink implements FlinkBatchSink, FlinkStreamSink {
+    //The assertion executor
+    private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
+    //User defined rules used to assert illegal data
+    private List<AssertFieldRule> assertFieldRules;
+    private static final String RULES = "rules";
+    private Config config;
+    private List<? extends Config> configList;
+
+    @Override
+    public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
+        inDataSet.map(row -> {

Review Comment:
   It's better to use reduce here.



##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java:
##########
@@ -0,0 +1,113 @@
+/**
+ * A flink sink plugin which can assert illegal data by user defined rules
+ * Refer to https://github.com/apache/incubator-seatunnel/issues/1912
+ */
+@AutoService(BaseFlinkSink.class)
+public class AssertSink implements FlinkBatchSink, FlinkStreamSink {
+    //The assertion executor
+    private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
+    //User defined rules used to assert illegal data
+    private List<AssertFieldRule> assertFieldRules;
+    private static final String RULES = "rules";
+    private Config config;
+    private List<? extends Config> configList;
+
+    @Override
+    public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
+        inDataSet.map(row -> {
+            System.out.println(row);

Review Comment:
   ```suggestion
               
   ```



##########
seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkRunMode.java:
##########
@@ -35,4 +35,8 @@ public enum FlinkRunMode {
     public String getMode() {
         return mode;
     }
+
+    public String toString() {
+        return mode;

Review Comment:
   ```suggestion
           return "FlinkRunMode{" +
               "mode='" + mode + '\'' +
               '}';
   ```
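
For reference, the suggested `toString()` placed in a complete enum, with `@Override` added so the compiler checks that it really overrides `Enum#toString()`. The constants `RUN` and `RUN_APPLICATION` and their mode strings are hypothetical, since the diff does not show the enum's constants.

```java
enum FlinkRunModeSketch {
    // Hypothetical constants for illustration only.
    RUN("run"),
    RUN_APPLICATION("run-application");

    private final String mode;

    FlinkRunModeSketch(String mode) {
        this.mode = mode;
    }

    public String getMode() {
        return mode;
    }

    // @Override makes the compiler verify this actually overrides Enum#toString().
    @Override
    public String toString() {
        return "FlinkRunMode{" + "mode='" + mode + '\'' + '}';
    }
}
```

One design point to weigh: the PR's version returns the bare mode string, while the suggestion returns a debug-style form such as `FlinkRunMode{mode='run'}`. If any caller relies on `toString()` producing the raw mode (for example when building a command line), calling `getMode()` there is the safer choice.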



##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java:
##########
@@ -0,0 +1,113 @@
+/**
+ * A flink sink plugin which can assert illegal data by user defined rules
+ * Refer to https://github.com/apache/incubator-seatunnel/issues/1912
+ */
+@AutoService(BaseFlinkSink.class)
+public class AssertSink implements FlinkBatchSink, FlinkStreamSink {
+    //The assertion executor
+    private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
+    //User defined rules used to assert illegal data
+    private List<AssertFieldRule> assertFieldRules;
+    private static final String RULES = "rules";
+    private Config config;
+    private List<? extends Config> configList;
+
+    @Override
+    public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
+        inDataSet.map(row -> {
+            System.out.println(row);
+            ASSERT_EXECUTOR
+                    .fail(row, assertFieldRules)
+                    .ifPresent(failRule -> {
+                        throw new IllegalStateException("row :" + row + " fail rule: " + failRule);
+                    });
+            return null;
+        });
+    }
+
+    @Override
+    public void outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
+        dataStream.map(row -> {
+            System.out.println(row);

Review Comment:
   ```suggestion
           dataStream.reduce(row -> {
   
   ```



##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java:
##########
@@ -0,0 +1,86 @@
+package org.apache.seatunnel.flink.assertion;

Review Comment:
   Please add the license header.



##########
seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf:
##########
@@ -48,6 +48,44 @@ transform {
 sink {
   ConsoleSink {}
 
+  #AssertSink {

Review Comment:
   Please revert this change and add documentation for this plugin.





[GitHub] [incubator-seatunnel] ruanwenjun commented on a diff in pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on code in PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#discussion_r899722480


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java:
##########
@@ -0,0 +1,113 @@
+/**
+ * A flink sink plugin which can assert illegal data by user defined rules
+ * Refer to https://github.com/apache/incubator-seatunnel/issues/1912
+ */
+@AutoService(BaseFlinkSink.class)
+public class AssertSink implements FlinkBatchSink, FlinkStreamSink {
+    //The assertion executor
+    private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
+    //User defined rules used to assert illegal data
+    private List<AssertFieldRule> assertFieldRules;
+    private static final String RULES = "rules";
+    private Config config;
+    private List<? extends Config> configList;
+
+    @Override
+    public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
+        inDataSet.map(row -> {

Review Comment:
   Why two rows? As far as I know, reduce is like map, except that it doesn't need to return data. You don't need a result from inDataSet, which is why you return null here. If you use reduce, you won't need to return null.
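
Note that `ReduceFunction#reduce` does return a value (see the signature quoted earlier in this thread), so it is not a map without a result. In the Flink 1.x APIs, the operation that consumes records without producing any is a sink: `DataSet#output(OutputFormat)` in batch and `DataStream#addSink(SinkFunction)` in streaming. As far as I know, a DataSet `map` whose result is never connected to a sink is not part of the executed plan at all. The plain-Java sketch below models a void per-record sink contract; `RecordSink` is a hypothetical stand-in, similar in shape to `SinkFunction#invoke`, so no placeholder `null` is needed.

```java
import java.util.List;
import java.util.function.Predicate;

final class VoidSinkSketch {
    /** Hypothetical per-record sink contract; like SinkFunction#invoke, it returns nothing. */
    @FunctionalInterface
    interface RecordSink<T> {
        void invoke(T value);
    }

    /** A validation-only sink: throws on the first bad record, otherwise produces no output. */
    static <T> RecordSink<T> assertingSink(Predicate<T> rule, String ruleName) {
        return value -> {
            if (!rule.test(value)) {
                throw new IllegalStateException("row: " + value + " fail rule: " + ruleName);
            }
        };
    }

    /** Feeds a bounded input to the sink, standing in for the Flink runtime. */
    static <T> void runAll(List<T> input, RecordSink<T> sink) {
        for (T value : input) {
            sink.invoke(value);
        }
    }

    public static void main(String[] args) {
        RecordSink<Integer> nonNegative = assertingSink(v -> v >= 0, "MIN 0");
        runAll(List.of(1, 2, 3), nonNegative);  // passes silently
        try {
            runAll(List.of(1, -1), nonNegative);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());  // row: -1 fail rule: MIN 0
        }
    }
}
```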






[GitHub] [incubator-seatunnel] ruanwenjun commented on a diff in pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on code in PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#discussion_r900280410


##########
seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java:
##########
@@ -118,6 +118,6 @@ public Config getConfig() {
     }
 
     private boolean whetherExecute(List<FlinkBatchSink> sinks) {
-        return sinks.stream().anyMatch(s -> !"ConsoleSink".equals(s.getPluginName()));
+        return sinks.stream().anyMatch(s -> !"ConsoleSink".equals(s.getPluginName()) && !"AssertSink".equals(s.getPluginName()));

Review Comment:
   As far as I know, you cannot add this check here: it will skip the call to `execute()`, so your Flink job will not be submitted.
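
A small model of this check, with plugin names as plain strings (a simplification of `FlinkBatchSink#getPluginName()`); `WhetherExecuteSketch` is a hypothetical name. The assumption behind skipping `ConsoleSink` is that it prints through `DataSet#print()`, which triggers job execution eagerly in Flink. `AssertSink` only adds a lazy `map`, so if it is excluded as well, a pipeline whose only sink is `AssertSink` never reaches `execute()`.

```java
import java.util.List;
import java.util.Set;

final class WhetherExecuteSketch {
    /** Execute the job only if some sink does not already trigger execution by itself. */
    static boolean whetherExecute(List<String> sinkPluginNames, Set<String> selfExecutingSinks) {
        return sinkPluginNames.stream().anyMatch(name -> !selfExecutingSinks.contains(name));
    }

    public static void main(String[] args) {
        Set<String> original = Set.of("ConsoleSink");
        Set<String> proposed = Set.of("ConsoleSink", "AssertSink");
        System.out.println(whetherExecute(List.of("AssertSink"), original));  // true: execute() is called
        System.out.println(whetherExecute(List.of("AssertSink"), proposed));  // false: the job is never submitted
    }
}
```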





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#issuecomment-1157739813

   The AssertSink config is at `examples/fake_to_console.conf:56`; uncomment it and run it with `org.apache.seatunnel.example.flink.LocalFlinkExample`.




[GitHub] [incubator-seatunnel] ruanwenjun commented on a diff in pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on code in PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#discussion_r900276305


##########
docs/en/connector/sink/Assert.md:
##########
@@ -0,0 +1,105 @@
+# Assert
+
+> # Sink plugin: Assert [Flink]
+
+## Description
+
+A flink sink plugin which can assert illegal data by user defined rules
+
+:::tip
+
+Engine Supported and plugin name
+
+* [ ] Spark
+* [x] Flink: AssertSink
+
+:::
+
+## Options
+
+| name                    | type     | required | default value |
+| ----------------------- | -------- | -------- | ------------- |
+| rules         		  | ConfigList | yes      | -             |
+| &ensp;field_name        | `String` | yes       | -     |
+| &ensp;field_type        | `String` | no       | -          |
+| &ensp;field_value | ConfigList | no       | -             |
+| &ensp;&ensp;rule_type         | `String`    | no       | -             |
+| &ensp;&ensp;rule_value         | double    | no       | -             |
+
+
+### rules
+
+Rule definition of user's available data.  Each rule represents one field validation.
+
+### field_name
+
+field name(string)
+
+### field_type
+
+field type (string),  e.g. `short/int/long/string/boolean`

Review Comment:
   Please list all the supported field types in the doc instead of just saying
   e.g. `short/int/long/string/boolean`; reading this, I would assume the Assert plugin supports only these 5 types.
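
One way to keep the documented list complete is to drive both the type check and the doc from a single table. The sketch below is hypothetical: it contains only the five type names from the draft doc, maps each to an assumed Java class, and reports the full supported list in its error message. The real plugin may support more types or map them differently.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

final class FieldTypeSketch {
    // Only the five names listed in the draft doc; the plugin's real list may be longer.
    static final Map<String, Class<?>> SUPPORTED_TYPES;

    static {
        Map<String, Class<?>> types = new LinkedHashMap<>();
        types.put("short", Short.class);
        types.put("int", Integer.class);
        types.put("long", Long.class);
        types.put("string", String.class);
        types.put("boolean", Boolean.class);
        SUPPORTED_TYPES = Collections.unmodifiableMap(types);
    }

    /** True if the value is non-null and an instance of the class mapped to field_type. */
    static boolean matchesType(Object value, String fieldType) {
        Class<?> expected = SUPPORTED_TYPES.get(fieldType);
        if (expected == null) {
            throw new IllegalArgumentException("unsupported field_type: " + fieldType
                    + ", supported: " + SUPPORTED_TYPES.keySet());
        }
        return expected.isInstance(value);  // isInstance(null) is false
    }

    public static void main(String[] args) {
        System.out.println(SUPPORTED_TYPES.keySet());  // [short, int, long, string, boolean]
        System.out.println(matchesType(42, "int"));    // true
        System.out.println(matchesType(42L, "int"));   // false: a Long is not an Integer
    }
}
```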



##########
docs/en/connector/sink/Assert.md:
##########
@@ -0,0 +1,105 @@
+
+### field_value
+
+A list of value rules that define how the field's values are validated.
+
+### rule_type
+
+The following rule types are currently supported:
+
+- `NOT_NULL`
+- `MIN`
+- `MAX`
+- `MIN_LENGTH`
+- `MAX_LENGTH`

Review Comment:
   Please explain the meaning of each rule type, and what happens when a rule is broken.
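One possible reading of the rule types, written as a minimal Java sketch. The enum values come from the doc, but `ValueRule`, `firstFailure`, and the exact semantics are assumptions: bounds are inclusive, `MIN`/`MAX` apply to numeric values, `MIN_LENGTH`/`MAX_LENGTH` apply to the value's string form, and `null` fails every rule. As for a broken rule, the `AssertSink` code under review throws an `IllegalStateException` naming the row and the failed rule, and an exception thrown inside the map function fails the running task.

```java
import java.util.List;
import java.util.Optional;

// Rule type names are taken from the doc; their semantics here are assumptions.
enum RuleType { NOT_NULL, MIN, MAX, MIN_LENGTH, MAX_LENGTH }

// Hypothetical value rule: rule_type plus rule_value (a double, as in the options table).
final class ValueRule {
    final RuleType type;
    final double value; // ignored by NOT_NULL

    ValueRule(RuleType type, double value) {
        this.type = type;
        this.value = value;
    }

    /** True if the field value satisfies this rule; bounds are inclusive and null fails every rule. */
    boolean test(Object fieldValue) {
        if (fieldValue == null) {
            return false;
        }
        switch (type) {
            case NOT_NULL:
                return true;
            case MIN:
                return fieldValue instanceof Number && ((Number) fieldValue).doubleValue() >= value;
            case MAX:
                return fieldValue instanceof Number && ((Number) fieldValue).doubleValue() <= value;
            case MIN_LENGTH:
                return fieldValue.toString().length() >= value;
            case MAX_LENGTH:
                return fieldValue.toString().length() <= value;
            default:
                throw new IllegalArgumentException("unknown rule type: " + type);
        }
    }

    /** Returns the first rule the value breaks, similar in shape to the Optional used by ASSERT_EXECUTOR.fail. */
    static Optional<ValueRule> firstFailure(Object fieldValue, List<ValueRule> rules) {
        return rules.stream().filter(rule -> !rule.test(fieldValue)).findFirst();
    }

    @Override
    public String toString() {
        return type + "(" + value + ")";
    }
}
```

A real implementation might instead let `null` pass every rule except `NOT_NULL`. Whichever choice the plugin makes is worth stating in the doc.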



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#discussion_r899711006


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.assertion.sink;
+
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSink;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.assertion.AssertExecutor;
+import org.apache.seatunnel.flink.assertion.rule.AssertFieldRule;
+import org.apache.seatunnel.flink.assertion.rule.AssertRuleParser;
+import org.apache.seatunnel.flink.batch.FlinkBatchSink;
+import org.apache.seatunnel.flink.stream.FlinkStreamSink;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.List;
+
+/**
+ * A flink sink plugin which can assert illegal data by user defined rules
+ * Refer to https://github.com/apache/incubator-seatunnel/issues/1912
+ */
+@AutoService(BaseFlinkSink.class)
+public class AssertSink implements FlinkBatchSink, FlinkStreamSink {
+    //The assertion executor
+    private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
+    //User defined rules used to assert illegal data
+    private List<AssertFieldRule> assertFieldRules;
+    private static final String RULES = "rules";
+    private Config config;
+    private List<? extends Config> configList;
+
+    @Override
+    public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
+        inDataSet.map(row -> {

Review Comment:
   Thanks for your suggestion.
   But a reduce function has to process two rows at once, which looks a little strange here.





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#issuecomment-1158704525

   > 
   
   




[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#discussion_r899713701


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java:
##########
@@ -0,0 +1,113 @@
+package org.apache.seatunnel.flink.assertion.sink;
+
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSink;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.assertion.AssertExecutor;
+import org.apache.seatunnel.flink.assertion.rule.AssertFieldRule;
+import org.apache.seatunnel.flink.assertion.rule.AssertRuleParser;
+import org.apache.seatunnel.flink.batch.FlinkBatchSink;
+import org.apache.seatunnel.flink.stream.FlinkStreamSink;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.List;
+
+/**
+ * A flink sink plugin which can assert illegal data by user defined rules
+ * Refer to https://github.com/apache/incubator-seatunnel/issues/1912
+ */
+@AutoService(BaseFlinkSink.class)
+public class AssertSink implements FlinkBatchSink, FlinkStreamSink {
+    //The assertion executor
+    private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
+    //User defined rules used to assert illegal data
+    private List<AssertFieldRule> assertFieldRules;
+    private static final String RULES = "rules";
+    private Config config;
+    private List<? extends Config> configList;
+
+    @Override
+    public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
+        inDataSet.map(row -> {
+            System.out.println(row);
+            ASSERT_EXECUTOR
+                    .fail(row, assertFieldRules)
+                    .ifPresent(failRule -> {
+                        throw new IllegalStateException("row :" + row + " fail rule: " + failRule);
+                    });
+            return null;
+        });
+    }
+
+    @Override
+    public void outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
+        dataStream.map(row -> {
+            System.out.println(row);

Review Comment:
   I didn't want to use `System.out` here at first either, but I found that if nothing is printed here, you can't tell how the job is progressing when all the data is valid. Maybe we can use a LOGGER here instead?
   
   There is no `reduce` function on a plain `DataStream` in the DataStream API (only on keyed or windowed streams).
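A minimal sketch of the LOGGER idea, using `java.util.logging` so it runs without extra dependencies (the project may prefer SLF4J). `RowAsserter` is a hypothetical name, and rules are modeled as plain `Predicate`s rather than the PR's `AssertFieldRule`. The sketch logs each passing row at FINE level and a progress line every 1000 rows at INFO. Like the code under review, it throws on the first failed rule. It returns the row instead of `null`, so the result stays usable downstream.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.logging.Logger;

// Hypothetical per-row asserter: logs progress instead of printing to System.out.
final class RowAsserter<R> {
    private static final Logger LOG = Logger.getLogger(RowAsserter.class.getName());
    private final List<Predicate<R>> rules;
    private long checked; // rows that passed in this instance

    RowAsserter(List<Predicate<R>> rules) {
        this.rules = rules;
    }

    /** Throws on the first broken rule; otherwise logs and returns the row unchanged. */
    R check(R row) {
        Optional<Predicate<R>> failed = rules.stream().filter(rule -> !rule.test(row)).findFirst();
        if (failed.isPresent()) {
            throw new IllegalStateException("row: " + row + " failed rule: " + failed.get());
        }
        checked++;
        LOG.fine(() -> "row passed: " + row);
        if (checked % 1000 == 0) {
            LOG.info(() -> checked + " rows passed so far");
        }
        return row;
    }

    long checkedCount() {
        return checked;
    }
}
```

Inside Flink, `check` would be the body of the `map` lambda. Each parallel instance keeps its own counter, so the progress line is per task, not a global count.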





[GitHub] [incubator-seatunnel] ruanwenjun commented on pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#issuecomment-1158619212

   > There are so many [ImportOrder] checkstyle errors, and all of them look tricky. Is there any way to fix the import order in my code automatically? @ruanwenjun
   
   You need to import `tools/checkstyle/checkstyle.xml` into your IDE.




[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2022: [Feature][Connector]add AssertSink connector

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2022:
URL: https://github.com/apache/incubator-seatunnel/pull/2022#discussion_r899725353


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java:
##########
@@ -0,0 +1,113 @@
+package org.apache.seatunnel.flink.assertion.sink;
+
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSink;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.assertion.AssertExecutor;
+import org.apache.seatunnel.flink.assertion.rule.AssertFieldRule;
+import org.apache.seatunnel.flink.assertion.rule.AssertRuleParser;
+import org.apache.seatunnel.flink.batch.FlinkBatchSink;
+import org.apache.seatunnel.flink.stream.FlinkStreamSink;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.List;
+
+/**
+ * A flink sink plugin which can assert illegal data by user defined rules
+ * Refer to https://github.com/apache/incubator-seatunnel/issues/1912
+ */
+@AutoService(BaseFlinkSink.class)
+public class AssertSink implements FlinkBatchSink, FlinkStreamSink {
+    //The assertion executor
+    private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
+    //User defined rules used to assert illegal data
+    private List<AssertFieldRule> assertFieldRules;
+    private static final String RULES = "rules";
+    private Config config;
+    private List<? extends Config> configList;
+
+    @Override
+    public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
+        inDataSet.map(row -> {

Review Comment:
   Here is the `reduce` definition:
   
       T reduce(T value1, T value2) throws Exception;
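The signature quoted above takes two values and returns one. That makes it a better fit for aggregate checks, such as the "data rows" validation mentioned in the PR description, than for per-row rule checks. The sketch below mimics that contract with `java.util.stream.Stream.reduce` rather than Flink's API. `ReduceIllustration` and `assertRowCount` are hypothetical names.

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java illustration (not the Flink API) of the reduce contract: two values in, one out.
final class ReduceIllustration {
    private ReduceIllustration() {
    }

    /** Counts rows by mapping each row to 1L and combining pairs with a reduce-style sum. */
    static long countRows(List<?> rows) {
        BinaryOperator<Long> sum = (value1, value2) -> value1 + value2;
        return rows.stream().map(row -> 1L).reduce(0L, sum);
    }

    /** Hypothetical row-count assertion built on the reduce-based count; bounds are inclusive. */
    static void assertRowCount(List<?> rows, long min, long max) {
        long count = countRows(rows);
        if (count < min || count > max) {
            throw new IllegalStateException("row count " + count + " not in [" + min + ", " + max + "]");
        }
    }
}
```

A per-row rule only ever needs one row, so forcing it through a two-argument combiner adds a dummy second value. This is the awkwardness raised in the thread, and why `map` reads more naturally there.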
   


