You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by so...@apache.org on 2022/02/19 09:19:03 UTC

[dolphinscheduler] branch dev updated: [Feature][UI Next] Add flink task (#8446)

This is an automated email from the ASF dual-hosted git repository.

songjian pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new c0348a3  [Feature][UI Next] Add flink task (#8446)
c0348a3 is described below

commit c0348a3cdf18aca020251806dcb5541884faed29
Author: Devosend <de...@gmail.com>
AuthorDate: Sat Feb 19 17:18:55 2022 +0800

    [Feature][UI Next] Add flink task (#8446)
    
    * Add flink task
    
    * delete redundant file
---
 .../src/locales/modules/en_US.ts                   |  18 +-
 .../src/locales/modules/zh_CN.ts                   |  17 +-
 .../src/service/modules/resources/index.ts         |   4 +-
 .../projects/task/components/node/fields/index.ts  |   1 +
 .../task/components/node/fields/use-flink.ts       | 343 +++++++++++++++++++++
 .../projects/task/components/node/format-data.ts   |  32 +-
 .../task/components/node/tasks/use-flink.ts        |  88 ++++++
 .../views/projects/task/components/node/types.ts   |   6 +
 .../projects/task/components/node/use-task.ts      |   9 +
 9 files changed, 502 insertions(+), 16 deletions(-)

diff --git a/dolphinscheduler-ui-next/src/locales/modules/en_US.ts b/dolphinscheduler-ui-next/src/locales/modules/en_US.ts
index 8ca116a..1834718 100644
--- a/dolphinscheduler-ui-next/src/locales/modules/en_US.ts
+++ b/dolphinscheduler-ui-next/src/locales/modules/en_US.ts
@@ -637,7 +637,23 @@ const project = {
     main_arguments_tips: 'Please enter main arguments',
     option_parameters: 'Option Parameters',
     option_parameters_tips: 'Please enter option parameters',
-    positive_integer_tips: 'should be a positive integer'
+    positive_integer_tips: 'should be a positive integer',
+    flink_version: 'Flink Version',
+    job_manager_memory: 'JobManager Memory',
+    job_manager_memory_tips: 'Please enter JobManager memory',
+    task_manager_memory: 'TaskManager Memory',
+    task_manager_memory_tips: 'Please enter TaskManager memory',
+    slot_number: 'Slot Number',
+    slot_number_tips: 'Please enter Slot number',
+    parallelism: 'Parallelism',
+    custom_parallelism: 'Configure parallelism',
+    parallelism_tips: 'Please enter Parallelism',
+    parallelism_number_tips: 'Parallelism number should be positive integer',
+    parallelism_complement_tips:
+      'If there are a large number of tasks requiring complement, you can use the custom parallelism to ' +
+      'set the complement task thread to a reasonable value to avoid too large impact on the server.',
+    task_manager_number: 'TaskManager Number',
+    task_manager_number_tips: 'Please enter TaskManager number'
   }
 }
 
diff --git a/dolphinscheduler-ui-next/src/locales/modules/zh_CN.ts b/dolphinscheduler-ui-next/src/locales/modules/zh_CN.ts
index 3103a53..83a93c1 100644
--- a/dolphinscheduler-ui-next/src/locales/modules/zh_CN.ts
+++ b/dolphinscheduler-ui-next/src/locales/modules/zh_CN.ts
@@ -630,7 +630,22 @@ const project = {
     main_arguments_tips: '请输入主程序参数',
     option_parameters: '选项参数',
     option_parameters_tips: '请输入选项参数',
-    positive_integer_tips: '应为正整数'
+    positive_integer_tips: '应为正整数',
+    flink_version: 'Flink版本',
+    job_manager_memory: 'JobManager内存数',
+    job_manager_memory_tips: '请输入JobManager内存数',
+    task_manager_memory: 'TaskManager内存数',
+    task_manager_memory_tips: '请输入TaskManager内存数',
+    slot_number: 'Slot数量',
+    slot_number_tips: '请输入Slot数量',
+    parallelism: '并行度',
+    custom_parallelism: '自定义并行度',
+    parallelism_tips: '请输入并行度',
+    parallelism_number_tips: '并行度必须为正整数',
+    parallelism_complement_tips:
+      '如果存在大量任务需要补数时,可以利用自定义并行度将补数的任务线程设置成合理的数值,避免对服务器造成过大的影响',
+    task_manager_number: 'TaskManager数量',
+    task_manager_number_tips: '请输入TaskManager数量'
   }
 }
 
diff --git a/dolphinscheduler-ui-next/src/service/modules/resources/index.ts b/dolphinscheduler-ui-next/src/service/modules/resources/index.ts
index a662ab6..62bb592 100644
--- a/dolphinscheduler-ui-next/src/service/modules/resources/index.ts
+++ b/dolphinscheduler-ui-next/src/service/modules/resources/index.ts
@@ -118,7 +118,9 @@ export function onlineCreateResource(
   })
 }
 
-export function queryResourceByProgramType(params: ResourceTypeReq): any {
+export function queryResourceByProgramType(
+  params: ResourceTypeReq & ProgramTypeReq
+): any {
   return axios({
     url: '/resources/query-by-type',
     method: 'get',
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/index.ts
index 6aaf106..3d45226 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/index.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/index.ts
@@ -33,3 +33,4 @@ export { useChildNode } from './use-child-node'
 export { useShell } from './use-shell'
 export { useSpark } from './use-spark'
 export { useMr } from './use-mr'
+export { useFlink } from './use-flink'
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts
new file mode 100644
index 0000000..8b2c829
--- /dev/null
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import { ref, onMounted, computed } from 'vue'
+import { useI18n } from 'vue-i18n'
+import { queryResourceByProgramType } from '@/service/modules/resources'
+import { removeUselessChildren } from './use-shell'
+import type { IJsonItem } from '../types'
+// Builds the FLINK-specific form items for the task `model`; reads `model` for initial values and span conditions, and resets mainJar/mainClass when programType changes.
+export function useFlink(model: { [field: string]: any }): IJsonItem[] {
+  const { t } = useI18n()
+  // Computed spans that follow the model. NOTE(review): a span of 0 presumably hides the field — confirm in the form renderer.
+  const mainClassSpan = computed(() =>
+    model.programType === 'PYTHON' ? 0 : 24
+  )
+
+  const taskManagerNumberSpan = computed(() =>
+    model.flinkVersion === '<1.10' && model.deployMode === 'cluster' ? 12 : 0
+  )
+
+  const deployModeSpan = computed(() =>
+    model.deployMode === 'cluster' ? 12 : 0
+  )
+  // Options shared by the mainJar and resourceList tree-selects, plus a per-programType cache of fetched results.
+  const mainJarOptions = ref([])
+  const resources: { [field: string]: any } = {}
+  // Loads the FILE resources for `programType`; a cached entry is reused instead of querying again.
+  const getResourceList = async (programType: string) => {
+    if (resources[programType] !== void 0) {
+      mainJarOptions.value = resources[programType]
+      return
+    }
+    try {
+      const res = await queryResourceByProgramType({
+        type: 'FILE',
+        programType
+      })
+      removeUselessChildren(res)
+      mainJarOptions.value = res || []
+      resources[programType] = res
+    } catch (err) {} // best effort: on failure the current options are left untouched
+  }
+  // Fetch the options for the initial program type once the component is mounted.
+  onMounted(() => {
+    getResourceList(model.programType)
+  })
+
+  return [
+    {
+      type: 'select',
+      field: 'programType',
+      span: 12,
+      name: t('project.node.program_type'),
+      options: PROGRAM_TYPES,
+      props: {
+        'on-update:value': (value: string) => {
+          model.mainJar = null // clear the previous choices before loading resources for the new type
+          model.mainClass = ''
+          getResourceList(value)
+        }
+      },
+      value: model.programType
+    },
+    {
+      type: 'input',
+      field: 'mainClass',
+      span: mainClassSpan,
+      name: t('project.node.main_class'),
+      props: {
+        placeholder: t('project.node.main_class_tips')
+      },
+      validate: {
+        trigger: ['input', 'blur'],
+        required: model.programType !== 'PYTHON', // evaluated once, when the items are built
+        validator(validate: any, value: string) {
+          if (model.programType !== 'PYTHON' && !value) {
+            return new Error(t('project.node.main_class_tips'))
+          }
+        }
+      }
+    },
+    {
+      type: 'tree-select',
+      field: 'mainJar',
+      name: t('project.node.main_package'),
+      props: {
+        cascade: true,
+        showPath: true,
+        checkStrategy: 'child',
+        placeholder: t('project.node.main_package_tips'),
+        keyField: 'id',
+        labelField: 'fullName'
+      },
+      validate: {
+        trigger: ['input', 'blur'],
+        required: model.programType !== 'PYTHON', // evaluated once; the validator below rejects an empty value for every program type
+        validator(validate: any, value: string) {
+          if (!value) {
+            return new Error(t('project.node.main_package_tips'))
+          }
+        }
+      },
+      options: mainJarOptions
+    },
+    {
+      type: 'radio',
+      field: 'deployMode',
+      name: t('project.node.deploy_mode'),
+      options: DeployModes
+    },
+    {
+      type: 'select',
+      field: 'flinkVersion',
+      span: 12,
+      name: t('project.node.flink_version'),
+      options: FLINK_VERSIONS,
+      value: model.flinkVersion
+    },
+    {
+      type: 'input',
+      field: 'appName',
+      name: t('project.node.app_name'),
+      props: {
+        placeholder: t('project.node.app_name_tips')
+      }
+    },
+    {
+      type: 'input',
+      field: 'jobManagerMemory',
+      name: t('project.node.job_manager_memory'),
+      span: deployModeSpan,
+      props: {
+        placeholder: t('project.node.job_manager_memory_tips'),
+        min: 1
+      },
+      validate: {
+        trigger: ['input', 'blur'],
+        validator(validate: any, value: string) {
+          if (!value) {
+            return
+          }
+          if (!(parseInt(value, 10) > 0)) { // leading integer must be > 0; NaN, 0 and negatives fail, '1G' passes
+            return new Error(
+              t('project.node.job_manager_memory') +
+                t('project.node.positive_integer_tips')
+            )
+          }
+        }
+      }
+    },
+    {
+      type: 'input',
+      field: 'taskManagerMemory',
+      name: t('project.node.task_manager_memory'),
+      span: deployModeSpan,
+      props: {
+        placeholder: t('project.node.task_manager_memory_tips')
+      },
+      validate: {
+        trigger: ['input', 'blur'],
+        validator(validate: any, value: string) {
+          if (!value) {
+            return
+          }
+          if (!(parseInt(value, 10) > 0)) { // same rule as jobManagerMemory
+            return new Error(
+              t('project.node.task_manager_memory') +
+                t('project.node.positive_integer_tips')
+            )
+          }
+        }
+      },
+      value: model.taskManagerMemory
+    },
+    {
+      type: 'input-number',
+      field: 'slot',
+      name: t('project.node.slot_number'),
+      span: deployModeSpan,
+      props: {
+        placeholder: t('project.node.slot_number_tips'),
+        min: 1
+      },
+      value: model.slot
+    },
+    {
+      type: 'input-number',
+      field: 'taskManager',
+      name: t('project.node.task_manager_number'),
+      span: taskManagerNumberSpan,
+      props: {
+        placeholder: t('project.node.task_manager_number_tips')
+      },
+      value: model.taskManager
+    },
+    {
+      type: 'input-number',
+      field: 'parallelism',
+      name: t('project.node.parallelism'),
+      span: 12,
+      props: {
+        placeholder: t('project.node.parallelism_tips'),
+        min: 1
+      },
+      validate: {
+        trigger: ['input', 'blur'],
+        required: true,
+        validator(validate: any, value: string) {
+          if (!value) {
+            return new Error(t('project.node.parallelism_tips'))
+          }
+        }
+      },
+      value: model.parallelism
+    },
+    {
+      type: 'input',
+      field: 'mainArgs',
+      name: t('project.node.main_arguments'),
+      props: {
+        type: 'textarea',
+        placeholder: t('project.node.main_arguments_tips')
+      }
+    },
+    {
+      type: 'input',
+      field: 'others',
+      name: t('project.node.option_parameters'),
+      props: {
+        type: 'textarea',
+        placeholder: t('project.node.option_parameters_tips')
+      }
+    },
+    {
+      type: 'tree-select',
+      field: 'resourceList',
+      name: t('project.node.resources'),
+      options: mainJarOptions,
+      props: {
+        multiple: true,
+        checkable: true,
+        cascade: true,
+        showPath: true,
+        checkStrategy: 'child',
+        placeholder: t('project.node.resources_tips'),
+        keyField: 'id',
+        labelField: 'name'
+      }
+    },
+    {
+      type: 'custom-parameters',
+      field: 'localParams',
+      name: t('project.node.custom_parameters'),
+      children: [
+        {
+          type: 'input',
+          field: 'prop',
+          span: 10,
+          props: {
+            placeholder: t('project.node.prop_tips'),
+            maxLength: 256
+          },
+          validate: {
+            trigger: ['input', 'blur'],
+            required: true,
+            validator(validate: any, value: string) {
+              if (!value) {
+                return new Error(t('project.node.prop_tips'))
+              }
+              // Count the rows that use this prop name; more than one means it is duplicated.
+              const sameItems = model.localParams.filter(
+                (item: { prop: string }) => item.prop === value
+              )
+
+              if (sameItems.length > 1) {
+                return new Error(t('project.node.prop_repeat'))
+              }
+            }
+          }
+        },
+        {
+          type: 'input',
+          field: 'value',
+          span: 10,
+          props: {
+            placeholder: t('project.node.value_tips'),
+            maxLength: 256
+          }
+        }
+      ]
+    }
+  ]
+}
+// Options offered by the programType select in useFlink.
+const PROGRAM_TYPES = [
+  {
+    label: 'JAVA',
+    value: 'JAVA'
+  },
+  {
+    label: 'SCALA',
+    value: 'SCALA'
+  },
+  {
+    label: 'PYTHON',
+    value: 'PYTHON'
+  }
+]
+// Options offered by the flinkVersion select; useFlink compares against '<1.10' when computing the TaskManager number span.
+const FLINK_VERSIONS = [
+  {
+    label: '<1.10',
+    value: '<1.10'
+  },
+  {
+    label: '>=1.10',
+    value: '>=1.10'
+  }
+]
+// Options offered by the deployMode radio; useFlink gives the memory and slot fields a non-zero span only for 'cluster'.
+const DeployModes = [
+  {
+    label: 'cluster',
+    value: 'cluster'
+  },
+  {
+    label: 'local',
+    value: 'local'
+  }
+]
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/format-data.ts
index a7af766..add6196 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/format-data.ts
@@ -24,32 +24,38 @@ export function formatParams(data: INodeData): {
   taskDefinitionJsonObj: object
 } {
   const taskParams: ITaskParams = {}
-  if (data.taskType === 'SPARK') {
+  if (
+    data.taskType === 'SPARK' ||
+    data.taskType === 'MR' ||
+    data.taskType === 'FLINK'
+  ) {
     taskParams.programType = data.programType
-    taskParams.sparkVersion = data.sparkVersion
     taskParams.mainClass = data.mainClass
     if (data.mainJar) {
       taskParams.mainJar = { id: data.mainJar }
     }
     taskParams.deployMode = data.deployMode
     taskParams.appName = data.appName
+    taskParams.mainArgs = data.mainArgs
+    taskParams.others = data.others
+  }
+
+  if (data.taskType === 'SPARK') {
+    taskParams.sparkVersion = data.sparkVersion
     taskParams.driverCores = data.driverCores
     taskParams.driverMemory = data.driverMemory
     taskParams.numExecutors = data.numExecutors
     taskParams.executorMemory = data.executorMemory
     taskParams.executorCores = data.executorCores
-    taskParams.mainArgs = data.mainArgs
-    taskParams.others = data.others
   }
-  if (data.taskType === 'MR') {
-    taskParams.programType = data.programType
-    taskParams.mainClass = data.mainClass
-    if (data.mainJar) {
-      taskParams.mainJar = { id: data.mainJar }
-    }
-    taskParams.appName = data.appName
-    taskParams.mainArgs = data.mainArgs
-    taskParams.others = data.others
+
+  if (data.taskType === 'FLINK') {
+    taskParams.flinkVersion = data.flinkVersion
+    taskParams.jobManagerMemory = data.jobManagerMemory
+    taskParams.taskManagerMemory = data.taskManagerMemory
+    taskParams.slot = data.slot
+    taskParams.taskManager = data.taskManager
+    taskParams.parallelism = data.parallelism
   }
 
   const params = {
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-flink.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-flink.ts
new file mode 100644
index 0000000..5b0f7e0
--- /dev/null
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-flink.ts
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive } from 'vue'
+import * as Fields from '../fields/index'
+import type { IJsonItem, INodeData, ITaskData } from '../types'
+// Creates the FLINK task node: a reactive model pre-filled with defaults, and the ordered list of form items built from it.
+export function useFlink({
+  projectCode,
+  from = 0,
+  readonly,
+  data
+}: {
+  projectCode: number
+  from?: number
+  readonly?: boolean
+  data?: ITaskData
+}) {
+  const model = reactive<INodeData>({
+    name: '',
+    flag: 'YES',
+    description: '',
+    timeoutFlag: false,
+    localParams: [],
+    environmentCode: null,
+    failRetryInterval: 1,
+    failRetryTimes: 0,
+    workerGroup: 'default',
+    delayTime: 0,
+    timeout: 30,
+    programType: 'SCALA',
+    deployMode: 'cluster',
+    flinkVersion: '<1.10',
+    jobManagerMemory: '1G',
+    taskManagerMemory: '2G',
+    slot: 1,
+    taskManager: 2,
+    parallelism: 1
+  })
+  // Only when from === 1 are the task-type and process-name items added; `!data?.id` marks the task as being created.
+  let extra: IJsonItem[] = []
+  if (from === 1) {
+    extra = [
+      Fields.useTaskType(model, readonly),
+      Fields.useProcessName({
+        model,
+        projectCode,
+        isCreate: !data?.id,
+        from,
+        processName: data?.processName,
+        code: data?.code
+      })
+    ]
+  }
+  // The array order below is the order in which the items are returned; `extra` sits right after the name item.
+  return {
+    json: [
+      Fields.useName(),
+      ...extra,
+      Fields.useRunFlag(),
+      Fields.useDescription(),
+      Fields.useTaskPriority(),
+      Fields.useWorkerGroup(),
+      Fields.useEnvironmentName(model, !data?.id),
+      ...Fields.useTaskGroup(model, projectCode),
+      ...Fields.useFailed(),
+      Fields.useDelayTime(model),
+      ...Fields.useTimeoutAlarm(model),
+      ...Fields.useFlink(model),
+      Fields.usePreTasks(model)
+    ] as IJsonItem[],
+    model
+  }
+}
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/types.ts
index 18f2312..43fb597 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/types.ts
@@ -47,6 +47,12 @@ interface ITaskParams {
   rawScript?: string
   programType?: string
   sparkVersion?: string
+  flinkVersion?: string
+  jobManagerMemory?: string
+  taskManagerMemory?: string
+  slot?: number
+  taskManager?: number
+  parallelism?: number
   mainClass?: string
   deployMode?: string
   appName?: string
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/use-task.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/use-task.ts
index 354dd43..b010229 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/use-task.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/use-task.ts
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+import { useFlink } from './tasks/use-flink'
 import { useShell } from './tasks/use-shell'
 import { useSubProcess } from './tasks/use-sub-process'
 import { usePython } from './tasks/use-python'
@@ -75,5 +76,13 @@ export function useTask({
       data
     })
   }
+  if (taskType === 'FLINK') {
+    node = useFlink({
+      projectCode,
+      from,
+      readonly,
+      data
+    })
+  }
   return node
 }