You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/06/07 08:52:22 UTC

[incubator-seatunnel] branch api-draft updated: [api-draft#1990][Common] The DeployMode code cleanup (#1991)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new dc81c316 [api-draft#1990][Common] The DeployMode code cleanup (#1991)
dc81c316 is described below

commit dc81c3167d4b5ac34a3ac08e145fe0f78d938cae
Author: Kerwin <37...@users.noreply.github.com>
AuthorDate: Tue Jun 7 16:52:16 2022 +0800

    [api-draft#1990][Common] The DeployMode code cleanup (#1991)
    
    * deploy mode code clean
    
    * fix code style
    
    * fix query app root dir path
---
 .../org/apache/seatunnel/common/config/Common.java | 30 +++++-----------------
 .../apache/seatunnel/common/config/CommonTest.java |  2 +-
 .../core/base/command/BaseTaskExecuteCommand.java  |  5 ++--
 .../apache/seatunnel/core/sql/FlinkSqlStarter.java |  2 +-
 .../apache/seatunnel/core/flink/FlinkStarter.java  |  2 +-
 .../core/flink/command/FlinkCommandBuilder.java    |  6 +----
 .../apache/seatunnel/core/spark/SparkStarter.java  |  4 +--
 .../core/spark/command/SparkCommandBuilder.java    |  5 +---
 .../seatunnel/core/starter/flink/FlinkStarter.java |  2 +-
 .../starter/flink/command/FlinkCommandBuilder.java |  5 +---
 .../seatunnel/core/starter/spark/SparkStarter.java |  4 +--
 .../starter/spark/command/SparkCommandBuilder.java |  5 +---
 12 files changed, 21 insertions(+), 51 deletions(-)

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
index dd74c022..d48c190e 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
@@ -21,10 +21,6 @@ import java.io.File;
 import java.net.URISyntaxException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
 
 public class Common {
 
@@ -37,28 +33,16 @@ public class Common {
      */
     public static final int COLLECTION_SIZE = 16;
 
-    private static final List<String> ALLOWED_MODES = Arrays.stream(DeployMode.values())
-        .map(DeployMode::getName).collect(Collectors.toList());
-
-    private static Optional<String> MODE = Optional.empty();
-
-    public static boolean isModeAllowed(String mode) {
-        return ALLOWED_MODES.contains(mode.toLowerCase());
-    }
+    private static DeployMode MODE;
 
     /**
      * Set mode. return false in case of failure
      */
-    public static Boolean setDeployMode(String m) {
-        if (isModeAllowed(m)) {
-            MODE = Optional.of(m);
-            return true;
-        } else {
-            return false;
-        }
+    public static void setDeployMode(DeployMode mode) {
+        MODE = mode;
     }
 
-    public static Optional<String> getDeployMode() {
+    public static DeployMode getDeployMode() {
         return MODE;
     }
 
@@ -70,7 +54,7 @@ public class Common {
      * When running seatunnel in --master yarn or --master mesos, you can put plugins related files in plugins dir.
      */
     public static Path appRootDir() {
-        if (MODE.equals(Optional.of(DeployMode.CLIENT.getName()))) {
+        if (DeployMode.CLIENT == MODE) {
             try {
                 String path = Common.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath();
                 path = new File(path).getPath();
@@ -78,10 +62,10 @@ public class Common {
             } catch (URISyntaxException e) {
                 throw new RuntimeException(e);
             }
-        } else if (MODE.equals(Optional.of(DeployMode.CLUSTER.getName()))) {
+        } else if (DeployMode.CLUSTER == MODE) {
             return Paths.get("");
         } else {
-            throw new IllegalStateException("MODE not support : " + MODE.orElse("null"));
+            throw new IllegalStateException("deploy mode not support : " + MODE);
         }
     }
 
diff --git a/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/CommonTest.java b/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/CommonTest.java
index ee715c10..fed8b8f6 100644
--- a/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/CommonTest.java
+++ b/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/CommonTest.java
@@ -26,7 +26,7 @@ import java.io.File;
 public class CommonTest {
 
     static {
-        Common.setDeployMode("client");
+        Common.setDeployMode(DeployMode.CLIENT);
     }
 
     @Test
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommand.java
index bab835e2..b14b6fa9 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommand.java
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
 
 /**
  * Base task execute command.
@@ -126,8 +125,8 @@ public abstract class BaseTaskExecuteCommand<T extends AbstractCommandArgs, E ex
     }
 
     private void deployModeCheck() {
-        final Optional<String> mode = Common.getDeployMode();
-        if (mode.isPresent() && DeployMode.CLUSTER.getName().equals(mode.get())) {
+        final DeployMode mode = Common.getDeployMode();
+        if (DeployMode.CLUSTER == mode) {
 
             LOGGER.info("preparing cluster mode work dir files...");
             File workDir = new File(".");
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
index 1e5f52a4..e8795ecc 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
@@ -39,7 +39,7 @@ public class FlinkSqlStarter implements Starter {
     FlinkSqlStarter(String[] args) {
         this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
         // set the deployment mode, used to get the job jar path.
-        Common.setDeployMode(flinkCommandArgs.getDeployMode().getName());
+        Common.setDeployMode(flinkCommandArgs.getDeployMode());
         this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
     }
 
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
index 6fe18a49..20ea8866 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
@@ -46,7 +46,7 @@ public class FlinkStarter implements Starter {
     FlinkStarter(String[] args) {
         this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
         // set the deployment mode, used to get the job jar path.
-        Common.setDeployMode(flinkCommandArgs.getDeployMode().getName());
+        Common.setDeployMode(flinkCommandArgs.getDeployMode());
         this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
     }
 
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
index 5aef9f72..72368960 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
@@ -26,11 +26,7 @@ public class FlinkCommandBuilder implements CommandBuilder<FlinkCommandArgs> {
 
     @Override
     public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
-        if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
-            throw new IllegalArgumentException(
-                String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
-        }
-
+        Common.setDeployMode(commandArgs.getDeployMode());
         return commandArgs.isCheckConfig() ? new FlinkApiConfValidateCommand(commandArgs)
             : new FlinkApiTaskExecuteCommand(commandArgs);
     }
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
index 4675cb2e..02678f76 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
@@ -143,7 +143,7 @@ public class SparkStarter implements Starter {
     @Override
     public List<String> buildCommands() throws IOException {
         setSparkConf();
-        Common.setDeployMode(commandArgs.getDeployMode().getName());
+        Common.setDeployMode(commandArgs.getDeployMode());
         this.jars.addAll(getPluginsJarDependencies());
         this.jars.addAll(listJars(Common.appLibDir()));
         this.jars.addAll(getConnectorJarDependencies());
@@ -395,7 +395,7 @@ public class SparkStarter implements Starter {
 
         @Override
         public List<String> buildCommands() throws IOException {
-            Common.setDeployMode(commandArgs.getDeployMode().getName());
+            Common.setDeployMode(commandArgs.getDeployMode());
             Path pluginTarball = Common.pluginTarball();
             if (Files.notExists(pluginTarball)) {
                 CompressionUtils.tarGzip(Common.pluginRootDir(), pluginTarball);
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
index 4ea8ed1e..bb8739e6 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
@@ -26,10 +26,7 @@ public class SparkCommandBuilder implements CommandBuilder<SparkCommandArgs> {
 
     @Override
     public Command<SparkCommandArgs> buildCommand(SparkCommandArgs commandArgs) {
-        if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
-            throw new IllegalArgumentException(
-                String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
-        }
+        Common.setDeployMode(commandArgs.getDeployMode());
         return commandArgs.isCheckConfig() ? new SparkConfValidateCommand(commandArgs)
             : new SparkTaskExecuteCommand(commandArgs);
     }
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
index 367e0a1a..03a5bbfb 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
@@ -46,7 +46,7 @@ public class FlinkStarter implements Starter {
     FlinkStarter(String[] args) {
         this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
         // set the deployment mode, used to get the job jar path.
-        Common.setDeployMode(flinkCommandArgs.getDeployMode().getName());
+        Common.setDeployMode(flinkCommandArgs.getDeployMode());
         this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
     }
 
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
index 437ce5d8..1ec76ff4 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
@@ -26,10 +26,7 @@ public class FlinkCommandBuilder implements CommandBuilder<FlinkCommandArgs> {
 
     @Override
     public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
-        if (Boolean.FALSE.equals(Common.setDeployMode(commandArgs.getDeployMode().getName()))) {
-            throw new IllegalArgumentException(
-                String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
-        }
+        Common.setDeployMode(commandArgs.getDeployMode());
         return commandArgs.isCheckConfig() ? new FlinkApiConfValidateCommand(commandArgs)
             : new FlinkApiTaskExecuteCommand(commandArgs);
     }
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 84ad01aa..6cc8ef1a 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -148,7 +148,7 @@ public class SparkStarter implements Starter {
     @Override
     public List<String> buildCommands() throws IOException {
         setSparkConf();
-        Common.setDeployMode(commandArgs.getDeployMode().getName());
+        Common.setDeployMode(commandArgs.getDeployMode());
         this.jars.addAll(getPluginsJarDependencies());
         this.jars.addAll(listJars(Common.appLibDir()));
         this.jars.addAll(getConnectorJarDependencies());
@@ -416,7 +416,7 @@ public class SparkStarter implements Starter {
 
         @Override
         public List<String> buildCommands() throws IOException {
-            Common.setDeployMode(commandArgs.getDeployMode().getName());
+            Common.setDeployMode(commandArgs.getDeployMode());
             Path pluginTarball = Common.pluginTarball();
             if (Files.notExists(pluginTarball)) {
                 CompressionUtils.tarGzip(Common.pluginRootDir(), pluginTarball);
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
index 65d6e729..9f6a9ec5 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
@@ -26,10 +26,7 @@ public class SparkCommandBuilder implements CommandBuilder<SparkCommandArgs> {
 
     @Override
     public Command<SparkCommandArgs> buildCommand(SparkCommandArgs commandArgs) {
-        if (Boolean.FALSE.equals(Common.setDeployMode(commandArgs.getDeployMode().getName()))) {
-            throw new IllegalArgumentException(
-                    String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
-        }
+        Common.setDeployMode(commandArgs.getDeployMode());
         return new SeaTunnelApiCommandBuilder().buildCommand(commandArgs);
     }