You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/04/18 09:59:10 UTC

[incubator-seatunnel] branch 2.1.1-prepare updated: [Bug][Connector] Fixed ConsoleSink submit job twice. (#1710)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 2.1.1-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/2.1.1-prepare by this push:
     new ec62ef0e [Bug][Connector] Fixed ConsoleSink submit job twice. (#1710)
ec62ef0e is described below

commit ec62ef0edcd0997b7bd53c0bd80ecce370d8c09e
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Mon Apr 18 17:57:01 2022 +0800

    [Bug][Connector] Fixed ConsoleSink submit job twice. (#1710)
    
    * Fix ConsoleSink submitting the job twice
---
 .../seatunnel/flink/batch/FlinkBatchExecution.java    | 19 ++++++++++++-------
 .../seatunnel/flink/console/sink/ConsoleSink.java     |  1 -
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
index 3da634fb..dba81476 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
@@ -72,13 +72,15 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
             sink.outputBatch(flinkEnvironment, dataSet);
         }
 
-        try {
-            LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getBatchEnvironment().getExecutionPlan());
-            JobExecutionResult execute = flinkEnvironment.getBatchEnvironment().execute(flinkEnvironment.getJobName());
-            LOGGER.info(execute.toString());
-        } catch (Exception e) {
-            LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
-            throw e;
+        if (whetherExecute(sinks)) {
+            try {
+                LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getBatchEnvironment().getExecutionPlan());
+                JobExecutionResult execute = flinkEnvironment.getBatchEnvironment().execute(flinkEnvironment.getJobName());
+                LOGGER.info(execute.toString());
+            } catch (Exception e) {
+                LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
+                throw e;
+            }
         }
     }
 
@@ -116,4 +118,7 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
         return config;
     }
 
+    private boolean whetherExecute(List<FlinkBatchSink> sinks) {
+        return sinks.stream().anyMatch(s -> !"ConsoleSink".equals(s.getPluginName()));
+    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
index 27318143..a7394c12 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
@@ -48,7 +48,6 @@ public class ConsoleSink extends RichOutputFormat<Row> implements FlinkBatchSink
         } catch (Exception e) {
             LOGGER.error("Failed to print result! ", e);
         }
-        rowDataSet.output(this);
     }
 
     @Override