You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by so...@apache.org on 2022/02/19 09:19:03 UTC
[dolphinscheduler] branch dev updated: [Feature][UI Next] Add flink task (#8446)
This is an automated email from the ASF dual-hosted git repository.
songjian pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new c0348a3 [Feature][UI Next] Add flink task (#8446)
c0348a3 is described below
commit c0348a3cdf18aca020251806dcb5541884faed29
Author: Devosend <de...@gmail.com>
AuthorDate: Sat Feb 19 17:18:55 2022 +0800
[Feature][UI Next] Add flink task (#8446)
* Add flink task
* delete redundant file
---
.../src/locales/modules/en_US.ts | 18 +-
.../src/locales/modules/zh_CN.ts | 17 +-
.../src/service/modules/resources/index.ts | 4 +-
.../projects/task/components/node/fields/index.ts | 1 +
.../task/components/node/fields/use-flink.ts | 343 +++++++++++++++++++++
.../projects/task/components/node/format-data.ts | 32 +-
.../task/components/node/tasks/use-flink.ts | 88 ++++++
.../views/projects/task/components/node/types.ts | 6 +
.../projects/task/components/node/use-task.ts | 9 +
9 files changed, 502 insertions(+), 16 deletions(-)
diff --git a/dolphinscheduler-ui-next/src/locales/modules/en_US.ts b/dolphinscheduler-ui-next/src/locales/modules/en_US.ts
index 8ca116a..1834718 100644
--- a/dolphinscheduler-ui-next/src/locales/modules/en_US.ts
+++ b/dolphinscheduler-ui-next/src/locales/modules/en_US.ts
@@ -637,7 +637,23 @@ const project = {
main_arguments_tips: 'Please enter main arguments',
option_parameters: 'Option Parameters',
option_parameters_tips: 'Please enter option parameters',
- positive_integer_tips: 'should be a positive integer'
+ positive_integer_tips: 'should be a positive integer',
+ flink_version: 'Flink Version',
+ job_manager_memory: 'JobManager Memory',
+ job_manager_memory_tips: 'Please enter JobManager memory',
+ task_manager_memory: 'TaskManager Memory',
+ task_manager_memory_tips: 'Please enter TaskManager memory',
+ slot_number: 'Slot Number',
+ slot_number_tips: 'Please enter Slot number',
+ parallelism: 'Parallelism',
+ custom_parallelism: 'Configure parallelism',
+ parallelism_tips: 'Please enter Parallelism',
+ parallelism_number_tips: 'Parallelism number should be positive integer',
+ parallelism_complement_tips:
+ 'If there are a large number of tasks requiring complement, you can use the custom parallelism to ' +
+ 'set the complement task thread to a reasonable value to avoid too large impact on the server.',
+ task_manager_number: 'TaskManager Number',
+ task_manager_number_tips: 'Please enter TaskManager number'
}
}
diff --git a/dolphinscheduler-ui-next/src/locales/modules/zh_CN.ts b/dolphinscheduler-ui-next/src/locales/modules/zh_CN.ts
index 3103a53..83a93c1 100644
--- a/dolphinscheduler-ui-next/src/locales/modules/zh_CN.ts
+++ b/dolphinscheduler-ui-next/src/locales/modules/zh_CN.ts
@@ -630,7 +630,22 @@ const project = {
main_arguments_tips: '请输入主程序参数',
option_parameters: '选项参数',
option_parameters_tips: '请输入选项参数',
- positive_integer_tips: '应为正整数'
+ positive_integer_tips: '应为正整数',
+ flink_version: 'Flink版本',
+ job_manager_memory: 'JobManager内存数',
+ job_manager_memory_tips: '请输入JobManager内存数',
+ task_manager_memory: 'TaskManager内存数',
+ task_manager_memory_tips: '请输入TaskManager内存数',
+ slot_number: 'Slot数量',
+ slot_number_tips: '请输入Slot数量',
+ parallelism: '并行度',
+ custom_parallelism: '自定义并行度',
+ parallelism_tips: '请输入并行度',
+ parallelism_number_tips: '并行度必须为正整数',
+ parallelism_complement_tips:
+ '如果存在大量任务需要补数时,可以利用自定义并行度将补数的任务线程设置成合理的数值,避免对服务器造成过大的影响',
+ task_manager_number: 'TaskManager数量',
+ task_manager_number_tips: '请输入TaskManager数量'
}
}
diff --git a/dolphinscheduler-ui-next/src/service/modules/resources/index.ts b/dolphinscheduler-ui-next/src/service/modules/resources/index.ts
index a662ab6..62bb592 100644
--- a/dolphinscheduler-ui-next/src/service/modules/resources/index.ts
+++ b/dolphinscheduler-ui-next/src/service/modules/resources/index.ts
@@ -118,7 +118,9 @@ export function onlineCreateResource(
})
}
-export function queryResourceByProgramType(params: ResourceTypeReq): any {
+export function queryResourceByProgramType(
+ params: ResourceTypeReq & ProgramTypeReq
+): any {
return axios({
url: '/resources/query-by-type',
method: 'get',
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/index.ts
index 6aaf106..3d45226 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/index.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/index.ts
@@ -33,3 +33,4 @@ export { useChildNode } from './use-child-node'
export { useShell } from './use-shell'
export { useSpark } from './use-spark'
export { useMr } from './use-mr'
+export { useFlink } from './use-flink'
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts
new file mode 100644
index 0000000..8b2c829
--- /dev/null
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import { ref, onMounted, computed } from 'vue'
+import { useI18n } from 'vue-i18n'
+import { queryResourceByProgramType } from '@/service/modules/resources'
+import { removeUselessChildren } from './use-shell'
+import type { IJsonItem } from '../types'
+
+/**
+ * Builds the Flink-specific form items of the task node editor.
+ *
+ * The items read the caller's `model`; the program-type handler also writes
+ * `mainJar` and `mainClass` back to it. Items whose `span` is a computed ref
+ * get a span of 0 when they do not apply to the current program type,
+ * deploy mode or Flink version (presumably the form renderer hides them --
+ * confirm against the renderer).
+ *
+ * @param model - form model of the task node. Fields read here:
+ *   `programType`, `deployMode`, `flinkVersion`, `taskManagerMemory`, `slot`,
+ *   `taskManager`, `parallelism` and `localParams`.
+ * @returns the form item descriptions, in display order.
+ */
+export function useFlink(model: { [field: string]: any }): IJsonItem[] {
+  const { t } = useI18n()
+
+  const mainClassSpan = computed(() =>
+    model.programType === 'PYTHON' ? 0 : 24
+  )
+
+  const taskManagerNumberSpan = computed(() =>
+    model.flinkVersion === '<1.10' && model.deployMode === 'cluster' ? 12 : 0
+  )
+
+  const deployModeSpan = computed(() =>
+    model.deployMode === 'cluster' ? 12 : 0
+  )
+
+  const mainJarOptions = ref([])
+  // Resource trees already fetched, keyed by program type.
+  const resources: { [field: string]: any } = {}
+
+  // Loads the resource tree for `programType` into `mainJarOptions`,
+  // reusing the cached tree when that type was fetched before.
+  const getResourceList = async (programType: string) => {
+    if (resources[programType] !== void 0) {
+      mainJarOptions.value = resources[programType]
+      return
+    }
+    try {
+      const res = await queryResourceByProgramType({
+        type: 'FILE',
+        programType
+      })
+      // Normalise before touching the list so an empty response cannot throw.
+      const list = res || []
+      removeUselessChildren(list)
+      mainJarOptions.value = list
+      // Only cache real responses; a nullish one is retried next time.
+      if (res) {
+        resources[programType] = list
+      }
+    } catch (err) {
+      // Best effort: the form stays usable, but the options of a previously
+      // selected program type must not be offered for this one.
+      mainJarOptions.value = []
+    }
+  }
+
+  // Validator factory for the optional free-text memory fields. A value such
+  // as '1G' is accepted as long as it starts with an integer greater than 0.
+  // `labelKey` is the i18n key of the field label used in the error message.
+  const memoryValidator =
+    (labelKey: string) => (validate: any, value: string) => {
+      if (!value) {
+        return
+      }
+      // NaN, 0 and negative numbers all fail this comparison.
+      if (!(parseInt(value, 10) > 0)) {
+        return new Error(
+          t(labelKey) + t('project.node.positive_integer_tips')
+        )
+      }
+    }
+
+  onMounted(() => {
+    getResourceList(model.programType)
+  })
+
+  return [
+    {
+      type: 'select',
+      field: 'programType',
+      span: 12,
+      name: t('project.node.program_type'),
+      options: PROGRAM_TYPES,
+      props: {
+        'on-update:value': (value: string) => {
+          // The selected package/class belong to the previous program type.
+          model.mainJar = null
+          model.mainClass = ''
+          getResourceList(value)
+        }
+      },
+      value: model.programType
+    },
+    {
+      type: 'input',
+      field: 'mainClass',
+      span: mainClassSpan,
+      name: t('project.node.main_class'),
+      props: {
+        placeholder: t('project.node.main_class_tips')
+      },
+      validate: {
+        trigger: ['input', 'blur'],
+        // Evaluated once when the items are built; the validator below
+        // re-checks the current program type on every run.
+        required: model.programType !== 'PYTHON',
+        validator(validate: any, value: string) {
+          if (model.programType !== 'PYTHON' && !value) {
+            return new Error(t('project.node.main_class_tips'))
+          }
+        }
+      }
+    },
+    {
+      type: 'tree-select',
+      field: 'mainJar',
+      name: t('project.node.main_package'),
+      props: {
+        cascade: true,
+        showPath: true,
+        checkStrategy: 'child',
+        placeholder: t('project.node.main_package_tips'),
+        keyField: 'id',
+        labelField: 'fullName'
+      },
+      validate: {
+        trigger: ['input', 'blur'],
+        // The validator rejects an empty value for every program type, so
+        // the field is always required.
+        required: true,
+        validator(validate: any, value: string) {
+          if (!value) {
+            return new Error(t('project.node.main_package_tips'))
+          }
+        }
+      },
+      options: mainJarOptions
+    },
+    {
+      type: 'radio',
+      field: 'deployMode',
+      name: t('project.node.deploy_mode'),
+      options: DeployModes
+    },
+    {
+      type: 'select',
+      field: 'flinkVersion',
+      span: 12,
+      name: t('project.node.flink_version'),
+      options: FLINK_VERSIONS,
+      value: model.flinkVersion
+    },
+    {
+      type: 'input',
+      field: 'appName',
+      name: t('project.node.app_name'),
+      props: {
+        placeholder: t('project.node.app_name_tips')
+      }
+    },
+    {
+      type: 'input',
+      field: 'jobManagerMemory',
+      name: t('project.node.job_manager_memory'),
+      span: deployModeSpan,
+      props: {
+        placeholder: t('project.node.job_manager_memory_tips')
+      },
+      validate: {
+        trigger: ['input', 'blur'],
+        validator: memoryValidator('project.node.job_manager_memory')
+      }
+    },
+    {
+      type: 'input',
+      field: 'taskManagerMemory',
+      name: t('project.node.task_manager_memory'),
+      span: deployModeSpan,
+      props: {
+        placeholder: t('project.node.task_manager_memory_tips')
+      },
+      validate: {
+        trigger: ['input', 'blur'],
+        validator: memoryValidator('project.node.task_manager_memory')
+      },
+      value: model.taskManagerMemory
+    },
+    {
+      type: 'input-number',
+      field: 'slot',
+      name: t('project.node.slot_number'),
+      span: deployModeSpan,
+      props: {
+        placeholder: t('project.node.slot_number_tips'),
+        min: 1
+      },
+      value: model.slot
+    },
+    {
+      type: 'input-number',
+      field: 'taskManager',
+      name: t('project.node.task_manager_number'),
+      span: taskManagerNumberSpan,
+      props: {
+        placeholder: t('project.node.task_manager_number_tips')
+      },
+      value: model.taskManager
+    },
+    {
+      type: 'input-number',
+      field: 'parallelism',
+      name: t('project.node.parallelism'),
+      span: 12,
+      props: {
+        placeholder: t('project.node.parallelism_tips'),
+        min: 1
+      },
+      validate: {
+        trigger: ['input', 'blur'],
+        required: true,
+        validator(validate: any, value: string) {
+          if (!value) {
+            return new Error(t('project.node.parallelism_tips'))
+          }
+        }
+      },
+      value: model.parallelism
+    },
+    {
+      type: 'input',
+      field: 'mainArgs',
+      name: t('project.node.main_arguments'),
+      props: {
+        type: 'textarea',
+        placeholder: t('project.node.main_arguments_tips')
+      }
+    },
+    {
+      type: 'input',
+      field: 'others',
+      name: t('project.node.option_parameters'),
+      props: {
+        type: 'textarea',
+        placeholder: t('project.node.option_parameters_tips')
+      }
+    },
+    {
+      type: 'tree-select',
+      field: 'resourceList',
+      name: t('project.node.resources'),
+      options: mainJarOptions,
+      props: {
+        multiple: true,
+        checkable: true,
+        cascade: true,
+        showPath: true,
+        checkStrategy: 'child',
+        placeholder: t('project.node.resources_tips'),
+        keyField: 'id',
+        labelField: 'name'
+      }
+    },
+    {
+      type: 'custom-parameters',
+      field: 'localParams',
+      name: t('project.node.custom_parameters'),
+      children: [
+        {
+          type: 'input',
+          field: 'prop',
+          span: 10,
+          props: {
+            placeholder: t('project.node.prop_tips'),
+            maxLength: 256
+          },
+          validate: {
+            trigger: ['input', 'blur'],
+            required: true,
+            validator(validate: any, value: string) {
+              if (!value) {
+                return new Error(t('project.node.prop_tips'))
+              }
+
+              // The row being validated is itself part of localParams, so a
+              // duplicate exists only when more than one row matches.
+              const sameItems = model.localParams.filter(
+                (item: { prop: string }) => item.prop === value
+              )
+
+              if (sameItems.length > 1) {
+                return new Error(t('project.node.prop_repeat'))
+              }
+            }
+          }
+        },
+        {
+          type: 'input',
+          field: 'value',
+          span: 10,
+          props: {
+            placeholder: t('project.node.value_tips'),
+            maxLength: 256
+          }
+        }
+      ]
+    }
+  ]
+}
+
+/**
+ * Turns a list of strings into option objects whose label and value are
+ * both the string itself.
+ */
+const toOptions = (values: string[]) =>
+  values.map((value) => ({ label: value, value }))
+
+// Program types offered by the program-type select.
+const PROGRAM_TYPES = toOptions(['JAVA', 'SCALA', 'PYTHON'])
+
+// Version ranges offered by the Flink-version select.
+const FLINK_VERSIONS = toOptions(['<1.10', '>=1.10'])
+
+// Modes offered by the deploy-mode radio group.
+const DeployModes = toOptions(['cluster', 'local'])
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/format-data.ts
index a7af766..add6196 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/format-data.ts
@@ -24,32 +24,38 @@ export function formatParams(data: INodeData): {
taskDefinitionJsonObj: object
} {
const taskParams: ITaskParams = {}
- if (data.taskType === 'SPARK') {
+ if (
+ data.taskType === 'SPARK' ||
+ data.taskType === 'MR' ||
+ data.taskType === 'FLINK'
+ ) {
taskParams.programType = data.programType
- taskParams.sparkVersion = data.sparkVersion
taskParams.mainClass = data.mainClass
if (data.mainJar) {
taskParams.mainJar = { id: data.mainJar }
}
taskParams.deployMode = data.deployMode
taskParams.appName = data.appName
+ taskParams.mainArgs = data.mainArgs
+ taskParams.others = data.others
+ }
+
+ if (data.taskType === 'SPARK') {
+ taskParams.sparkVersion = data.sparkVersion
taskParams.driverCores = data.driverCores
taskParams.driverMemory = data.driverMemory
taskParams.numExecutors = data.numExecutors
taskParams.executorMemory = data.executorMemory
taskParams.executorCores = data.executorCores
- taskParams.mainArgs = data.mainArgs
- taskParams.others = data.others
}
- if (data.taskType === 'MR') {
- taskParams.programType = data.programType
- taskParams.mainClass = data.mainClass
- if (data.mainJar) {
- taskParams.mainJar = { id: data.mainJar }
- }
- taskParams.appName = data.appName
- taskParams.mainArgs = data.mainArgs
- taskParams.others = data.others
+
+ if (data.taskType === 'FLINK') {
+ taskParams.flinkVersion = data.flinkVersion
+ taskParams.jobManagerMemory = data.jobManagerMemory
+ taskParams.taskManagerMemory = data.taskManagerMemory
+ taskParams.slot = data.slot
+ taskParams.taskManager = data.taskManager
+ taskParams.parallelism = data.parallelism
}
const params = {
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-flink.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-flink.ts
new file mode 100644
index 0000000..5b0f7e0
--- /dev/null
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-flink.ts
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive } from 'vue'
+import * as Fields from '../fields/index'
+import type { IJsonItem, INodeData, ITaskData } from '../types'
+
+/**
+ * Creates the reactive model of a Flink task node together with the list
+ * of form items that edit it.
+ *
+ * @param projectCode - forwarded to the process-name and task-group items.
+ * @param from - defaults to 0; when it is 1 the task-type and process-name
+ *   items are inserted right after the name item.
+ * @param readonly - forwarded to the task-type item.
+ * @param data - existing task data, if any; a missing `id` is treated as
+ *   "creating a new task".
+ * @returns `json`, the form items in display order, and `model`, the
+ *   reactive object they are bound to.
+ */
+export function useFlink({
+  projectCode,
+  from = 0,
+  readonly,
+  data
+}: {
+  projectCode: number
+  from?: number
+  readonly?: boolean
+  data?: ITaskData
+}) {
+  // Initial values of a new Flink task.
+  const model = reactive<INodeData>({
+    name: '',
+    flag: 'YES',
+    description: '',
+    timeoutFlag: false,
+    localParams: [],
+    environmentCode: null,
+    failRetryInterval: 1,
+    failRetryTimes: 0,
+    workerGroup: 'default',
+    delayTime: 0,
+    timeout: 30,
+    programType: 'SCALA',
+    deployMode: 'cluster',
+    flinkVersion: '<1.10',
+    jobManagerMemory: '1G',
+    taskManagerMemory: '2G',
+    slot: 1,
+    taskManager: 2,
+    parallelism: 1
+  })
+
+  // Items that are only part of the form when `from` is 1.
+  const processItems: IJsonItem[] =
+    from === 1
+      ? [
+          Fields.useTaskType(model, readonly),
+          Fields.useProcessName({
+            model,
+            projectCode,
+            isCreate: !data?.id,
+            from,
+            processName: data?.processName,
+            code: data?.code
+          })
+        ]
+      : []
+
+  const json = [
+    Fields.useName(),
+    ...processItems,
+    Fields.useRunFlag(),
+    Fields.useDescription(),
+    Fields.useTaskPriority(),
+    Fields.useWorkerGroup(),
+    Fields.useEnvironmentName(model, !data?.id),
+    ...Fields.useTaskGroup(model, projectCode),
+    ...Fields.useFailed(),
+    Fields.useDelayTime(model),
+    ...Fields.useTimeoutAlarm(model),
+    ...Fields.useFlink(model),
+    Fields.usePreTasks(model)
+  ] as IJsonItem[]
+
+  return { json, model }
+}
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/types.ts
index 18f2312..43fb597 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/types.ts
@@ -47,6 +47,12 @@ interface ITaskParams {
rawScript?: string
programType?: string
sparkVersion?: string
+ flinkVersion?: string
+ jobManagerMemory?: string
+ taskManagerMemory?: string
+ slot?: number
+ taskManager?: number
+ parallelism?: number
mainClass?: string
deployMode?: string
appName?: string
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/use-task.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/use-task.ts
index 354dd43..b010229 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/use-task.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/use-task.ts
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+import { useFlink } from './tasks/use-flink'
import { useShell } from './tasks/use-shell'
import { useSubProcess } from './tasks/use-sub-process'
import { usePython } from './tasks/use-python'
@@ -75,5 +76,13 @@ export function useTask({
data
})
}
+ if (taskType === 'FLINK') {
+ node = useFlink({
+ projectCode,
+ from,
+ readonly,
+ data
+ })
+ }
return node
}