You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/07 16:14:01 UTC

[incubator-seatunnel] branch dev updated: [Bugfix][seatunnel-core-flink] fix flink sql conf parse exception (#1815)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 13846f45 [Bugfix][seatunnel-core-flink] fix flink sql conf parse exception (#1815)
13846f45 is described below

commit 13846f45d1334c94e185a95c9f7a0d0fe22b47bd
Author: legendtkl <ta...@gmail.com>
AuthorDate: Sun May 8 00:13:55 2022 +0800

    [Bugfix][seatunnel-core-flink] fix flink sql conf parse exception (#1815)
---
 .../apache/seatunnel/core/sql/FlinkSqlStarter.java |  2 +-
 .../apache/seatunnel/core/flink/FlinkStarter.java  |  2 +-
 .../core/flink/utils/CommandLineUtils.java         | 23 ++++------
 .../core/flink/utils/CommandLineUtilsTest.java     | 22 +++++++---
 .../src/test/resources/sql.conf                    | 49 ++++++++++++++++++++++
 5 files changed, 76 insertions(+), 22 deletions(-)

diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
index 313b966c..7986716b 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
@@ -45,7 +45,7 @@ public class FlinkSqlStarter implements Starter {
 
     @Override
     public List<String> buildCommands() throws Exception {
-        return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, CLASS_NAME, appJar);
+        return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, CLASS_NAME, appJar, FlinkJobType.SQL);
     }
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
index 2141dc86..2da33dee 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
@@ -58,7 +58,7 @@ public class FlinkStarter implements Starter {
 
     @Override
     public List<String> buildCommands() throws Exception {
-        return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, APP_NAME, appJar);
+        return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, APP_NAME, appJar, FlinkJobType.JAR);
     }
 
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
index d4d360d2..4e466e43 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
@@ -39,15 +39,6 @@ public class CommandLineUtils {
         throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
     }
 
-    public static FlinkCommandArgs parseFlinkArgs(String[] args) {
-        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
-        JCommander.newBuilder()
-            .addObject(flinkCommandArgs)
-            .build()
-            .parse(args);
-        return flinkCommandArgs;
-    }
-
     public static FlinkCommandArgs parseCommandArgs(String[] args, FlinkJobType jobType) {
         FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
         JCommander jCommander = JCommander.newBuilder()
@@ -67,7 +58,7 @@ public class CommandLineUtils {
 
     }
 
-    public static List<String> buildFlinkCommand(FlinkCommandArgs flinkCommandArgs, String className, String jarPath) throws FileNotFoundException {
+    public static List<String> buildFlinkCommand(FlinkCommandArgs flinkCommandArgs, String className, String jarPath, FlinkJobType jobType) throws FileNotFoundException {
         List<String> command = new ArrayList<>();
         command.add("${FLINK_HOME}/bin/flink");
         command.add(flinkCommandArgs.getRunMode().getMode());
@@ -86,11 +77,13 @@ public class CommandLineUtils {
           .map(String::trim)
           .forEach(variable -> command.add("-D" + variable));
 
-        ConfigParser.getConfigEnvValues(flinkCommandArgs.getConfigFile())
-            .entrySet()
-            .stream()
-            .sorted(Comparator.comparing(Map.Entry::getKey))
-            .forEach(entry -> command.add("-D" + entry.getKey() + "=" + entry.getValue()));
+        if (jobType.equals(FlinkJobType.JAR)) {
+            ConfigParser.getConfigEnvValues(flinkCommandArgs.getConfigFile())
+                .entrySet()
+                .stream()
+                .sorted(Comparator.comparing(Map.Entry::getKey))
+                .forEach(entry -> command.add("-D" + entry.getKey() + "=" + entry.getValue()));
+        }
 
         return command;
 
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
index 8d879b1c..93a19301 100644
--- a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
@@ -30,6 +30,7 @@ import java.util.List;
 
 public class CommandLineUtilsTest {
     static final String APP_CONF_PATH = ClassLoader.getSystemResource("app.conf").getPath();
+    static final String SQL_CONF_PATH = ClassLoader.getSystemResource("sql.conf").getPath();
 
     @Test
     public void testParseCommandArgs() {
@@ -49,19 +50,19 @@ public class CommandLineUtilsTest {
     }
 
     @Test
-    public void testBuildFlinkCommand() throws FileNotFoundException {
+    public void testBuildFlinkJarCommand() throws FileNotFoundException {
         String[] args = {"--detached", "-c", APP_CONF_PATH, "-t", "-i", "city=shenyang", "-i", "date=20200202",
             "-r", "run-application", "--unkown", "unkown-command"};
         FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
-        List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
+        List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar", FlinkJobType.JAR);
         Assert.assertEquals(commands,
             Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
                 "CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH, "--check", "-Dcity=shenyang", "-Ddate=20200202",
                 "-Dexecution.checkpoint.data-uri=hdfs://localhost:9000/checkpoint", "-Dexecution.checkpoint.interval=10000",
                 "-Dexecution.parallelism=1"));
 
-        flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
-        commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
+        flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+        commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar", FlinkJobType.JAR);
         Assert.assertEquals(commands,
             Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
                 "CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH, "--check", "-Dcity=shenyang", "-Ddate=20200202",
@@ -72,7 +73,18 @@ public class CommandLineUtilsTest {
             "-r", "run-application", "--unkown", "unkown-command"};
 
         Assert.assertThrows(FileNotFoundException.class, () -> {
-            CommandLineUtils.buildFlinkCommand(CommandLineUtils.parseCommandArgs(args1, FlinkJobType.SQL), "CLASS_NAME", "/path/to/jar");
+            CommandLineUtils.buildFlinkCommand(CommandLineUtils.parseCommandArgs(args1, FlinkJobType.SQL), "CLASS_NAME", "/path/to/jar", FlinkJobType.JAR);
         });
     }
+
+    @Test
+    public void testBuildFlinkSQLCommand() throws FileNotFoundException{
+        String[] args = {"--detached", "-c", SQL_CONF_PATH, "-t", "-i", "city=shenyang", "-i", "date=20200202",
+            "-r", "run-application", "--unkown", "unkown-command"};
+        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
+        List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar", FlinkJobType.SQL);
+        Assert.assertEquals(commands,
+            Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
+                "CLASS_NAME", "/path/to/jar", "--config", SQL_CONF_PATH, "--check", "-Dcity=shenyang", "-Ddate=20200202"));
+    }
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/resources/sql.conf b/seatunnel-core/seatunnel-core-flink/src/test/resources/sql.conf
new file mode 100644
index 00000000..ddca7e01
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/test/resources/sql.conf
@@ -0,0 +1,49 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--
+-- This config file is a demonstration of sql processing in SeaTunnel config
+--
+--
+
+
+SET 'table.dml-sync' = 'true';
+
+CREATE TABLE events (
+  f_type INT,
+  f_uid INT,
+  ts AS localtimestamp,
+  WATERMARK FOR ts AS ts
+) WITH (
+  'connector' = 'datagen',
+  'rows-per-second'='5',
+  'fields.f_type.min'='1',
+  'fields.f_type.max'='5',
+  'fields.f_uid.min'='1',
+  'fields.f_uid.max'='1000'
+);
+
+CREATE TABLE print_table (
+  type INT,
+  uid INT,
+  lstmt TIMESTAMP
+) WITH (
+  'connector' = 'print',
+  'sink.parallelism' = '1'
+);
+
+INSERT INTO print_table SELECT * FROM events where f_type = 1;