You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/12/10 14:37:38 UTC
[incubator-dolphinscheduler] branch dev updated: [Fix-3457][flink] fix flink args build problem (#4166)
This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new cbc30b4 [Fix-3457][flink] fix flink args build problem (#4166)
cbc30b4 is described below
commit cbc30b4900215424dcbbfb49539259d32273efc3
Author: Shiwen Cheng <ch...@gmail.com>
AuthorDate: Thu Dec 10 22:37:21 2020 +0800
[Fix-3457][flink] fix flink args build problem (#4166)
* [Fix][Flink] fix flink args build problem
* [Fix][Flink] fix FlinkArgsUtilsTest
* [Improvement][UI] hide version and cluster input when deployMode is local
---
.../src/main/resources/application-api.properties | 2 +-
.../apache/dolphinscheduler/common/Constants.java | 7 +----
.../server/utils/FlinkArgsUtils.java | 31 +++++++++++-----------
.../server/worker/task/flink/FlinkTask.java | 2 ++
.../server/utils/FlinkArgsUtilsTest.java | 22 +++++++--------
.../pages/dag/_source/formModel/tasks/flink.vue | 4 +--
.../src/js/module/i18n/locale/zh_CN.js | 2 +-
pom.xml | 2 +-
8 files changed, 34 insertions(+), 38 deletions(-)
diff --git a/dolphinscheduler-api/src/main/resources/application-api.properties b/dolphinscheduler-api/src/main/resources/application-api.properties
index 5af7fbd..92217a7 100644
--- a/dolphinscheduler-api/src/main/resources/application-api.properties
+++ b/dolphinscheduler-api/src/main/resources/application-api.properties
@@ -35,7 +35,7 @@ server.compression.enabled=true
server.compression.mime-types=text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xml
#post content
-server.jetty.max-http-post-size=5000000
+server.jetty.max-http-form-post-size=5000000
spring.messages.encoding=UTF-8
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index e473428..cd6008d 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -605,12 +605,6 @@ public final class Constants {
/**
- * --queue --qu
- */
- public static final String FLINK_QUEUE = "--qu";
-
-
- /**
* exit code success
*/
public static final int EXIT_CODE_SUCCESS = 0;
@@ -838,6 +832,7 @@ public final class Constants {
public static final String FLINK_RUN_MODE = "-m";
public static final String FLINK_YARN_SLOT = "-ys";
public static final String FLINK_APP_NAME = "-ynm";
+ public static final String FLINK_QUEUE = "-yqu";
public static final String FLINK_TASK_MANAGE = "-yn";
public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
index eaaafc9..5d3b57b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
@@ -47,6 +47,7 @@ public class FlinkArgsUtils {
if (StringUtils.isNotEmpty(tmpDeployMode)) {
deployMode = tmpDeployMode;
}
+ String others = param.getOthers();
if (!LOCAL_DEPLOY_MODE.equals(deployMode)) {
args.add(Constants.FLINK_RUN_MODE); //-m
@@ -64,7 +65,7 @@ public class FlinkArgsUtils {
args.add(appName);
}
- // judgy flink version,from flink1.10,the parameter -yn removed
+ // judge flink version,from flink1.10,the parameter -yn removed
String flinkVersion = param.getFlinkVersion();
if (FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
int taskManager = param.getTaskManager();
@@ -85,10 +86,23 @@ public class FlinkArgsUtils {
args.add(taskManagerMemory);
}
+ if (StringUtils.isEmpty(others) || !others.contains(Constants.FLINK_QUEUE)) {
+ String queue = param.getQueue();
+ if (StringUtils.isNotEmpty(queue)) { // -yqu
+ args.add(Constants.FLINK_QUEUE);
+ args.add(queue);
+ }
+ }
+
args.add(Constants.FLINK_DETACH); //-d
}
+ // -p -s -yqu -yat -sae -yD -D
+ if (StringUtils.isNotEmpty(others)) {
+ args.add(others);
+ }
+
ProgramType programType = param.getProgramType();
String mainClass = param.getMainClass();
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
@@ -106,21 +120,6 @@ public class FlinkArgsUtils {
args.add(mainArgs);
}
- // --files --conf --libjar ...
- String others = param.getOthers();
- String queue = param.getQueue();
- if (StringUtils.isNotEmpty(others)) {
-
- if (!others.contains(Constants.FLINK_QUEUE) && StringUtils.isNotEmpty(queue) && !deployMode.equals(LOCAL_DEPLOY_MODE)) {
- args.add(Constants.FLINK_QUEUE);
- args.add(param.getQueue());
- }
- args.add(others);
- } else if (StringUtils.isNotEmpty(queue) && !deployMode.equals(LOCAL_DEPLOY_MODE)) {
- args.add(Constants.FLINK_QUEUE);
- args.add(param.getQueue());
- }
-
return args;
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
index 58d5eaf..9de28e3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
@@ -42,6 +42,7 @@ public class FlinkTask extends AbstractYarnTask {
/**
* flink command
+ * usage: flink run [OPTIONS] <jar-file> <arguments>
*/
private static final String FLINK_COMMAND = "flink";
private static final String FLINK_RUN = "run";
@@ -102,6 +103,7 @@ public class FlinkTask extends AbstractYarnTask {
*/
@Override
protected String buildCommand() {
+ // flink run [OPTIONS] <jar-file> <arguments>
List<String> args = new ArrayList<>();
args.add(FLINK_COMMAND);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
index a315232..bea6775 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
@@ -46,9 +46,9 @@ public class FlinkArgsUtilsTest {
public ProgramType programType = ProgramType.JAVA;
public String mainClass = "com.test";
public ResourceInfo mainJar = null;
- public String mainArgs = "testArgs";
+ public String mainArgs = "testArgs --input file:///home";
public String queue = "queue1";
- public String others = "--input file:///home";
+ public String others = "-p 4";
public String flinkVersion = "<1.10";
@@ -109,20 +109,20 @@ public class FlinkArgsUtilsTest {
assertEquals("-ytm", result.get(10));
assertEquals(result.get(11),taskManagerMemory);
- assertEquals("-d", result.get(12));
+ assertEquals("-yqu", result.get(12));
+ assertEquals(result.get(13),queue);
- assertEquals("-c", result.get(13));
- assertEquals(result.get(14),mainClass);
+ assertEquals("-d", result.get(14));
- assertEquals(result.get(15),mainJar.getRes());
- assertEquals(result.get(16),mainArgs);
+ assertEquals(result.get(15),others);
- assertEquals("--qu", result.get(17));
- assertEquals(result.get(18),queue);
+ assertEquals("-c", result.get(16));
+ assertEquals(result.get(17),mainClass);
- assertEquals(result.get(19),others);
+ assertEquals(result.get(18),mainJar.getRes());
+ assertEquals(result.get(19),mainArgs);
- //Others param without --qu
+ //Others param without -yqu
FlinkParameters param1 = new FlinkParameters();
param1.setQueue(queue);
param1.setDeployMode(mode);
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
index 87d9146..0fb4ee3 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
@@ -62,7 +62,7 @@
</x-radio-group>
</div>
</m-list-box>
- <m-list-box>
+ <m-list-box v-if="deployMode === 'cluster'">
<div slot="text">{{$t('Flink Version')}}</div>
<div slot="content">
<x-select
@@ -78,7 +78,7 @@
</x-select>
</div>
</m-list-box>
- <div class="list-box-4p">
+ <div class="list-box-4p" v-if="deployMode === 'cluster'">
<div class="clearfix list">
<span class="sp1" style="word-break:break-all">{{$t('jobManagerMemory')}}</span>
<span class="sp2">
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
index bc080d5..c3ab69b 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
@@ -535,7 +535,7 @@ export default {
'Execute time': '执行时间',
'Complement range': '补数范围',
slot: 'slot数量',
- taskManager: 'taskManage数量',
+ taskManager: 'taskManager数量',
jobManagerMemory: 'jobManager内存数',
taskManagerMemory: 'taskManager内存数',
'Http Url': '请求地址',
diff --git a/pom.xml b/pom.xml
index 796a5a0..c4f526c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -841,7 +841,7 @@
<include>**/server/utils/DataxUtilsTest.java</include>
<include>**/server/utils/ExecutionContextTestUtils.java</include>
<include>**/server/utils/HostTest.java</include>
- <!--<include>**/server/utils/FlinkArgsUtilsTest.java</include>-->
+ <include>**/server/utils/FlinkArgsUtilsTest.java</include>
<include>**/server/utils/LogUtilsTest.java</include>
<include>**/server/utils/ParamUtilsTest.java</include>
<include>**/server/utils/ProcessUtilsTest.java</include>