You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by wa...@apache.org on 2022/05/31 03:19:47 UTC

[dolphinscheduler] branch dev updated: [Feature][Task Plugin] Add support for dynamic form for zeppelin task plugin (#9977) (#10269)

This is an automated email from the ASF dual-hosted git repository.

wanggenhua pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new a8a5da367a [Feature][Task Plugin] Add support for dynamic form for zeppelin task plugin (#9977) (#10269)
a8a5da367a is described below

commit a8a5da367ad37911ee2400df894cdf8ff16fb73e
Author: Eric Gao <er...@gmail.com>
AuthorDate: Tue May 31 11:19:41 2022 +0800

    [Feature][Task Plugin] Add support for dynamic form for zeppelin task plugin (#9977) (#10269)
---
 docs/docs/en/guide/task/zeppelin.md                |  1 +
 docs/docs/zh/guide/task/zeppelin.md                |  1 +
 .../plugin/task/zeppelin/ZeppelinParameters.java   | 13 +++++++--
 .../plugin/task/zeppelin/ZeppelinTask.java         | 12 +++++++-
 .../plugin/task/zeppelin/ZeppelinTaskTest.java     | 34 +++++++++++++++-------
 dolphinscheduler-ui/src/locales/en_US/project.ts   |  3 ++
 dolphinscheduler-ui/src/locales/zh_CN/project.ts   |  3 ++
 .../task/components/node/fields/use-zeppelin.ts    |  8 +++++
 .../projects/task/components/node/format-data.ts   |  1 +
 9 files changed, 63 insertions(+), 13 deletions(-)

diff --git a/docs/docs/en/guide/task/zeppelin.md b/docs/docs/en/guide/task/zeppelin.md
index 6f5478ee75..08414e664a 100644
--- a/docs/docs/en/guide/task/zeppelin.md
+++ b/docs/docs/en/guide/task/zeppelin.md
@@ -22,6 +22,7 @@ it will call `Zeppelin Client API` to trigger zeppelin notebook paragraph. Click
 - Timeout alarm: Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will send and the task execution will fail.
 - Zeppelin Note ID: The unique note id for a zeppelin notebook note.
 - Zeppelin Paragraph ID: The unique paragraph id for a zeppelin notebook paragraph.
+- Zeppelin Parameters: Parameters in json format used for zeppelin dynamic form.
 
 ## Task Example
 
diff --git a/docs/docs/zh/guide/task/zeppelin.md b/docs/docs/zh/guide/task/zeppelin.md
index 9461075c54..78285a0317 100644
--- a/docs/docs/zh/guide/task/zeppelin.md
+++ b/docs/docs/zh/guide/task/zeppelin.md
@@ -23,6 +23,7 @@
 - 前置任务:选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。
 - Zeppelin Note ID:Zeppelin Note对应的唯一ID。
 - Zepplin Paragraph:Zeppelin Paragraph对应的唯一ID。
+- Zeppelin Parameters: 用于传入Zeppelin Dynamic Form的参数。
 
 ## Task Example
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
index 3617449755..b379985f5d 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
@@ -32,17 +32,16 @@ public class ZeppelinParameters extends AbstractParameters {
      */
     private String noteId;
     private String paragraphId;
+    private String parameters;
 
     @Override
     public boolean checkParameters() {
-
         return StringUtils.isNotEmpty(this.noteId) && StringUtils.isNotEmpty(this.paragraphId);
     }
 
     @Override
     public List<ResourceInfo> getResourceFilesList() {
         return Collections.emptyList();
-
     }
 
     public String getNoteId() {
@@ -61,11 +60,21 @@ public class ZeppelinParameters extends AbstractParameters {
         this.paragraphId = paragraphId;
     }
 
+    public String getParameters() {
+        return parameters;
+    }
+
+    public void setParameters(String parameters) {
+        this.parameters = parameters;
+    }
+
     @Override
     public String toString() {
         return "ZeppelinParameters{" +
                 "noteId='" + noteId + '\'' +
                 ", paragraphId='" + paragraphId + '\'' +
+                ", parameters='" + parameters + '\'' +
                 '}';
     }
+
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
index ad163bd8ab..eb32ac5996 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.plugin.task.zeppelin;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -28,6 +29,9 @@ import org.apache.zeppelin.client.ParagraphResult;
 import org.apache.zeppelin.client.Status;
 import org.apache.zeppelin.client.ZeppelinClient;
 
+import java.util.HashMap;
+import java.util.Map;
+
 
 public class ZeppelinTask extends AbstractTaskExecutor {
 
@@ -73,9 +77,15 @@ public class ZeppelinTask extends AbstractTaskExecutor {
         try {
             String noteId = this.zeppelinParameters.getNoteId();
             String paragraphId = this.zeppelinParameters.getParagraphId();
+            String parameters = this.zeppelinParameters.getParameters();
+            Map<String, String> zeppelinParamsMap = new HashMap<>();
+            if (parameters != null) {
+                ObjectMapper mapper = new ObjectMapper();
+                zeppelinParamsMap = mapper.readValue(parameters, Map.class);
+            }
 
             // Submit zeppelin task
-            ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId);
+            ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap);
             String resultContent = paragraphResult.getResultInText();
             Status status = paragraphResult.getStatus();
             final int exitStatusCode = mapStatusToExitCode(status);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
index 30fb346f5d..273e5b09f1 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
@@ -27,6 +27,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
 import static org.powermock.api.mockito.PowerMockito.spy;
 import static org.powermock.api.mockito.PowerMockito.when;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 
@@ -44,17 +45,23 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.HashMap;
+import java.util.Map;
+
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({
         ZeppelinTask.class,
         ZeppelinClient.class,
+        ObjectMapper.class,
 })
 @PowerMockIgnore({"javax.*"})
 public class ZeppelinTaskTest {
 
     private static final String MOCK_NOTE_ID = "2GYJR92R7";
     private static final String MOCK_PARAGRAPH_ID = "paragraph_1648793472526_1771221396";
+    private static final String MOCK_PARAMETERS = "{\"key1\": \"value1\", \"key2\": \"value2\"}";
+    private final ObjectMapper mapper = new ObjectMapper();
 
     private ZeppelinClient zClient;
     private ZeppelinTask zeppelinTask;
@@ -73,7 +80,7 @@ public class ZeppelinTaskTest {
 
         // use mocked zClient in zeppelinTask
         doReturn(this.zClient).when(this.zeppelinTask, "getZeppelinClient");
-        when(this.zClient.executeParagraph(any(), any())).thenReturn(this.paragraphResult);
+        when(this.zClient.executeParagraph(any(), any(), any(Map.class))).thenReturn(this.paragraphResult);
         when(paragraphResult.getResultInText()).thenReturn("mock-zeppelin-paragraph-execution-result");
         this.zeppelinTask.init();
     }
@@ -82,7 +89,9 @@ public class ZeppelinTaskTest {
     public void testHandleWithParagraphExecutionSucess() throws Exception {
         when(this.paragraphResult.getStatus()).thenReturn(Status.FINISHED);
         this.zeppelinTask.handle();
-        Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, MOCK_PARAGRAPH_ID);
+        Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
+                MOCK_PARAGRAPH_ID,
+                (Map<String, String>) mapper.readValue(MOCK_PARAMETERS, Map.class));
         Mockito.verify(this.paragraphResult).getResultInText();
         Mockito.verify(this.paragraphResult).getStatus();
         Assert.assertEquals(EXIT_CODE_SUCCESS, this.zeppelinTask.getExitStatusCode());
@@ -92,7 +101,9 @@ public class ZeppelinTaskTest {
     public void testHandleWithParagraphExecutionAborted() throws Exception {
         when(this.paragraphResult.getStatus()).thenReturn(Status.ABORT);
         this.zeppelinTask.handle();
-        Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, MOCK_PARAGRAPH_ID);
+        Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
+                MOCK_PARAGRAPH_ID,
+                (Map<String, String>) mapper.readValue(MOCK_PARAMETERS, Map.class));
         Mockito.verify(this.paragraphResult).getResultInText();
         Mockito.verify(this.paragraphResult).getStatus();
         Assert.assertEquals(EXIT_CODE_KILL, this.zeppelinTask.getExitStatusCode());
@@ -102,7 +113,9 @@ public class ZeppelinTaskTest {
     public void testHandleWithParagraphExecutionError() throws Exception {
         when(this.paragraphResult.getStatus()).thenReturn(Status.ERROR);
         this.zeppelinTask.handle();
-        Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, MOCK_PARAGRAPH_ID);
+        Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
+                MOCK_PARAGRAPH_ID,
+                (Map<String, String>) mapper.readValue(MOCK_PARAMETERS, Map.class));
         Mockito.verify(this.paragraphResult).getResultInText();
         Mockito.verify(this.paragraphResult).getStatus();
         Assert.assertEquals(EXIT_CODE_FAILURE, this.zeppelinTask.getExitStatusCode());
@@ -110,11 +123,13 @@ public class ZeppelinTaskTest {
 
     @Test
     public void testHandleWithParagraphExecutionException() throws Exception {
-        when(this.zClient.executeParagraph(any(), any())).
+        when(this.zClient.executeParagraph(any(), any(), any(Map.class))).
                 thenThrow(new Exception("Something wrong happens from zeppelin side"));
 //        when(this.paragraphResult.getStatus()).thenReturn(Status.ERROR);
         this.zeppelinTask.handle();
-        Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, MOCK_PARAGRAPH_ID);
+        Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
+                MOCK_PARAGRAPH_ID,
+                (Map<String, String>) mapper.readValue(MOCK_PARAMETERS, Map.class));
         Mockito.verify(this.paragraphResult, Mockito.times(0)).getResultInText();
         Mockito.verify(this.paragraphResult, Mockito.times(0)).getStatus();
         Assert.assertEquals(EXIT_CODE_FAILURE, this.zeppelinTask.getExitStatusCode());
@@ -122,10 +137,9 @@ public class ZeppelinTaskTest {
 
     private String buildZeppelinTaskParameters() {
         ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
-        String noteId = MOCK_NOTE_ID;
-        String paragraphId = MOCK_PARAGRAPH_ID;
-        zeppelinParameters.setNoteId(noteId);
-        zeppelinParameters.setParagraphId(paragraphId);
+        zeppelinParameters.setNoteId(MOCK_NOTE_ID);
+        zeppelinParameters.setParagraphId(MOCK_PARAGRAPH_ID);
+        zeppelinParameters.setParameters(MOCK_PARAMETERS);
 
         return JSONUtils.toJsonString(zeppelinParameters);
     }
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts
index 381de4232c..d0e7aa2bd7 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -606,6 +606,9 @@ export default {
     zeppelin_paragraph_id: 'zeppelinParagraphId',
     zeppelin_paragraph_id_tips:
       'Please enter the paragraph id of your zeppelin paragraph',
+    zeppelin_parameters: 'parameters',
+    zeppelin_parameters_tips:
+      'Please enter the parameters for zeppelin dynamic form',
     jupyter_conda_env_name: 'condaEnvName',
     jupyter_conda_env_name_tips:
       'Please enter the conda environment name of papermill',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index ccc93958a9..27e3119470 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -599,6 +599,9 @@ export default {
     zeppelin_note_id_tips: '请输入zeppelin note id',
     zeppelin_paragraph_id: 'zeppelin_paragraph_id',
     zeppelin_paragraph_id_tips: '请输入zeppelin paragraph id',
+    zeppelin_parameters: 'parameters',
+    zeppelin_parameters_tips:
+      '请输入zeppelin dynamic form参数',
     jupyter_conda_env_name: 'condaEnvName',
     jupyter_conda_env_name_tips: '请输入papermill所在的conda环境名',
     jupyter_input_note_path: 'inputNotePath',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
index 5cbe70f15a..ed7caf05df 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
@@ -56,6 +56,14 @@ export function useZeppelin(model: { [field: string]: any }): IJsonItem[] {
         }
       }
     },
+    {
+      type: 'input',
+      field: 'parameters',
+      name: t('project.node.zeppelin_parameters'),
+      props: {
+        placeholder: t('project.node.zeppelin_parameters_tips')
+      }
+    },
     ...useCustomParams({ model, field: 'localParams', isSimple: false })
   ]
 }
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 2ea47c72cb..7c87c22a60 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -316,6 +316,7 @@ export function formatParams(data: INodeData): {
   if (data.taskType === 'ZEPPELIN') {
     taskParams.noteId = data.zeppelinNoteId
     taskParams.paragraphId = data.zeppelinParagraphId
+    taskParams.parameters = data.parameters
   }
 
   if (data.taskType === 'K8S') {