You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/06/07 08:52:22 UTC
[incubator-seatunnel] branch api-draft updated: [api-draft#1990][Common] The DeployMode code cleanup (#1991)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new dc81c316 [api-draft#1990][Common] The DeployMode code cleanup (#1991)
dc81c316 is described below
commit dc81c3167d4b5ac34a3ac08e145fe0f78d938cae
Author: Kerwin <37...@users.noreply.github.com>
AuthorDate: Tue Jun 7 16:52:16 2022 +0800
[api-draft#1990][Common] The DeployMode code cleanup (#1991)
* deploy mode code clean
* fix code style
* fix query app root dir path
---
.../org/apache/seatunnel/common/config/Common.java | 30 +++++-----------------
.../apache/seatunnel/common/config/CommonTest.java | 2 +-
.../core/base/command/BaseTaskExecuteCommand.java | 5 ++--
.../apache/seatunnel/core/sql/FlinkSqlStarter.java | 2 +-
.../apache/seatunnel/core/flink/FlinkStarter.java | 2 +-
.../core/flink/command/FlinkCommandBuilder.java | 6 +----
.../apache/seatunnel/core/spark/SparkStarter.java | 4 +--
.../core/spark/command/SparkCommandBuilder.java | 5 +---
.../seatunnel/core/starter/flink/FlinkStarter.java | 2 +-
.../starter/flink/command/FlinkCommandBuilder.java | 5 +---
.../seatunnel/core/starter/spark/SparkStarter.java | 4 +--
.../starter/spark/command/SparkCommandBuilder.java | 5 +---
12 files changed, 21 insertions(+), 51 deletions(-)
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
index dd74c022..d48c190e 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
@@ -21,10 +21,6 @@ import java.io.File;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
public class Common {
@@ -37,28 +33,16 @@ public class Common {
*/
public static final int COLLECTION_SIZE = 16;
- private static final List<String> ALLOWED_MODES = Arrays.stream(DeployMode.values())
- .map(DeployMode::getName).collect(Collectors.toList());
-
- private static Optional<String> MODE = Optional.empty();
-
- public static boolean isModeAllowed(String mode) {
- return ALLOWED_MODES.contains(mode.toLowerCase());
- }
+ private static DeployMode MODE;
/**
* Set mode. return false in case of failure
*/
- public static Boolean setDeployMode(String m) {
- if (isModeAllowed(m)) {
- MODE = Optional.of(m);
- return true;
- } else {
- return false;
- }
+ public static void setDeployMode(DeployMode mode) {
+ MODE = mode;
}
- public static Optional<String> getDeployMode() {
+ public static DeployMode getDeployMode() {
return MODE;
}
@@ -70,7 +54,7 @@ public class Common {
* When running seatunnel in --master yarn or --master mesos, you can put plugins related files in plugins dir.
*/
public static Path appRootDir() {
- if (MODE.equals(Optional.of(DeployMode.CLIENT.getName()))) {
+ if (DeployMode.CLIENT == MODE) {
try {
String path = Common.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath();
path = new File(path).getPath();
@@ -78,10 +62,10 @@ public class Common {
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
- } else if (MODE.equals(Optional.of(DeployMode.CLUSTER.getName()))) {
+ } else if (DeployMode.CLUSTER == MODE) {
return Paths.get("");
} else {
- throw new IllegalStateException("MODE not support : " + MODE.orElse("null"));
+ throw new IllegalStateException("deploy mode not support : " + MODE);
}
}
diff --git a/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/CommonTest.java b/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/CommonTest.java
index ee715c10..fed8b8f6 100644
--- a/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/CommonTest.java
+++ b/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/CommonTest.java
@@ -26,7 +26,7 @@ import java.io.File;
public class CommonTest {
static {
- Common.setDeployMode("client");
+ Common.setDeployMode(DeployMode.CLIENT);
}
@Test
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommand.java
index bab835e2..b14b6fa9 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommand.java
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.List;
import java.util.Objects;
-import java.util.Optional;
/**
* Base task execute command.
@@ -126,8 +125,8 @@ public abstract class BaseTaskExecuteCommand<T extends AbstractCommandArgs, E ex
}
private void deployModeCheck() {
- final Optional<String> mode = Common.getDeployMode();
- if (mode.isPresent() && DeployMode.CLUSTER.getName().equals(mode.get())) {
+ final DeployMode mode = Common.getDeployMode();
+ if (DeployMode.CLUSTER == mode) {
LOGGER.info("preparing cluster mode work dir files...");
File workDir = new File(".");
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
index 1e5f52a4..e8795ecc 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
@@ -39,7 +39,7 @@ public class FlinkSqlStarter implements Starter {
FlinkSqlStarter(String[] args) {
this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
// set the deployment mode, used to get the job jar path.
- Common.setDeployMode(flinkCommandArgs.getDeployMode().getName());
+ Common.setDeployMode(flinkCommandArgs.getDeployMode());
this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
index 6fe18a49..20ea8866 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
@@ -46,7 +46,7 @@ public class FlinkStarter implements Starter {
FlinkStarter(String[] args) {
this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
// set the deployment mode, used to get the job jar path.
- Common.setDeployMode(flinkCommandArgs.getDeployMode().getName());
+ Common.setDeployMode(flinkCommandArgs.getDeployMode());
this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
index 5aef9f72..72368960 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
@@ -26,11 +26,7 @@ public class FlinkCommandBuilder implements CommandBuilder<FlinkCommandArgs> {
@Override
public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
- if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
- throw new IllegalArgumentException(
- String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
- }
-
+ Common.setDeployMode(commandArgs.getDeployMode());
return commandArgs.isCheckConfig() ? new FlinkApiConfValidateCommand(commandArgs)
: new FlinkApiTaskExecuteCommand(commandArgs);
}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
index 4675cb2e..02678f76 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
@@ -143,7 +143,7 @@ public class SparkStarter implements Starter {
@Override
public List<String> buildCommands() throws IOException {
setSparkConf();
- Common.setDeployMode(commandArgs.getDeployMode().getName());
+ Common.setDeployMode(commandArgs.getDeployMode());
this.jars.addAll(getPluginsJarDependencies());
this.jars.addAll(listJars(Common.appLibDir()));
this.jars.addAll(getConnectorJarDependencies());
@@ -395,7 +395,7 @@ public class SparkStarter implements Starter {
@Override
public List<String> buildCommands() throws IOException {
- Common.setDeployMode(commandArgs.getDeployMode().getName());
+ Common.setDeployMode(commandArgs.getDeployMode());
Path pluginTarball = Common.pluginTarball();
if (Files.notExists(pluginTarball)) {
CompressionUtils.tarGzip(Common.pluginRootDir(), pluginTarball);
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
index 4ea8ed1e..bb8739e6 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
@@ -26,10 +26,7 @@ public class SparkCommandBuilder implements CommandBuilder<SparkCommandArgs> {
@Override
public Command<SparkCommandArgs> buildCommand(SparkCommandArgs commandArgs) {
- if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
- throw new IllegalArgumentException(
- String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
- }
+ Common.setDeployMode(commandArgs.getDeployMode());
return commandArgs.isCheckConfig() ? new SparkConfValidateCommand(commandArgs)
: new SparkTaskExecuteCommand(commandArgs);
}
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
index 367e0a1a..03a5bbfb 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
@@ -46,7 +46,7 @@ public class FlinkStarter implements Starter {
FlinkStarter(String[] args) {
this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
// set the deployment mode, used to get the job jar path.
- Common.setDeployMode(flinkCommandArgs.getDeployMode().getName());
+ Common.setDeployMode(flinkCommandArgs.getDeployMode());
this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
}
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
index 437ce5d8..1ec76ff4 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
@@ -26,10 +26,7 @@ public class FlinkCommandBuilder implements CommandBuilder<FlinkCommandArgs> {
@Override
public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
- if (Boolean.FALSE.equals(Common.setDeployMode(commandArgs.getDeployMode().getName()))) {
- throw new IllegalArgumentException(
- String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
- }
+ Common.setDeployMode(commandArgs.getDeployMode());
return commandArgs.isCheckConfig() ? new FlinkApiConfValidateCommand(commandArgs)
: new FlinkApiTaskExecuteCommand(commandArgs);
}
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 84ad01aa..6cc8ef1a 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -148,7 +148,7 @@ public class SparkStarter implements Starter {
@Override
public List<String> buildCommands() throws IOException {
setSparkConf();
- Common.setDeployMode(commandArgs.getDeployMode().getName());
+ Common.setDeployMode(commandArgs.getDeployMode());
this.jars.addAll(getPluginsJarDependencies());
this.jars.addAll(listJars(Common.appLibDir()));
this.jars.addAll(getConnectorJarDependencies());
@@ -416,7 +416,7 @@ public class SparkStarter implements Starter {
@Override
public List<String> buildCommands() throws IOException {
- Common.setDeployMode(commandArgs.getDeployMode().getName());
+ Common.setDeployMode(commandArgs.getDeployMode());
Path pluginTarball = Common.pluginTarball();
if (Files.notExists(pluginTarball)) {
CompressionUtils.tarGzip(Common.pluginRootDir(), pluginTarball);
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
index 65d6e729..9f6a9ec5 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
@@ -26,10 +26,7 @@ public class SparkCommandBuilder implements CommandBuilder<SparkCommandArgs> {
@Override
public Command<SparkCommandArgs> buildCommand(SparkCommandArgs commandArgs) {
- if (Boolean.FALSE.equals(Common.setDeployMode(commandArgs.getDeployMode().getName()))) {
- throw new IllegalArgumentException(
- String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
- }
+ Common.setDeployMode(commandArgs.getDeployMode());
return new SeaTunnelApiCommandBuilder().buildCommand(commandArgs);
}