You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/02/07 09:43:42 UTC

[incubator-seatunnel] branch dev updated: [add exit status code] By adding the wrong status code, determine whether the program is executed correctly and successfully (#1195)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new dbf3772  [add exit status code] By adding the wrong status code, determine whether the program is executed correctly and successfully (#1195)
dbf3772 is described below

commit dbf37723b669f539f409498c412722812af8a656
Author: bigdataf <33...@users.noreply.github.com>
AuthorDate: Mon Feb 7 17:43:35 2022 +0800

    [add exit status code] By adding the wrong status code, determine whether the program is executed correctly and successfully (#1195)
    
    * [SeaTunnel#1191] DorisSink,DruidSink,DruidSource,Elasticsearch,FileSource,FileSink,InfluxDbSink Support for parallel parameters
    
    Co-authored-by: jianmei.gao <ji...@mtime.com>
---
 .../src/main/java/org/apache/seatunnel/env/Execution.java        | 2 +-
 .../org/apache/seatunnel/flink/batch/FlinkBatchExecution.java    | 9 ++++++---
 .../org/apache/seatunnel/flink/stream/FlinkStreamExecution.java  | 5 +++--
 .../src/main/java/org/apache/seatunnel/Seatunnel.java            | 4 ++--
 .../src/main/java/org/apache/seatunnel/SeatunnelFlink.java       | 6 +++++-
 .../src/main/java/org/apache/seatunnel/SeatunnelSpark.java       | 7 +++++--
 .../org/apache/seatunnel/example/flink/LocalFlinkExample.java    | 6 +++++-
 .../org/apache/seatunnel/example/spark/LocalSparkExample.java    | 6 +++++-
 8 files changed, 32 insertions(+), 13 deletions(-)

diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
index 89efee6..f7aec8f 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
@@ -36,5 +36,5 @@ public interface Execution<SR extends BaseSource, TF extends BaseTransform, SK e
      * @param transforms transform plugin list
      * @param sinks      sink plugin list
      */
-    void start(List<SR> sources, List<TF> transforms, List<SK> sinks);
+    void start(List<SR> sources, List<TF> transforms, List<SK> sinks) throws Exception;
 }
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
index 4de7312..fe199fe 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.plugin.Plugin;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
@@ -48,7 +49,7 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
     }
 
     @Override
-    public void start(List<FlinkBatchSource> sources, List<FlinkBatchTransform> transforms, List<FlinkBatchSink> sinks) {
+    public void start(List<FlinkBatchSource> sources, List<FlinkBatchTransform> transforms, List<FlinkBatchSink> sinks) throws Exception {
         List<DataSet> data = new ArrayList<>();
 
         for (FlinkBatchSource source : sources) {
@@ -78,9 +79,11 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
 
         try {
             LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getBatchEnvironment().getExecutionPlan());
-            flinkEnvironment.getBatchEnvironment().execute(flinkEnvironment.getJobName());
+            JobExecutionResult execute = flinkEnvironment.getBatchEnvironment().execute(flinkEnvironment.getJobName());
+            LOGGER.info(execute.toString());
         } catch (Exception e) {
-            LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName(), e);
+            LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
+            throw e;
         }
     }
 
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
index 64687ab..e3a647b 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
@@ -48,7 +48,7 @@ public class FlinkStreamExecution implements Execution<FlinkStreamSource, FlinkS
     }
 
     @Override
-    public void start(List<FlinkStreamSource> sources, List<FlinkStreamTransform> transforms, List<FlinkStreamSink> sinks) {
+    public void start(List<FlinkStreamSource> sources, List<FlinkStreamTransform> transforms, List<FlinkStreamSink> sinks) throws Exception {
         List<DataStream> data = new ArrayList<>();
 
         for (FlinkStreamSource source : sources) {
@@ -80,7 +80,8 @@ public class FlinkStreamExecution implements Execution<FlinkStreamSource, FlinkS
             LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
             flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
         } catch (Exception e) {
-            LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName(), e);
+            LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
+            throw e;
         }
     }
 
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Seatunnel.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Seatunnel.java
index 9f8021e..2d65577 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Seatunnel.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Seatunnel.java
@@ -49,7 +49,7 @@ import java.util.Optional;
 public class Seatunnel {
     private static final Logger LOGGER = LoggerFactory.getLogger(Seatunnel.class);
 
-    public static void run(CommandLineArgs commandLineArgs, Engine engine) {
+    public static void run(CommandLineArgs commandLineArgs, Engine engine) throws Exception{
         Common.setDeployMode(commandLineArgs.getDeployMode());
         String configFilePath = getConfigFilePath(commandLineArgs, engine);
         boolean testConfig = commandLineArgs.isTestConfig();
@@ -89,7 +89,7 @@ public class Seatunnel {
         return path;
     }
 
-    private static void entryPoint(String configFile, Engine engine) {
+    private static void entryPoint(String configFile, Engine engine) throws Exception {
 
         ConfigBuilder configBuilder = new ConfigBuilder(configFile, engine);
         List<BaseSource> sources = configBuilder.createPlugins(PluginType.SOURCE);
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/SeatunnelFlink.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/SeatunnelFlink.java
index 9b6b18b..d2c88f8 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/SeatunnelFlink.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/SeatunnelFlink.java
@@ -26,7 +26,11 @@ public class SeatunnelFlink {
 
     public static void main(String[] args) {
         CommandLineArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
-        Seatunnel.run(flinkArgs, FLINK);
+        try {
+            Seatunnel.run(flinkArgs, FLINK);
+        } catch (Exception e) {
+            System.exit(-1);
+        }
     }
 
 }
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SeatunnelSpark.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SeatunnelSpark.java
index d9bf0d4..28473c7 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SeatunnelSpark.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SeatunnelSpark.java
@@ -26,7 +26,10 @@ public class SeatunnelSpark {
 
     public static void main(String[] args) {
         CommandLineArgs sparkArgs = CommandLineUtils.parseSparkArgs(args);
-        Seatunnel.run(sparkArgs, SPARK);
+        try {
+            Seatunnel.run(sparkArgs, SPARK);
+        } catch (Exception e) {
+            System.exit(-1);
+        }
     }
-
 }
diff --git a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
index 1cd1f8b..c79a01e 100644
--- a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
+++ b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
@@ -29,7 +29,11 @@ public class LocalFlinkExample {
     public static void main(String[] args) {
         String configFile = getTestConfigFile("fake_to_console.conf");
         CommandLineArgs flinkArgs = new CommandLineArgs(configFile, false);
-        Seatunnel.run(flinkArgs, FLINK);
+        try {
+            Seatunnel.run(flinkArgs, FLINK);
+        } catch (Exception e) {
+            System.exit(-1);
+        }
     }
 
     public static String getTestConfigFile(String configFile) {
diff --git a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
index b0a0030..49886ae 100644
--- a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
+++ b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
@@ -29,7 +29,11 @@ public class LocalSparkExample {
     public static void main(String[] args) {
         String configFile = getTestConfigFile("spark.batch.conf.template");
         CommandLineArgs sparkArgs = new CommandLineArgs(configFile, false);
-        Seatunnel.run(sparkArgs, SPARK);
+        try {
+            Seatunnel.run(sparkArgs, SPARK);
+        } catch (Exception e) {
+            System.exit(-1);
+        }
     }
 
     public static String getTestConfigFile(String configFile) {