You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/12/10 14:37:38 UTC

[incubator-dolphinscheduler] branch dev updated: [Fix-3457][flink] fix flink args build problem (#4166)

This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new cbc30b4  [Fix-3457][flink] fix flink args build problem (#4166)
cbc30b4 is described below

commit cbc30b4900215424dcbbfb49539259d32273efc3
Author: Shiwen Cheng <ch...@gmail.com>
AuthorDate: Thu Dec 10 22:37:21 2020 +0800

    [Fix-3457][flink] fix flink args build problem (#4166)
    
    * [Fix][Flink] fix flink args build problem
    
    * [Fix][Flink] fix FlinkArgsUtilsTest
    
    * [Improvement][UI] hide version and cluster input when deployMode is local
---
 .../src/main/resources/application-api.properties  |  2 +-
 .../apache/dolphinscheduler/common/Constants.java  |  7 +----
 .../server/utils/FlinkArgsUtils.java               | 31 +++++++++++-----------
 .../server/worker/task/flink/FlinkTask.java        |  2 ++
 .../server/utils/FlinkArgsUtilsTest.java           | 22 +++++++--------
 .../pages/dag/_source/formModel/tasks/flink.vue    |  4 +--
 .../src/js/module/i18n/locale/zh_CN.js             |  2 +-
 pom.xml                                            |  2 +-
 8 files changed, 34 insertions(+), 38 deletions(-)

diff --git a/dolphinscheduler-api/src/main/resources/application-api.properties b/dolphinscheduler-api/src/main/resources/application-api.properties
index 5af7fbd..92217a7 100644
--- a/dolphinscheduler-api/src/main/resources/application-api.properties
+++ b/dolphinscheduler-api/src/main/resources/application-api.properties
@@ -35,7 +35,7 @@ server.compression.enabled=true
 server.compression.mime-types=text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xml
 
 #post content
-server.jetty.max-http-post-size=5000000
+server.jetty.max-http-form-post-size=5000000
 
 spring.messages.encoding=UTF-8
 
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index e473428..cd6008d 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -605,12 +605,6 @@ public final class Constants {
 
 
     /**
-     * --queue --qu
-     */
-    public static final String FLINK_QUEUE = "--qu";
-
-
-    /**
      * exit code success
      */
     public static final int EXIT_CODE_SUCCESS = 0;
@@ -838,6 +832,7 @@ public final class Constants {
     public static final String FLINK_RUN_MODE = "-m";
     public static final String FLINK_YARN_SLOT = "-ys";
     public static final String FLINK_APP_NAME = "-ynm";
+    public static final String FLINK_QUEUE = "-yqu";
     public static final String FLINK_TASK_MANAGE = "-yn";
 
     public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
index eaaafc9..5d3b57b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
@@ -47,6 +47,7 @@ public class FlinkArgsUtils {
         if (StringUtils.isNotEmpty(tmpDeployMode)) {
             deployMode = tmpDeployMode;
         }
+        String others = param.getOthers();
         if (!LOCAL_DEPLOY_MODE.equals(deployMode)) {
             args.add(Constants.FLINK_RUN_MODE);  //-m
 
@@ -64,7 +65,7 @@ public class FlinkArgsUtils {
                 args.add(appName);
             }
 
-            // judgy flink version,from flink1.10,the parameter -yn removed
+            // judge flink version,from flink1.10,the parameter -yn removed
             String flinkVersion = param.getFlinkVersion();
             if (FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
                 int taskManager = param.getTaskManager();
@@ -85,10 +86,23 @@ public class FlinkArgsUtils {
                 args.add(taskManagerMemory);
             }
 
+            if (StringUtils.isEmpty(others) || !others.contains(Constants.FLINK_QUEUE)) {
+                String queue = param.getQueue();
+                if (StringUtils.isNotEmpty(queue)) { // -yqu
+                    args.add(Constants.FLINK_QUEUE);
+                    args.add(queue);
+                }
+            }
+
             args.add(Constants.FLINK_DETACH); //-d
 
         }
 
+        // -p -s -yqu -yat -sae -yD -D
+        if (StringUtils.isNotEmpty(others)) {
+            args.add(others);
+        }
+
         ProgramType programType = param.getProgramType();
         String mainClass = param.getMainClass();
         if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
@@ -106,21 +120,6 @@ public class FlinkArgsUtils {
             args.add(mainArgs);
         }
 
-        // --files --conf --libjar ...
-        String others = param.getOthers();
-        String queue = param.getQueue();
-        if (StringUtils.isNotEmpty(others)) {
-
-            if (!others.contains(Constants.FLINK_QUEUE) && StringUtils.isNotEmpty(queue) && !deployMode.equals(LOCAL_DEPLOY_MODE)) {
-                args.add(Constants.FLINK_QUEUE);
-                args.add(param.getQueue());
-            }
-            args.add(others);
-        } else if (StringUtils.isNotEmpty(queue) && !deployMode.equals(LOCAL_DEPLOY_MODE)) {
-            args.add(Constants.FLINK_QUEUE);
-            args.add(param.getQueue());
-        }
-
         return args;
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
index 58d5eaf..9de28e3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
@@ -42,6 +42,7 @@ public class FlinkTask extends AbstractYarnTask {
 
   /**
    *  flink command
+   *  usage: flink run [OPTIONS] <jar-file> <arguments>
    */
   private static final String FLINK_COMMAND = "flink";
   private static final String FLINK_RUN = "run";
@@ -102,6 +103,7 @@ public class FlinkTask extends AbstractYarnTask {
    */
   @Override
   protected String buildCommand() {
+    // flink run [OPTIONS] <jar-file> <arguments>
     List<String> args = new ArrayList<>();
 
     args.add(FLINK_COMMAND);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
index a315232..bea6775 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
@@ -46,9 +46,9 @@ public class FlinkArgsUtilsTest {
     public ProgramType programType = ProgramType.JAVA;
     public String mainClass = "com.test";
     public ResourceInfo mainJar = null;
-    public String mainArgs = "testArgs";
+    public String mainArgs = "testArgs --input file:///home";
     public String queue = "queue1";
-    public String others = "--input file:///home";
+    public String others = "-p 4";
     public String flinkVersion = "<1.10";
 
 
@@ -109,20 +109,20 @@ public class FlinkArgsUtilsTest {
         assertEquals("-ytm", result.get(10));
         assertEquals(result.get(11),taskManagerMemory);
 
-        assertEquals("-d", result.get(12));
+        assertEquals("-yqu", result.get(12));
+        assertEquals(result.get(13),queue);
 
-        assertEquals("-c", result.get(13));
-        assertEquals(result.get(14),mainClass);
+        assertEquals("-d", result.get(14));
 
-        assertEquals(result.get(15),mainJar.getRes());
-        assertEquals(result.get(16),mainArgs);
+        assertEquals(result.get(15),others);
 
-        assertEquals("--qu", result.get(17));
-        assertEquals(result.get(18),queue);
+        assertEquals("-c", result.get(16));
+        assertEquals(result.get(17),mainClass);
 
-        assertEquals(result.get(19),others);
+        assertEquals(result.get(18),mainJar.getRes());
+        assertEquals(result.get(19),mainArgs);
 
-        //Others param without --qu
+        //Others param without -yqu
         FlinkParameters param1 = new FlinkParameters();
         param1.setQueue(queue);
         param1.setDeployMode(mode);
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
index 87d9146..0fb4ee3 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
@@ -62,7 +62,7 @@
         </x-radio-group>
       </div>
     </m-list-box>
-    <m-list-box>
+    <m-list-box v-if="deployMode === 'cluster'">
       <div slot="text">{{$t('Flink Version')}}</div>
       <div slot="content">
         <x-select
@@ -78,7 +78,7 @@
         </x-select>
       </div>
     </m-list-box>
-    <div class="list-box-4p">
+    <div class="list-box-4p" v-if="deployMode === 'cluster'">
       <div class="clearfix list">
         <span class="sp1" style="word-break:break-all">{{$t('jobManagerMemory')}}</span>
         <span class="sp2">
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
index bc080d5..c3ab69b 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
@@ -535,7 +535,7 @@ export default {
   'Execute time': '执行时间',
   'Complement range': '补数范围',
   slot: 'slot数量',
-  taskManager: 'taskManage数量',
+  taskManager: 'taskManager数量',
   jobManagerMemory: 'jobManager内存数',
   taskManagerMemory: 'taskManager内存数',
   'Http Url': '请求地址',
diff --git a/pom.xml b/pom.xml
index 796a5a0..c4f526c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -841,7 +841,7 @@
                         <include>**/server/utils/DataxUtilsTest.java</include>
                         <include>**/server/utils/ExecutionContextTestUtils.java</include>
                         <include>**/server/utils/HostTest.java</include>
-                        <!--<include>**/server/utils/FlinkArgsUtilsTest.java</include>-->
+                        <include>**/server/utils/FlinkArgsUtilsTest.java</include>
                         <include>**/server/utils/LogUtilsTest.java</include>
                         <include>**/server/utils/ParamUtilsTest.java</include>
                         <include>**/server/utils/ProcessUtilsTest.java</include>