You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/06/12 10:23:38 UTC

[incubator-dolphinscheduler] branch dev updated: Compatible with flink1.10 or newer (#2952)

This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 284c50f  Compatible with flink1.10 or newer (#2952)
284c50f is described below

commit 284c50f66cb7ce5466bb987d7e6966683962c3a6
Author: lijufeng2016 <92...@qq.com>
AuthorDate: Fri Jun 12 18:23:28 2020 +0800

    Compatible with flink1.10 or newer (#2952)
    
    * 兼容flink1.10以上版本
    
    * fix null point bug
    
    Co-authored-by: 李巨丰 <li...@2345.com>
    Co-authored-by: dailidong <da...@gmail.com>
---
 .../common/task/flink/FlinkParameters.java         | 13 +++++
 .../server/utils/FlinkArgsUtils.java               | 18 +++---
 .../server/utils/FlinkArgsUtilsTest.java           |  4 +-
 .../pages/dag/_source/formModel/tasks/flink.vue    | 67 +++++++++++++++-------
 .../src/js/module/i18n/locale/en_US.js             |  1 +
 .../src/js/module/i18n/locale/zh_CN.js             |  1 +
 6 files changed, 75 insertions(+), 29 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java
index 05cbb1d..b89a920 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java
@@ -91,6 +91,11 @@ public class FlinkParameters extends AbstractParameters {
   private String others;
 
   /**
+   * flink version
+   */
+  private String flinkVersion;
+
+  /**
    * program type
    * 0 JAVA,1 SCALA,2 PYTHON
    */
@@ -200,6 +205,14 @@ public class FlinkParameters extends AbstractParameters {
     this.programType = programType;
   }
 
+  public String getFlinkVersion() {
+    return flinkVersion;
+  }
+
+  public void setFlinkVersion(String flinkVersion) {
+    this.flinkVersion = flinkVersion;
+  }
+
   @Override
   public boolean checkParameters() {
     return mainJar != null && programType != null;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
index 12c7eb2..eaaafc9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
@@ -28,10 +28,12 @@ import java.util.List;
 
 
 /**
- *  spark args utils
+ * flink args utils
  */
 public class FlinkArgsUtils {
     private static final String LOCAL_DEPLOY_MODE = "local";
+    private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
+
     /**
      * build args
      * @param param flink parameters
@@ -44,7 +46,6 @@ public class FlinkArgsUtils {
         String tmpDeployMode = param.getDeployMode();
         if (StringUtils.isNotEmpty(tmpDeployMode)) {
             deployMode = tmpDeployMode;
-
         }
         if (!LOCAL_DEPLOY_MODE.equals(deployMode)) {
             args.add(Constants.FLINK_RUN_MODE);  //-m
@@ -63,12 +64,15 @@ public class FlinkArgsUtils {
                 args.add(appName);
             }
 
-            int taskManager = param.getTaskManager();
-            if (taskManager != 0) {                        //-yn
-                args.add(Constants.FLINK_TASK_MANAGE);
-                args.add(String.format("%d", taskManager));
+            // judgy flink version,from flink1.10,the parameter -yn removed
+            String flinkVersion = param.getFlinkVersion();
+            if (FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
+                int taskManager = param.getTaskManager();
+                if (taskManager != 0) {                        //-yn
+                    args.add(Constants.FLINK_TASK_MANAGE);
+                    args.add(String.format("%d", taskManager));
+                }
             }
-
             String jobManagerMemory = param.getJobManagerMemory();
             if (StringUtils.isNotEmpty(jobManagerMemory)) {
                 args.add(Constants.FLINK_JOB_MANAGE_MEM);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
index 2e4861e..a315232 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
@@ -49,6 +49,7 @@ public class FlinkArgsUtilsTest {
     public String mainArgs = "testArgs";
     public String queue = "queue1";
     public String others = "--input file:///home";
+    public String flinkVersion = "<1.10";
 
 
     @Before
@@ -79,6 +80,7 @@ public class FlinkArgsUtilsTest {
         param.setMainArgs(mainArgs);
         param.setQueue(queue);
         param.setOthers(others);
+        param.setFlinkVersion(flinkVersion);
 
         //Invoke buildArgs
         List<String> result = FlinkArgsUtils.buildArgs(param);
@@ -128,4 +130,4 @@ public class FlinkArgsUtilsTest {
         assertEquals(5, result.size());
 
     }
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
index 3f85f36..2e87721 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
@@ -62,56 +62,73 @@
         </x-radio-group>
       </div>
     </m-list-box>
+    <m-list-box>
+      <div slot="text">{{$t('Flink Version')}}</div>
+      <div slot="content">
+        <x-select
+          style="width: 100px;"
+          v-model="flinkVersion"
+          :disabled="isDetails">
+          <x-option
+            v-for="version in flinkVersionList"
+            :key="version.code"
+            :value="version.code"
+            :label="version.code">
+          </x-option>
+        </x-select>
+      </div>
+    </m-list-box>
     <div class="list-box-4p">
       <div class="clearfix list">
-        <span class="sp1">{{$t('slot')}}</span>
+        <span class="sp1" style="word-break:break-all">{{$t('jobManagerMemory')}}</span>
         <span class="sp2">
           <x-input
-                  :disabled="isDetails"
-                  type="input"
-                  v-model="slot"
-                  :placeholder="$t('Please enter driver core number')"
-                  style="width: 200px;"
-                  autocomplete="off">
+            :disabled="isDetails"
+            type="input"
+            v-model="jobManagerMemory"
+            :placeholder="$t('Please enter the number of Executor')"
+            style="width: 200px;"
+            autocomplete="off">
         </x-input>
         </span>
-        <span class="sp1 sp3">{{$t('taskManager')}}</span>
+        <span class="sp1 sp3">{{$t('taskManagerMemory')}}</span>
         <span class="sp2">
           <x-input
-                  :disabled="isDetails"
-                  type="input"
-                  v-model="taskManager"
-                  :placeholder="$t('Please enter driver memory use')"
-                  style="width: 186px;"
-                  autocomplete="off">
+            :disabled="isDetails"
+            type="input"
+            v-model="taskManagerMemory"
+            :placeholder="$t('Please enter the Executor memory')"
+            style="width: 186px;"
+            autocomplete="off">
         </x-input>
         </span>
       </div>
       <div class="clearfix list">
-        <span class="sp1" style="word-break:break-all">{{$t('jobManagerMemory')}}</span>
+        <span class="sp1">{{$t('slot')}}</span>
         <span class="sp2">
           <x-input
                   :disabled="isDetails"
                   type="input"
-                  v-model="jobManagerMemory"
-                  :placeholder="$t('Please enter the number of Executor')"
+                  v-model="slot"
+                  :placeholder="$t('Please enter driver core number')"
                   style="width: 200px;"
                   autocomplete="off">
         </x-input>
         </span>
-        <span class="sp1 sp3">{{$t('taskManagerMemory')}}</span>
+        <div v-if="flinkVersion !== '>=1.10'">
+        <span class="sp1 sp3">{{$t('taskManager')}}</span>
         <span class="sp2">
           <x-input
                   :disabled="isDetails"
                   type="input"
-                  v-model="taskManagerMemory"
-                  :placeholder="$t('Please enter the Executor memory')"
+                  v-model="taskManager"
+                  :placeholder="$t('Please enter driver memory use')"
                   style="width: 186px;"
                   autocomplete="off">
         </x-input>
         </span>
+        </div>
       </div>
-
     </div>
     <m-list-box>
       <div slot="text">{{$t('Command-line parameters')}}</div>
@@ -207,6 +224,11 @@
         programType: 'SCALA',
         // Program type(List)
         programTypeList: [{ code: 'JAVA' }, { code: 'SCALA' }, { code: 'PYTHON' }],
+
+        flinkVersion:'<1.10',
+        // Flink Versions(List)
+        flinkVersionList: [{ code: '<1.10' }, { code: '>=1.10' }],
+
         normalizer(node) {
           return {
             label: node.name
@@ -324,6 +346,7 @@
             return {id: v}
           }),
           localParams: this.localParams,
+          flinkVersion: this.flinkVersion,
           slot: this.slot,
           taskManager: this.taskManager,
           jobManagerMemory: this.jobManagerMemory,
@@ -485,11 +508,13 @@
             this.mainJar = o.params.mainJar.id || ''
           }
           this.deployMode = o.params.deployMode || ''
+          this.flinkVersion = o.params.flinkVersion || '<1.10'
           this.slot = o.params.slot || 1
           this.taskManager = o.params.taskManager || '2'
           this.jobManagerMemory = o.params.jobManagerMemory || '1G'
           this.taskManagerMemory = o.params.taskManagerMemory || '2G'
 
+
           this.mainArgs = o.params.mainArgs || ''
           this.others = o.params.others
           this.programType = o.params.programType || 'SCALA'
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
index fde1534..e47b680 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
@@ -98,6 +98,7 @@ export default {
   Script: 'Script',
   'Please enter script(required)': 'Please enter script(required)',
   'Deploy Mode': 'Deploy Mode',
+  'Flink Version':'Flink Version',
   'Driver core number': 'Driver core number',
   'Please enter driver core number': 'Please enter driver core number',
   'Driver memory use': 'Driver memory use',
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
index 9ad3923..c9bb0e2 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
@@ -99,6 +99,7 @@ export default {
   Script: '脚本',
   'Please enter script(required)': '请输入脚本(必填)',
   'Deploy Mode': '部署方式',
+  'Flink Version': 'Flink版本',
   'Driver core number': 'Driver内核数',
   'Please enter driver core number': '请输入Driver内核数',
   'Driver memory use': 'Driver内存数',