You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/11 15:39:20 UTC

[incubator-seatunnel] branch dev updated: [Bug][Starter] Fix variables cannot be set in flink mode (#1854)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e35b65a3 [Bug][Starter] Fix variables cannot be set in flink mode (#1854)
e35b65a3 is described below

commit e35b65a3403494948fc55b1787933278c3ac4651
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed May 11 23:39:16 2022 +0800

    [Bug][Starter] Fix variables cannot be set in flink mode (#1854)
    
    * Fix variables cannot be set in flink mode
---
 .../src/main/bin/start-seatunnel-sql.sh            |  4 ++
 .../core/sql/FlinkEnvParameterParser.java          | 45 ++++++++++++++++
 .../apache/seatunnel/core/sql/FlinkSqlStarter.java |  2 +-
 .../core/sql/FlinkEnvParameterParserTest.java      | 37 +++++++++++++
 .../src/main/bin/start-seatunnel-flink.sh          |  4 ++
 .../core/flink/FlinkEnvParameterParser.java        | 60 ++++++++++++++++++++++
 .../apache/seatunnel/core/flink/FlinkStarter.java  |  4 +-
 .../core/flink/utils/CommandLineUtils.java         | 21 +-------
 .../core/flink/FlinkEnvParameterParserTest.java    | 47 +++++++++++++++++
 .../seatunnel/core/flink/FlinkStarterTest.java     |  1 -
 .../core/flink/utils/CommandLineUtilsTest.java     | 26 +++++-----
 11 files changed, 214 insertions(+), 37 deletions(-)

diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh b/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh
index bc0e13fb..9294a86b 100755
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh
@@ -32,6 +32,10 @@ else
     args=$@
 fi
 
+ENV_PARAMETERS=$(java -cp ${APP_JAR} org.apache.seatunnel.core.sql.FlinkEnvParameterParser ${args})
+echo "Export JVM_ARGS: ${ENV_PARAMETERS}"
+export JVM_ARGS="${ENV_PARAMETERS}"
+
 CMD=$(java -cp ${APP_JAR} org.apache.seatunnel.core.sql.FlinkSqlStarter ${args}) && EXIT_CODE=$? || EXIT_CODE=$?
 if [ ${EXIT_CODE} -eq 234 ]; then
     # print usage
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParser.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParser.java
new file mode 100644
index 00000000..b14838e8
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParser.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.sql;
+
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
+import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FlinkEnvParameterParser {
+
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public static void main(String[] args) {
+        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
+        List<String> envParameters = getEnvParameters(flinkCommandArgs);
+        System.out.println(String.join(" ", envParameters));
+    }
+
+    static List<String> getEnvParameters(FlinkCommandArgs flinkCommandArgs) {
+        return flinkCommandArgs.getVariables().stream()
+            .filter(StringUtils::isNotBlank)
+            .map(String::trim)
+            .map(parameter -> "-D" + parameter)
+            .collect(Collectors.toList());
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
index 7986716b..313b966c 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
@@ -45,7 +45,7 @@ public class FlinkSqlStarter implements Starter {
 
     @Override
     public List<String> buildCommands() throws Exception {
-        return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, CLASS_NAME, appJar, FlinkJobType.SQL);
+        return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, CLASS_NAME, appJar);
     }
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParserTest.java b/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParserTest.java
new file mode 100644
index 00000000..74ca79b4
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParserTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.sql;
+
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class FlinkEnvParameterParserTest {
+
+    @Test
+    public void getEnvParameters() {
+        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+        flinkCommandArgs.setVariables(Arrays.asList("age=18", "name=zhangsan"));
+        List<String> envParameters = FlinkEnvParameterParser.getEnvParameters(flinkCommandArgs);
+        Assert.assertArrayEquals(new String[]{"-Dage=18", "-Dname=zhangsan"}, envParameters.toArray());
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh b/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh
index 05fe96b3..f9041d17 100755
--- a/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh
+++ b/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh
@@ -32,6 +32,10 @@ else
     args=$@
 fi
 
+ENV_PARAMETERS=$(java -cp ${APP_JAR} org.apache.seatunnel.core.flink.FlinkEnvParameterParser ${args})
+echo "Export JVM_ARGS: ${ENV_PARAMETERS}"
+export JVM_ARGS="${ENV_PARAMETERS}"
+
 CMD=$(java -cp ${APP_JAR} org.apache.seatunnel.core.flink.FlinkStarter ${args}) && EXIT_CODE=$? || EXIT_CODE=$?
 if [ ${EXIT_CODE} -eq 234 ]; then
     # print usage
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParser.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParser.java
new file mode 100644
index 00000000..7da5437f
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParser.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink;
+
+import org.apache.seatunnel.core.base.config.ConfigParser;
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
+import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Parses the variables that need to be set into the environment.
+ */
+public class FlinkEnvParameterParser {
+
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public static void main(String[] args) throws FileNotFoundException {
+        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+        List<String> envParameters = getEnvParameters(flinkCommandArgs);
+        System.out.println(String.join(" ", envParameters));
+    }
+
+    static List<String> getEnvParameters(FlinkCommandArgs flinkCommandArgs) throws FileNotFoundException {
+        List<String> envParameters = new ArrayList<>();
+        // add variables
+        flinkCommandArgs.getVariables().stream()
+            .filter(StringUtils::isNotBlank)
+            .map(String::trim)
+            .forEach(parameter -> envParameters.add("-D" + parameter));
+        // add env
+        ConfigParser.getConfigEnvValues(flinkCommandArgs.getConfigFile())
+            .entrySet()
+            .stream()
+            .sorted(Comparator.comparing(Map.Entry::getKey))
+            .forEach(entry -> envParameters.add("-D" + entry.getKey() + "=" + entry.getValue()));
+        return envParameters;
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
index 2da33dee..f6ba8071 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
@@ -57,8 +57,8 @@ public class FlinkStarter implements Starter {
     }
 
     @Override
-    public List<String> buildCommands() throws Exception {
-        return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, APP_NAME, appJar, FlinkJobType.JAR);
+    public List<String> buildCommands() {
+        return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, APP_NAME, appJar);
     }
 
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
index 4e466e43..51ba3c81 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
@@ -19,19 +19,14 @@ package org.apache.seatunnel.core.flink.utils;
 
 import static org.apache.seatunnel.core.flink.constant.FlinkConstant.USAGE_EXIT_CODE;
 
-import org.apache.seatunnel.core.base.config.ConfigParser;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.flink.config.FlinkJobType;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.UnixStyleUsageFormatter;
 
-import java.io.FileNotFoundException;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
 
 public class CommandLineUtils {
 
@@ -58,7 +53,7 @@ public class CommandLineUtils {
 
     }
 
-    public static List<String> buildFlinkCommand(FlinkCommandArgs flinkCommandArgs, String className, String jarPath, FlinkJobType jobType) throws FileNotFoundException {
+    public static List<String> buildFlinkCommand(FlinkCommandArgs flinkCommandArgs, String className, String jarPath) {
         List<String> command = new ArrayList<>();
         command.add("${FLINK_HOME}/bin/flink");
         command.add(flinkCommandArgs.getRunMode().getMode());
@@ -71,21 +66,7 @@ public class CommandLineUtils {
         if (flinkCommandArgs.isCheckConfig()) {
             command.add("--check");
         }
-        // set System properties
-        flinkCommandArgs.getVariables().stream()
-          .filter(Objects::nonNull)
-          .map(String::trim)
-          .forEach(variable -> command.add("-D" + variable));
-
-        if (jobType.equals(FlinkJobType.JAR)) {
-            ConfigParser.getConfigEnvValues(flinkCommandArgs.getConfigFile())
-                .entrySet()
-                .stream()
-                .sorted(Comparator.comparing(Map.Entry::getKey))
-                .forEach(entry -> command.add("-D" + entry.getKey() + "=" + entry.getValue()));
-        }
 
         return command;
-
     }
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParserTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParserTest.java
new file mode 100644
index 00000000..37cf5343
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParserTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink;
+
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.util.Arrays;
+import java.util.List;
+
+public class FlinkEnvParameterParserTest {
+
+    static final String APP_CONF_PATH = ClassLoader.getSystemResource("app.conf").getPath();
+
+    @Test
+    public void getEnvParameters() throws FileNotFoundException {
+        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+        flinkCommandArgs.setConfigFile(APP_CONF_PATH);
+        flinkCommandArgs.setVariables(Arrays.asList("name=tom", "age=1"));
+
+        List<String> envParameters = FlinkEnvParameterParser.getEnvParameters(flinkCommandArgs);
+        envParameters.sort(String::compareTo);
+
+        String[] expected = {"-Dexecution.parallelism=1",
+            "-Dexecution.checkpoint.interval=10000", "-Dexecution.checkpoint.data-uri=hdfs://localhost:9000/checkpoint", "-Dname=tom", "-Dage=1"};
+        Arrays.sort(expected);
+        Assert.assertArrayEquals(expected, envParameters.toArray());
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
index 8662a804..e12a8987 100644
--- a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
@@ -32,7 +32,6 @@ public class FlinkStarterTest {
         // since we cannot get the actual jar path, so we just check the command contains the command
         Assert.assertTrue(flinkExecuteCommand.contains("--config " + APP_CONF_PATH));
         Assert.assertTrue(flinkExecuteCommand.contains("-m yarn-cluster"));
-        Assert.assertTrue(flinkExecuteCommand.contains("-Dkey1=value1"));
         Assert.assertTrue(flinkExecuteCommand.contains("${FLINK_HOME}/bin/flink run"));
 
         String[] args1 = {"--config", APP_CONF_PATH, "-m", "yarn-cluster", "-i", "key1=value1", "-i", "key2=value2", "--run-mode", "run-application"};
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
index 93a19301..96b71371 100644
--- a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
@@ -54,27 +54,27 @@ public class CommandLineUtilsTest {
         String[] args = {"--detached", "-c", APP_CONF_PATH, "-t", "-i", "city=shenyang", "-i", "date=20200202",
             "-r", "run-application", "--unkown", "unkown-command"};
         FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
-        List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar", FlinkJobType.JAR);
+        List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
         Assert.assertEquals(commands,
             Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
-                "CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH, "--check", "-Dcity=shenyang", "-Ddate=20200202",
-                "-Dexecution.checkpoint.data-uri=hdfs://localhost:9000/checkpoint", "-Dexecution.checkpoint.interval=10000",
-                "-Dexecution.parallelism=1"));
+                "CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH, "--check"));
 
         flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
-        commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar", FlinkJobType.JAR);
+        commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
         Assert.assertEquals(commands,
             Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
-                "CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH, "--check", "-Dcity=shenyang", "-Ddate=20200202",
-                "-Dexecution.checkpoint.data-uri=hdfs://localhost:9000/checkpoint", "-Dexecution.checkpoint.interval=10000",
-                "-Dexecution.parallelism=1"));
+                "CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH, "--check"));
 
         String[] args1 = {"--detached", "-c", "app.conf", "-t", "-i", "city=shenyang", "-i", "date=20200202",
             "-r", "run-application", "--unkown", "unkown-command"};
 
-        Assert.assertThrows(FileNotFoundException.class, () -> {
-            CommandLineUtils.buildFlinkCommand(CommandLineUtils.parseCommandArgs(args1, FlinkJobType.SQL), "CLASS_NAME", "/path/to/jar", FlinkJobType.JAR);
-        });
+        List<String> command = CommandLineUtils.buildFlinkCommand(
+            CommandLineUtils.parseCommandArgs(args1, FlinkJobType.SQL), "CLASS_NAME", "/path/to/jar");
+        Assert.assertEquals(
+            Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
+                "CLASS_NAME", "/path/to/jar", "--config", "app.conf", "--check"),
+            command);
+
     }
 
     @Test
@@ -82,9 +82,9 @@ public class CommandLineUtilsTest {
         String[] args = {"--detached", "-c", SQL_CONF_PATH, "-t", "-i", "city=shenyang", "-i", "date=20200202",
             "-r", "run-application", "--unkown", "unkown-command"};
         FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
-        List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar", FlinkJobType.SQL);
+        List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
         Assert.assertEquals(commands,
             Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
-                "CLASS_NAME", "/path/to/jar", "--config", SQL_CONF_PATH, "--check", "-Dcity=shenyang", "-Ddate=20200202"));
+                "CLASS_NAME", "/path/to/jar", "--config", SQL_CONF_PATH, "--check"));
     }
 }