You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/11 15:39:20 UTC
[incubator-seatunnel] branch dev updated: [Bug][Starter] Fix variables cannot be set in flink mode (#1854)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e35b65a3 [Bug][Starter] Fix variables cannot be set in flink mode (#1854)
e35b65a3 is described below
commit e35b65a3403494948fc55b1787933278c3ac4651
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed May 11 23:39:16 2022 +0800
[Bug][Starter] Fix variables cannot be set in flink mode (#1854)
* Fix variables cannot be set in flink mode
---
.../src/main/bin/start-seatunnel-sql.sh | 4 ++
.../core/sql/FlinkEnvParameterParser.java | 45 ++++++++++++++++
.../apache/seatunnel/core/sql/FlinkSqlStarter.java | 2 +-
.../core/sql/FlinkEnvParameterParserTest.java | 37 +++++++++++++
.../src/main/bin/start-seatunnel-flink.sh | 4 ++
.../core/flink/FlinkEnvParameterParser.java | 60 ++++++++++++++++++++++
.../apache/seatunnel/core/flink/FlinkStarter.java | 4 +-
.../core/flink/utils/CommandLineUtils.java | 21 +-------
.../core/flink/FlinkEnvParameterParserTest.java | 47 +++++++++++++++++
.../seatunnel/core/flink/FlinkStarterTest.java | 1 -
.../core/flink/utils/CommandLineUtilsTest.java | 26 +++++-----
11 files changed, 214 insertions(+), 37 deletions(-)
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh b/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh
index bc0e13fb..9294a86b 100755
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh
@@ -32,6 +32,10 @@ else
args=$@
fi
+ENV_PARAMETERS=$(java -cp ${APP_JAR} org.apache.seatunnel.core.sql.FlinkEnvParameterParser ${args})
+echo "Export JVM_ARGS: ${ENV_PARAMETERS}"
+export JVM_ARGS="${ENV_PARAMETERS}"
+
CMD=$(java -cp ${APP_JAR} org.apache.seatunnel.core.sql.FlinkSqlStarter ${args}) && EXIT_CODE=$? || EXIT_CODE=$?
if [ ${EXIT_CODE} -eq 234 ]; then
# print usage
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParser.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParser.java
new file mode 100644
index 00000000..b14838e8
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParser.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.sql;
+
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
+import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FlinkEnvParameterParser {
+
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ public static void main(String[] args) {
+ FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
+ List<String> envParameters = getEnvParameters(flinkCommandArgs);
+ System.out.println(String.join(" ", envParameters));
+ }
+
+ static List<String> getEnvParameters(FlinkCommandArgs flinkCommandArgs) {
+ return flinkCommandArgs.getVariables().stream()
+ .filter(StringUtils::isNotBlank)
+ .map(String::trim)
+ .map(parameter -> "-D" + parameter)
+ .collect(Collectors.toList());
+ }
+}
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
index 7986716b..313b966c 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
@@ -45,7 +45,7 @@ public class FlinkSqlStarter implements Starter {
@Override
public List<String> buildCommands() throws Exception {
- return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, CLASS_NAME, appJar, FlinkJobType.SQL);
+ return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, CLASS_NAME, appJar);
}
@SuppressWarnings("checkstyle:RegexpSingleline")
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParserTest.java b/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParserTest.java
new file mode 100644
index 00000000..74ca79b4
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParserTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.sql;
+
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class FlinkEnvParameterParserTest {
+
+ @Test
+ public void getEnvParameters() {
+ FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+ flinkCommandArgs.setVariables(Arrays.asList("age=18", "name=zhangsan"));
+ List<String> envParameters = FlinkEnvParameterParser.getEnvParameters(flinkCommandArgs);
+ Assert.assertArrayEquals(new String[]{"-Dage=18", "-Dname=zhangsan"}, envParameters.toArray());
+ }
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh b/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh
index 05fe96b3..f9041d17 100755
--- a/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh
+++ b/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh
@@ -32,6 +32,10 @@ else
args=$@
fi
+ENV_PARAMETERS=$(java -cp ${APP_JAR} org.apache.seatunnel.core.flink.FlinkEnvParameterParser ${args})
+echo "Export JVM_ARGS: ${ENV_PARAMETERS}"
+export JVM_ARGS="${ENV_PARAMETERS}"
+
CMD=$(java -cp ${APP_JAR} org.apache.seatunnel.core.flink.FlinkStarter ${args}) && EXIT_CODE=$? || EXIT_CODE=$?
if [ ${EXIT_CODE} -eq 234 ]; then
# print usage
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParser.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParser.java
new file mode 100644
index 00000000..7da5437f
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParser.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink;
+
+import org.apache.seatunnel.core.base.config.ConfigParser;
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
+import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Used to parse the variables need to set into the environment.
+ */
+public class FlinkEnvParameterParser {
+
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ public static void main(String[] args) throws FileNotFoundException {
+ FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+ List<String> envParameters = getEnvParameters(flinkCommandArgs);
+ System.out.println(String.join(" ", envParameters));
+ }
+
+ static List<String> getEnvParameters(FlinkCommandArgs flinkCommandArgs) throws FileNotFoundException {
+ List<String> envParameters = new ArrayList<>();
+ // add variables
+ flinkCommandArgs.getVariables().stream()
+ .filter(StringUtils::isNotBlank)
+ .map(String::trim)
+ .forEach(parameter -> envParameters.add("-D" + parameter));
+ // add env
+ ConfigParser.getConfigEnvValues(flinkCommandArgs.getConfigFile())
+ .entrySet()
+ .stream()
+ .sorted(Comparator.comparing(Map.Entry::getKey))
+ .forEach(entry -> envParameters.add("-D" + entry.getKey() + "=" + entry.getValue()));
+ return envParameters;
+ }
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
index 2da33dee..f6ba8071 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
@@ -57,8 +57,8 @@ public class FlinkStarter implements Starter {
}
@Override
- public List<String> buildCommands() throws Exception {
- return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, APP_NAME, appJar, FlinkJobType.JAR);
+ public List<String> buildCommands() {
+ return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, APP_NAME, appJar);
}
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
index 4e466e43..51ba3c81 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
@@ -19,19 +19,14 @@ package org.apache.seatunnel.core.flink.utils;
import static org.apache.seatunnel.core.flink.constant.FlinkConstant.USAGE_EXIT_CODE;
-import org.apache.seatunnel.core.base.config.ConfigParser;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.flink.config.FlinkJobType;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.UnixStyleUsageFormatter;
-import java.io.FileNotFoundException;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
public class CommandLineUtils {
@@ -58,7 +53,7 @@ public class CommandLineUtils {
}
- public static List<String> buildFlinkCommand(FlinkCommandArgs flinkCommandArgs, String className, String jarPath, FlinkJobType jobType) throws FileNotFoundException {
+ public static List<String> buildFlinkCommand(FlinkCommandArgs flinkCommandArgs, String className, String jarPath) {
List<String> command = new ArrayList<>();
command.add("${FLINK_HOME}/bin/flink");
command.add(flinkCommandArgs.getRunMode().getMode());
@@ -71,21 +66,7 @@ public class CommandLineUtils {
if (flinkCommandArgs.isCheckConfig()) {
command.add("--check");
}
- // set System properties
- flinkCommandArgs.getVariables().stream()
- .filter(Objects::nonNull)
- .map(String::trim)
- .forEach(variable -> command.add("-D" + variable));
-
- if (jobType.equals(FlinkJobType.JAR)) {
- ConfigParser.getConfigEnvValues(flinkCommandArgs.getConfigFile())
- .entrySet()
- .stream()
- .sorted(Comparator.comparing(Map.Entry::getKey))
- .forEach(entry -> command.add("-D" + entry.getKey() + "=" + entry.getValue()));
- }
return command;
-
}
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParserTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParserTest.java
new file mode 100644
index 00000000..37cf5343
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParserTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink;
+
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.util.Arrays;
+import java.util.List;
+
+public class FlinkEnvParameterParserTest {
+
+ static final String APP_CONF_PATH = ClassLoader.getSystemResource("app.conf").getPath();
+
+ @Test
+ public void getEnvParameters() throws FileNotFoundException {
+ FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+ flinkCommandArgs.setConfigFile(APP_CONF_PATH);
+ flinkCommandArgs.setVariables(Arrays.asList("name=tom", "age=1"));
+
+ List<String> envParameters = FlinkEnvParameterParser.getEnvParameters(flinkCommandArgs);
+ envParameters.sort(String::compareTo);
+
+ String[] expected = {"-Dexecution.parallelism=1",
+ "-Dexecution.checkpoint.interval=10000", "-Dexecution.checkpoint.data-uri=hdfs://localhost:9000/checkpoint", "-Dname=tom", "-Dage=1"};
+ Arrays.sort(expected);
+ Assert.assertArrayEquals(expected, envParameters.toArray());
+ }
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
index 8662a804..e12a8987 100644
--- a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
@@ -32,7 +32,6 @@ public class FlinkStarterTest {
// since we cannot get the actual jar path, so we just check the command contains the command
Assert.assertTrue(flinkExecuteCommand.contains("--config " + APP_CONF_PATH));
Assert.assertTrue(flinkExecuteCommand.contains("-m yarn-cluster"));
- Assert.assertTrue(flinkExecuteCommand.contains("-Dkey1=value1"));
Assert.assertTrue(flinkExecuteCommand.contains("${FLINK_HOME}/bin/flink run"));
String[] args1 = {"--config", APP_CONF_PATH, "-m", "yarn-cluster", "-i", "key1=value1", "-i", "key2=value2", "--run-mode", "run-application"};
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
index 93a19301..96b71371 100644
--- a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
@@ -54,27 +54,27 @@ public class CommandLineUtilsTest {
String[] args = {"--detached", "-c", APP_CONF_PATH, "-t", "-i", "city=shenyang", "-i", "date=20200202",
"-r", "run-application", "--unkown", "unkown-command"};
FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
- List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar", FlinkJobType.JAR);
+ List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
Assert.assertEquals(commands,
Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
- "CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH, "--check", "-Dcity=shenyang", "-Ddate=20200202",
- "-Dexecution.checkpoint.data-uri=hdfs://localhost:9000/checkpoint", "-Dexecution.checkpoint.interval=10000",
- "-Dexecution.parallelism=1"));
+ "CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH, "--check"));
flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
- commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar", FlinkJobType.JAR);
+ commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
Assert.assertEquals(commands,
Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
- "CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH, "--check", "-Dcity=shenyang", "-Ddate=20200202",
- "-Dexecution.checkpoint.data-uri=hdfs://localhost:9000/checkpoint", "-Dexecution.checkpoint.interval=10000",
- "-Dexecution.parallelism=1"));
+ "CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH, "--check"));
String[] args1 = {"--detached", "-c", "app.conf", "-t", "-i", "city=shenyang", "-i", "date=20200202",
"-r", "run-application", "--unkown", "unkown-command"};
- Assert.assertThrows(FileNotFoundException.class, () -> {
- CommandLineUtils.buildFlinkCommand(CommandLineUtils.parseCommandArgs(args1, FlinkJobType.SQL), "CLASS_NAME", "/path/to/jar", FlinkJobType.JAR);
- });
+ List<String> command = CommandLineUtils.buildFlinkCommand(
+ CommandLineUtils.parseCommandArgs(args1, FlinkJobType.SQL), "CLASS_NAME", "/path/to/jar");
+ Assert.assertEquals(
+ Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
+ "CLASS_NAME", "/path/to/jar", "--config", "app.conf", "--check"),
+ command);
+
}
@Test
@@ -82,9 +82,9 @@ public class CommandLineUtilsTest {
String[] args = {"--detached", "-c", SQL_CONF_PATH, "-t", "-i", "city=shenyang", "-i", "date=20200202",
"-r", "run-application", "--unkown", "unkown-command"};
FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
- List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar", FlinkJobType.SQL);
+ List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
Assert.assertEquals(commands,
Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
- "CLASS_NAME", "/path/to/jar", "--config", SQL_CONF_PATH, "--check", "-Dcity=shenyang", "-Ddate=20200202"));
+ "CLASS_NAME", "/path/to/jar", "--config", SQL_CONF_PATH, "--check"));
}
}