You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/07/01 02:11:05 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector] add IT for Assert Sink in e2e module (#2036)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cd1241d02 [Feature][Connector] add IT for Assert Sink in e2e module (#2036)
cd1241d02 is described below
commit cd1241d0234c9f4d2b7e0cd38ab3f7bc46f166b5
Author: Jared Li <lh...@gmail.com>
AuthorDate: Fri Jul 1 10:11:00 2022 +0800
[Feature][Connector] add IT for Assert Sink in e2e module (#2036)
* [Feature][Connector] add IT for Assert Sink in e2e module
* [Feature][Connector] add IT for Assert Sink in e2e module
add license header
* Update fakesource_to_assert.conf
amend the assert config to match the FakeSource config
* [Feature][Connector] amend method name
* [connector][assert] a Flink job must contain a sink, so I added print()
* [connector][assert] use collect instead of print in DataSet
* [connector][assert] Fixed AssertSink submit job twice
---
.../seatunnel/flink/batch/FlinkBatchExecution.java | 2 +-
seatunnel-connectors/plugin-mapping.properties | 1 +
.../seatunnel/flink/assertion/sink/AssertSink.java | 24 +++---
.../e2e/flink/assertion/FakeSourceToAssertIT.java | 35 +++++++++
.../resources/assertion/fakesource_to_assert.conf | 88 ++++++++++++++++++++++
5 files changed, 139 insertions(+), 11 deletions(-)
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
index 5ea03d315..a0ff40894 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
@@ -118,6 +118,6 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
}
private boolean whetherExecute(List<FlinkBatchSink> sinks) {
- return sinks.stream().anyMatch(s -> !"ConsoleSink".equals(s.getPluginName()));
+ return sinks.stream().map(FlinkBatchSink::getPluginName).noneMatch(name -> "ConsoleSink".equals(name) || "AssertSink".equals(name));
}
}
diff --git a/seatunnel-connectors/plugin-mapping.properties b/seatunnel-connectors/plugin-mapping.properties
index 619911fe9..f4bfbf954 100644
--- a/seatunnel-connectors/plugin-mapping.properties
+++ b/seatunnel-connectors/plugin-mapping.properties
@@ -41,6 +41,7 @@ flink.sink.FileSink = seatunnel-connector-flink-file
flink.sink.InfluxDbSink = seatunnel-connector-flink-influxdb
flink.sink.JdbcSink = seatunnel-connector-flink-jdbc
flink.sink.Kafka = seatunnel-connector-flink-kafka
+flink.sink.AssertSink = seatunnel-connector-flink-assert
# Spark Source
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java
index 7de846245..a1087dcb2 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-assert/src/main/java/org/apache/seatunnel/flink/assertion/sink/AssertSink.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
+import lombok.SneakyThrows;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -37,11 +38,11 @@ import org.apache.flink.types.Row;
import java.util.List;
/**
- * A flink sink plugin which can assert illegal data by user defined rules
- * Refer to https://github.com/apache/incubator-seatunnel/issues/1912
+ * A Flink sink plugin which checks data against user-defined rules and fails on illegal data. Refer to https://github.com/apache/incubator-seatunnel/issues/1912
*/
@AutoService(BaseFlinkSink.class)
public class AssertSink implements FlinkBatchSink, FlinkStreamSink {
+
//The assertion executor
private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
//User defined rules used to assert illegal data
@@ -50,16 +51,19 @@ public class AssertSink implements FlinkBatchSink, FlinkStreamSink {
private Config config;
private List<? extends Config> configList;
+ @SneakyThrows
@Override
public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
- inDataSet.map(row -> {
- ASSERT_EXECUTOR
- .fail(row, assertFieldRules)
- .ifPresent(failRule -> {
- throw new IllegalStateException("row :" + row + " fail rule: " + failRule);
- });
- return null;
- });
+ final List<Row> rows; // collect() runs the job eagerly and pulls every row to the client
+ try {
+ rows = inDataSet.collect();
+ } catch (Exception ex) {
+ throw new RuntimeException("AssertSink execute failed", ex);
+ }
+ // Assert outside the try so a rule violation is reported as-is, not wrapped as an execution failure.
+ rows.forEach(row -> ASSERT_EXECUTOR.fail(row, assertFieldRules).ifPresent(failRule -> {
+ throw new IllegalStateException("row :" + row + " fail rule: " + failRule);
+ }));
}
@Override
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/assertion/FakeSourceToAssertIT.java b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/assertion/FakeSourceToAssertIT.java
new file mode 100644
index 000000000..cc67b836a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/assertion/FakeSourceToAssertIT.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.assertion;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class FakeSourceToAssertIT extends FlinkContainer {
+ /** Runs the FakeSource -> AssertSink job and expects exit code 0; the container's stderr is shown if it fails. */
+ @Test
+ public void testFakeSourceToAssertSink() throws IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelFlinkJob("/assertion/fakesource_to_assert.conf");
+ Assert.assertEquals(execResult.getStderr(), 0, execResult.getExitCode());
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/assertion/fakesource_to_assert.conf b/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/assertion/fakesource_to_assert.conf
new file mode 100644
index 000000000..8965fdafb
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/assertion/fakesource_to_assert.conf
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ sql {
+ sql = "select name,age from fake"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ AssertSink {
+ rules =
+ [{
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN_LENGTH
+ rule_value = 3
+ },
+ {
+ rule_type = MAX_LENGTH
+ rule_value = 20
+ }
+ ]
+ },{
+ field_name = age
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 1
+ },
+ {
+ rule_type = MAX
+ rule_value = 100
+ }
+ ]
+ }
+ ]
+ }
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file