You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/07 16:14:01 UTC
[incubator-seatunnel] branch dev updated: [Bugfix][seatunnel-core-flink] fix flink sql conf parse exception (#1815)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 13846f45 [Bugfix][seatunnel-core-flink] fix flink sql conf parse exception (#1815)
13846f45 is described below
commit 13846f45d1334c94e185a95c9f7a0d0fe22b47bd
Author: legendtkl <ta...@gmail.com>
AuthorDate: Sun May 8 00:13:55 2022 +0800
[Bugfix][seatunnel-core-flink] fix flink sql conf parse exception (#1815)
---
.../apache/seatunnel/core/sql/FlinkSqlStarter.java | 2 +-
.../apache/seatunnel/core/flink/FlinkStarter.java | 2 +-
.../core/flink/utils/CommandLineUtils.java | 23 ++++------
.../core/flink/utils/CommandLineUtilsTest.java | 22 +++++++---
.../src/test/resources/sql.conf | 49 ++++++++++++++++++++++
5 files changed, 76 insertions(+), 22 deletions(-)
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
index 313b966c..7986716b 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
@@ -45,7 +45,7 @@ public class FlinkSqlStarter implements Starter {
@Override
public List<String> buildCommands() throws Exception {
- return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, CLASS_NAME, appJar);
+ return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, CLASS_NAME, appJar, FlinkJobType.SQL);
}
@SuppressWarnings("checkstyle:RegexpSingleline")
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
index 2141dc86..2da33dee 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
@@ -58,7 +58,7 @@ public class FlinkStarter implements Starter {
@Override
public List<String> buildCommands() throws Exception {
- return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, APP_NAME, appJar);
+ return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, APP_NAME, appJar, FlinkJobType.JAR);
}
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
index d4d360d2..4e466e43 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
@@ -39,15 +39,6 @@ public class CommandLineUtils {
throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
}
- public static FlinkCommandArgs parseFlinkArgs(String[] args) {
- FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
- JCommander.newBuilder()
- .addObject(flinkCommandArgs)
- .build()
- .parse(args);
- return flinkCommandArgs;
- }
-
public static FlinkCommandArgs parseCommandArgs(String[] args, FlinkJobType jobType) {
FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
JCommander jCommander = JCommander.newBuilder()
@@ -67,7 +58,7 @@ public class CommandLineUtils {
}
- public static List<String> buildFlinkCommand(FlinkCommandArgs flinkCommandArgs, String className, String jarPath) throws FileNotFoundException {
+ public static List<String> buildFlinkCommand(FlinkCommandArgs flinkCommandArgs, String className, String jarPath, FlinkJobType jobType) throws FileNotFoundException {
List<String> command = new ArrayList<>();
command.add("${FLINK_HOME}/bin/flink");
command.add(flinkCommandArgs.getRunMode().getMode());
@@ -86,11 +77,13 @@ public class CommandLineUtils {
.map(String::trim)
.forEach(variable -> command.add("-D" + variable));
- ConfigParser.getConfigEnvValues(flinkCommandArgs.getConfigFile())
- .entrySet()
- .stream()
- .sorted(Comparator.comparing(Map.Entry::getKey))
- .forEach(entry -> command.add("-D" + entry.getKey() + "=" + entry.getValue()));
+ if (jobType.equals(FlinkJobType.JAR)) {
+ ConfigParser.getConfigEnvValues(flinkCommandArgs.getConfigFile())
+ .entrySet()
+ .stream()
+ .sorted(Comparator.comparing(Map.Entry::getKey))
+ .forEach(entry -> command.add("-D" + entry.getKey() + "=" + entry.getValue()));
+ }
return command;
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
index 8d879b1c..93a19301 100644
--- a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
@@ -30,6 +30,7 @@ import java.util.List;
public class CommandLineUtilsTest {
static final String APP_CONF_PATH = ClassLoader.getSystemResource("app.conf").getPath();
+ static final String SQL_CONF_PATH = ClassLoader.getSystemResource("sql.conf").getPath();
@Test
public void testParseCommandArgs() {
@@ -49,19 +50,19 @@ public class CommandLineUtilsTest {
}
@Test
- public void testBuildFlinkCommand() throws FileNotFoundException {
+ public void testBuildFlinkJarCommand() throws FileNotFoundException {
String[] args = {"--detached", "-c", APP_CONF_PATH, "-t", "-i", "city=shenyang", "-i", "date=20200202",
"-r", "run-application", "--unkown", "unkown-command"};
FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
- List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
+ List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar", FlinkJobType.JAR);
Assert.assertEquals(commands,
Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
"CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH, "--check", "-Dcity=shenyang", "-Ddate=20200202",
"-Dexecution.checkpoint.data-uri=hdfs://localhost:9000/checkpoint", "-Dexecution.checkpoint.interval=10000",
"-Dexecution.parallelism=1"));
- flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
- commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
+ flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+ commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar", FlinkJobType.JAR);
Assert.assertEquals(commands,
Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
"CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH, "--check", "-Dcity=shenyang", "-Ddate=20200202",
@@ -72,7 +73,18 @@ public class CommandLineUtilsTest {
"-r", "run-application", "--unkown", "unkown-command"};
Assert.assertThrows(FileNotFoundException.class, () -> {
- CommandLineUtils.buildFlinkCommand(CommandLineUtils.parseCommandArgs(args1, FlinkJobType.SQL), "CLASS_NAME", "/path/to/jar");
+ CommandLineUtils.buildFlinkCommand(CommandLineUtils.parseCommandArgs(args1, FlinkJobType.SQL), "CLASS_NAME", "/path/to/jar", FlinkJobType.JAR);
});
}
+
+ @Test
+ public void testBuildFlinkSQLCommand() throws FileNotFoundException{
+ String[] args = {"--detached", "-c", SQL_CONF_PATH, "-t", "-i", "city=shenyang", "-i", "date=20200202",
+ "-r", "run-application", "--unkown", "unkown-command"};
+ FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
+ List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar", FlinkJobType.SQL);
+ Assert.assertEquals(commands,
+ Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
+ "CLASS_NAME", "/path/to/jar", "--config", SQL_CONF_PATH, "--check", "-Dcity=shenyang", "-Ddate=20200202"));
+ }
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/resources/sql.conf b/seatunnel-core/seatunnel-core-flink/src/test/resources/sql.conf
new file mode 100644
index 00000000..ddca7e01
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/test/resources/sql.conf
@@ -0,0 +1,49 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--
+-- This config file is a demonstration of sql processing in SeaTunnel config
+--
+--
+
+
+SET 'table.dml-sync' = 'true';
+
+CREATE TABLE events (
+ f_type INT,
+ f_uid INT,
+ ts AS localtimestamp,
+ WATERMARK FOR ts AS ts
+) WITH (
+ 'connector' = 'datagen',
+ 'rows-per-second'='5',
+ 'fields.f_type.min'='1',
+ 'fields.f_type.max'='5',
+ 'fields.f_uid.min'='1',
+ 'fields.f_uid.max'='1000'
+);
+
+CREATE TABLE print_table (
+ type INT,
+ uid INT,
+ lstmt TIMESTAMP
+) WITH (
+ 'connector' = 'print',
+ 'sink.parallelism' = '1'
+);
+
+INSERT INTO print_table SELECT * FROM events where f_type = 1;