You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/04/18 09:59:10 UTC
[incubator-seatunnel] branch 2.1.1-prepare updated: [Bug][Connector] Fixed ConsoleSink submit job twice. (#1710)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch 2.1.1-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/2.1.1-prepare by this push:
new ec62ef0e [Bug][Connector] Fixed ConsoleSink submit job twice. (#1710)
ec62ef0e is described below
commit ec62ef0edcd0997b7bd53c0bd80ecce370d8c09e
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Mon Apr 18 17:57:01 2022 +0800
[Bug][Connector] Fixed ConsoleSink submit job twice. (#1710)
* Fix ConsoleSink submitting the job twice
---
.../seatunnel/flink/batch/FlinkBatchExecution.java | 19 ++++++++++++-------
.../seatunnel/flink/console/sink/ConsoleSink.java | 1 -
2 files changed, 12 insertions(+), 8 deletions(-)
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
index 3da634fb..dba81476 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
@@ -72,13 +72,15 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
sink.outputBatch(flinkEnvironment, dataSet);
}
- try {
- LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getBatchEnvironment().getExecutionPlan());
- JobExecutionResult execute = flinkEnvironment.getBatchEnvironment().execute(flinkEnvironment.getJobName());
- LOGGER.info(execute.toString());
- } catch (Exception e) {
- LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
- throw e;
+ if (whetherExecute(sinks)) {
+ try {
+ LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getBatchEnvironment().getExecutionPlan());
+ JobExecutionResult execute = flinkEnvironment.getBatchEnvironment().execute(flinkEnvironment.getJobName());
+ LOGGER.info(execute.toString());
+ } catch (Exception e) {
+ LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
+ throw e;
+ }
}
}
@@ -116,4 +118,7 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
return config;
}
+ private boolean whetherExecute(List<FlinkBatchSink> sinks) {
+ return sinks.stream().anyMatch(s -> !"ConsoleSink".equals(s.getPluginName()));
+ }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
index 27318143..a7394c12 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
@@ -48,7 +48,6 @@ public class ConsoleSink extends RichOutputFormat<Row> implements FlinkBatchSink
} catch (Exception e) {
LOGGER.error("Failed to print result! ", e);
}
- rowDataSet.output(this);
}
@Override