You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/07/01 02:11:05 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector] add IT for Assert Sink in e2e module (#2036)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new cd1241d02 [Feature][Connector] add IT for Assert Sink in e2e module (#2036)
cd1241d02 is described below

commit cd1241d0234c9f4d2b7e0cd38ab3f7bc46f166b5
Author: Jared Li <lh...@gmail.com>
AuthorDate: Fri Jul 1 10:11:00 2022 +0800

    [Feature][Connector] add IT for Assert Sink in e2e module (#2036)
    
    * [Feature][Connector] add IT for Assert Sink in e2e module
    
    * [Feature][Connector] add IT for Assert Sink in e2e module
    add licence
    
    * Update fakesource_to_assert.conf
    
    amend assert config to fit FakeSource config
    
    * [Feature][Connector] amend method name
    
    * [connector][assert] there must be a sink in the Flink process, so I added print()
    
    * [connector][assert] use collect instead of print in DataSet
    
    * [connector][assert] Fixed AssertSink submit job twice
---
 .../seatunnel/flink/batch/FlinkBatchExecution.java |  2 +-
 seatunnel-connectors/plugin-mapping.properties     |  1 +
 .../seatunnel/flink/assertion/sink/AssertSink.java | 24 +++---
 .../e2e/flink/assertion/FakeSourceToAssertIT.java  | 35 +++++++++
 .../resources/assertion/fakesource_to_assert.conf  | 88 ++++++++++++++++++++++
 5 files changed, 139 insertions(+), 11 deletions(-)

diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
index 5ea03d315..a0ff40894 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
@@ -118,6 +118,11 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
     }
 
     private boolean whetherExecute(List<FlinkBatchSink> sinks) {
-        return sinks.stream().anyMatch(s -> !"ConsoleSink".equals(s.getPluginName()));
+        // ConsoleSink and AssertSink trigger job execution themselves (AssertSink via DataSet#collect()),
+        // so an explicit execution is only needed when at least one *other* sink is configured.
+        // Mixed pipelines such as ConsoleSink + JdbcSink must still execute, hence anyMatch, not noneMatch.
+        return sinks.stream()
+            .map(FlinkBatchSink::getPluginName)
+            .anyMatch(name -> !"ConsoleSink".equals(name) && !"AssertSink".equals(name));
     }
 }
diff --git a/seatunnel-connectors/plugin-mapping.properties b/seatunnel-connectors/plugin-mapping.properties
index 619911fe9..f4bfbf954 100644
--- a/seatunnel-connectors/plugin-mapping.properties
+++ b/seatunnel-connectors/plugin-mapping.properties
@@ -41,6 +41,7 @@ flink.sink.FileSink = seatunnel-connector-flink-file
 flink.sink.InfluxDbSink = seatunnel-connector-flink-influxdb
 flink.sink.JdbcSink = seatunnel-connector-flink-jdbc
 flink.sink.Kafka = seatunnel-connector-flink-kafka
+flink.sink.AssertSink = seatunnel-connector-flink-assert
 
 # Spark Source
 
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java
index 7de846245..a1087dcb2 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.flink.stream.FlinkStreamSink;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
+import lombok.SneakyThrows;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -37,11 +38,11 @@ import org.apache.flink.types.Row;
 import java.util.List;
 
 /**
- * A flink sink plugin which can assert illegal data by user defined rules
- * Refer to https://github.com/apache/incubator-seatunnel/issues/1912
+ * A flink sink plugin which can assert illegal data by user defined rules. Refer to https://github.com/apache/incubator-seatunnel/issues/1912
  */
 @AutoService(BaseFlinkSink.class)
 public class AssertSink implements FlinkBatchSink, FlinkStreamSink {
+
     //The assertion executor
     private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
     //User defined rules used to assert illegal data
@@ -50,16 +51,18 @@ public class AssertSink implements FlinkBatchSink, FlinkStreamSink {
     private Config config;
     private List<? extends Config> configList;
 
+    // DataSet#collect() declares a checked Exception that outputBatch's signature cannot declare;
+    // @SneakyThrows rethrows it unchanged, while rule violations surface as IllegalStateException.
+    @SneakyThrows
     @Override
     public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
-        inDataSet.map(row -> {
-            ASSERT_EXECUTOR
-                .fail(row, assertFieldRules)
-                .ifPresent(failRule -> {
-                    throw new IllegalStateException("row :" + row + " fail rule: " + failRule);
-                });
-            return null;
-        });
+        // collect() runs the batch job and returns every row to the client, where each one is checked.
+        inDataSet.collect().forEach(row ->
+            ASSERT_EXECUTOR
+                .fail(row, assertFieldRules)
+                .ifPresent(failRule -> {
+                    throw new IllegalStateException("row :" + row + " fail rule: " + failRule);
+                }));
     }
 
     @Override
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/assertion/FakeSourceToAssertIT.java b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/assertion/FakeSourceToAssertIT.java
new file mode 100644
index 000000000..cc67b836a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/assertion/FakeSourceToAssertIT.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.assertion;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+/**
+ * Runs {@code /assertion/fakesource_to_assert.conf} (FakeSource -> sql -> AssertSink) in the Flink container
+ * and expects exit code 0, i.e. no row violated the AssertSink rules configured in that file.
+ */
+public class FakeSourceToAssertIT extends FlinkContainer {
+
+    @Test
+    public void testFakeSourceToAssertSink() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/assertion/fakesource_to_assert.conf");
+        // Report the job's stderr on failure so the violated rule is visible in the test output.
+        Assert.assertEquals(execResult.getStderr(), 0, execResult.getExitCode());
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/assertion/fakesource_to_assert.conf b/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/assertion/fakesource_to_assert.conf
new file mode 100644
index 000000000..8965fdafb
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/assertion/fakesource_to_assert.conf
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+    FakeSource {
+      result_table_name = "fake"
+      field_name = "name,age"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+    sql {
+      sql = "select name,age from fake"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+   AssertSink {
+       rules = 
+           [{
+               field_name = name
+               field_type = string
+               field_value = [
+                   {
+                       rule_type = NOT_NULL
+                   },
+                   {
+                       rule_type = MIN_LENGTH
+                       rule_value = 3
+                   },
+                   {
+                        rule_type = MAX_LENGTH
+                        rule_value = 20
+                   }
+               ]
+           },{
+               field_name = age
+               field_type = int
+               field_value = [
+                   {
+                       rule_type = NOT_NULL
+                   },
+                   {
+                       rule_type = MIN
+                       rule_value = 1
+                   },
+                   {
+                        rule_type = MAX
+                        rule_value = 100
+                   }
+               ]
+           }
+           ]
+   }
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file