You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/06/18 05:34:35 UTC

[dolphinscheduler] branch dev updated: [feature][task] Enable zeppelin schedule a whole zeppelin note (#10434)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4be3b877c3 [feature][task] Enable zeppelin schedule a whole zeppelin note (#10434)
4be3b877c3 is described below

commit 4be3b877c3c143e2b5c58c017b641fd927c0dff5
Author: Eric Gao <er...@gmail.com>
AuthorDate: Sat Jun 18 13:34:24 2022 +0800

    [feature][task] Enable zeppelin schedule a whole zeppelin note (#10434)
---
 docs/docs/en/guide/task/zeppelin.md                |  2 +-
 docs/docs/zh/guide/task/zeppelin.md                |  2 +-
 .../plugin/task/zeppelin/ZeppelinParameters.java   |  2 +-
 .../plugin/task/zeppelin/ZeppelinTask.java         | 65 +++++++++++++++++-----
 .../plugin/task/zeppelin/ZeppelinTaskTest.java     | 37 +++++++++++-
 .../task/components/node/fields/use-jupyter.ts     | 54 ------------------
 .../task/components/node/fields/use-zeppelin.ts    |  9 ---
 7 files changed, 89 insertions(+), 82 deletions(-)

diff --git a/docs/docs/en/guide/task/zeppelin.md b/docs/docs/en/guide/task/zeppelin.md
index ea2d203394..7dae3c4dca 100644
--- a/docs/docs/en/guide/task/zeppelin.md
+++ b/docs/docs/en/guide/task/zeppelin.md
@@ -21,7 +21,7 @@ it will call `Zeppelin Client API` to trigger zeppelin notebook paragraph. Click
 - Failed retry interval: The time interval for resubmitting the task after a failed task. It supports drop-down and hand-filling.
 - Timeout alarm: Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will be sent and the task execution will fail.
 - Zeppelin Note ID: The unique note id for a zeppelin notebook note.
-- Zeppelin Paragraph ID: The unique paragraph id for a zeppelin notebook paragraph.
+- Zeppelin Paragraph ID: The unique paragraph id for a zeppelin notebook paragraph. If you want to schedule a whole note at a time, leave this field blank.
 - Zeppelin Parameters: Parameters in json format used for zeppelin dynamic form.
 
 ## Task Example
diff --git a/docs/docs/zh/guide/task/zeppelin.md b/docs/docs/zh/guide/task/zeppelin.md
index fa2477a7dc..18d36dfb71 100644
--- a/docs/docs/zh/guide/task/zeppelin.md
+++ b/docs/docs/zh/guide/task/zeppelin.md
@@ -22,7 +22,7 @@
 - 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败。
 - 前置任务:选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。
 - Zeppelin Note ID:Zeppelin Note对应的唯一ID。
-- Zepplin Paragraph:Zeppelin Paragraph对应的唯一ID。
+- Zeppelin Paragraph ID:Zeppelin Paragraph对应的唯一ID。如果你想一次性调度整个note,这一栏不填即可。
 - Zeppelin Parameters: 用于传入Zeppelin Dynamic Form的参数。
 
 ## Task Example
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
index b379985f5d..4ae64b69de 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
@@ -36,7 +36,7 @@ public class ZeppelinParameters extends AbstractParameters {
 
     @Override
     public boolean checkParameters() {
-        return StringUtils.isNotEmpty(this.noteId) && StringUtils.isNotEmpty(this.paragraphId);
+        return StringUtils.isNotEmpty(this.noteId);
     }
 
     @Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
index eb32ac5996..062b56f503 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.plugin.task.zeppelin;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import kong.unirest.Unirest;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -25,11 +26,12 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
 import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.NoteResult;
 import org.apache.zeppelin.client.ParagraphResult;
 import org.apache.zeppelin.client.Status;
 import org.apache.zeppelin.client.ZeppelinClient;
-
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 
@@ -85,11 +87,34 @@ public class ZeppelinTask extends AbstractTaskExecutor {
             }
 
             // Submit zeppelin task
-            ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap);
-            String resultContent = paragraphResult.getResultInText();
-            Status status = paragraphResult.getStatus();
-            final int exitStatusCode = mapStatusToExitCode(status);
+            String resultContent;
+            Status status = Status.FINISHED;
+            if (paragraphId == null) {
+                NoteResult noteResult = this.zClient.executeNote(noteId, zeppelinParamsMap);
+                List<ParagraphResult> paragraphResultList = noteResult.getParagraphResultList();
+                StringBuilder resultContentBuilder = new StringBuilder();
+                for (ParagraphResult paragraphResult : paragraphResultList) {
+                    resultContentBuilder.append(
+                            String.format(
+                                    "paragraph_id: %s, paragraph_result: %s\n",
+                                    paragraphResult.getParagraphId(),
+                                    paragraphResult.getResultInText()));
+                    status = paragraphResult.getStatus();
+                    // we treat note execution as failure if any paragraph in the note fails
+                    // status will be further processed in method mapStatusToExitCode below
+                    if (status != Status.FINISHED) {
+                        break;
+                    }
+                }
+                resultContent = resultContentBuilder.toString();
+            } else {
+                ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap);
+                resultContent = paragraphResult.getResultInText();
+                status = paragraphResult.getStatus();
+            }
+
             // Use noteId-paragraph-Id as app id
+            final int exitStatusCode = mapStatusToExitCode(status);
             setAppIds(String.format("%s-%s", noteId, paragraphId));
             setExitStatusCode(exitStatusCode);
             logger.info("zeppelin task finished with results: {}", resultContent);
@@ -146,15 +171,27 @@ public class ZeppelinTask extends AbstractTaskExecutor {
         super.cancelApplication(status);
         String noteId = this.zeppelinParameters.getNoteId();
         String paragraphId = this.zeppelinParameters.getParagraphId();
-        logger.info("trying terminate zeppelin task, taskId: {}, noteId: {}, paragraphId: {}",
-                this.taskExecutionContext.getTaskInstanceId(),
-                noteId,
-                paragraphId);
-        this.zClient.cancelParagraph(noteId, paragraphId);
-        logger.info("zeppelin task terminated, taskId: {}, noteId: {}, paragraphId: {}",
-                this.taskExecutionContext.getTaskInstanceId(),
-                noteId,
-                paragraphId);
+        if (paragraphId == null) {
+            logger.info("trying terminate zeppelin task, taskId: {}, noteId: {}",
+                    this.taskExecutionContext.getTaskInstanceId(),
+                    noteId);
+            Unirest.config().defaultBaseUrl(PropertyUtils.getString(TaskConstants.ZEPPELIN_REST_URL) + "/api");
+            Unirest.delete("/notebook/job/{noteId}").routeParam("noteId", noteId).asJson();
+            logger.info("zeppelin task terminated, taskId: {}, noteId: {}",
+                    this.taskExecutionContext.getTaskInstanceId(),
+                    noteId);
+        } else {
+            logger.info("trying terminate zeppelin task, taskId: {}, noteId: {}, paragraphId: {}",
+                    this.taskExecutionContext.getTaskInstanceId(),
+                    noteId,
+                    paragraphId);
+            this.zClient.cancelParagraph(noteId, paragraphId);
+            logger.info("zeppelin task terminated, taskId: {}, noteId: {}, paragraphId: {}",
+                    this.taskExecutionContext.getTaskInstanceId(),
+                    noteId,
+                    paragraphId);
+        }
+
     }
 
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
index 273e5b09f1..96d7466343 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 
 
 import org.apache.zeppelin.client.ParagraphResult;
+import org.apache.zeppelin.client.NoteResult;
 import org.apache.zeppelin.client.Status;
 import org.apache.zeppelin.client.ZeppelinClient;
 import org.junit.Assert;
@@ -45,7 +46,6 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.util.HashMap;
 import java.util.Map;
 
 
@@ -66,6 +66,7 @@ public class ZeppelinTaskTest {
     private ZeppelinClient zClient;
     private ZeppelinTask zeppelinTask;
     private ParagraphResult paragraphResult;
+    private NoteResult noteResult;
 
     @Before
     public void before() throws Exception {
@@ -86,7 +87,7 @@ public class ZeppelinTaskTest {
     }
 
     @Test
-    public void testHandleWithParagraphExecutionSucess() throws Exception {
+    public void testHandleWithParagraphExecutionSuccess() throws Exception {
         when(this.paragraphResult.getStatus()).thenReturn(Status.FINISHED);
         this.zeppelinTask.handle();
         Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
@@ -135,6 +136,30 @@ public class ZeppelinTaskTest {
         Assert.assertEquals(EXIT_CODE_FAILURE, this.zeppelinTask.getExitStatusCode());
     }
 
+    @Test
+    public void testHandleWithNoteExecutionSuccess() throws Exception {
+        String zeppelinParametersWithNoParagraphId = buildZeppelinTaskParametersWithNoParagraphId();
+        TaskExecutionContext taskExecutionContext= PowerMockito.mock(TaskExecutionContext.class);
+        when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinParametersWithNoParagraphId);
+        this.zeppelinTask = spy(new ZeppelinTask(taskExecutionContext));
+
+        // mock zClient and note result
+        this.zClient = mock(ZeppelinClient.class);
+        this.noteResult = mock(NoteResult.class);
+
+        // use mocked zClient in zeppelinTask
+        doReturn(this.zClient).when(this.zeppelinTask, "getZeppelinClient");
+        when(this.zClient.executeNote(any(), any(Map.class))).thenReturn(this.noteResult);
+        when(paragraphResult.getResultInText()).thenReturn("mock-zeppelin-paragraph-execution-result");
+        this.zeppelinTask.init();
+        when(this.paragraphResult.getStatus()).thenReturn(Status.FINISHED);
+        this.zeppelinTask.handle();
+        Mockito.verify(this.zClient).executeNote(MOCK_NOTE_ID,
+                (Map<String, String>) mapper.readValue(MOCK_PARAMETERS, Map.class));
+        Mockito.verify(this.noteResult).getParagraphResultList();
+        Assert.assertEquals(EXIT_CODE_SUCCESS, this.zeppelinTask.getExitStatusCode());
+    }
+
     private String buildZeppelinTaskParameters() {
         ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
         zeppelinParameters.setNoteId(MOCK_NOTE_ID);
@@ -143,4 +168,12 @@ public class ZeppelinTaskTest {
 
         return JSONUtils.toJsonString(zeppelinParameters);
     }
+
+    private String buildZeppelinTaskParametersWithNoParagraphId() {
+        ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
+        zeppelinParameters.setNoteId(MOCK_NOTE_ID);
+        zeppelinParameters.setParameters(MOCK_PARAMETERS);
+
+        return JSONUtils.toJsonString(zeppelinParameters);
+    }
 }
\ No newline at end of file
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-jupyter.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-jupyter.ts
index a35510840f..c9512bb673 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-jupyter.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-jupyter.ts
@@ -80,15 +80,6 @@ export function useJupyter(model: { [field: string]: any }): IJsonItem[] {
       props: {
         placeholder: t('project.node.jupyter_parameters_tips')
       }
-      //       validate: {
-      //         trigger: ['input', 'blur'],
-      //         required: false,
-      //         validator(validate: any, value: string) {
-      //           if (!value) {
-      //             return new Error(t('project.node.jupyter_parameters_tips'))
-      //           }
-      //         }
-      //       }
     },
     {
       type: 'input',
@@ -97,15 +88,6 @@ export function useJupyter(model: { [field: string]: any }): IJsonItem[] {
       props: {
         placeholder: t('project.node.jupyter_kernel_tips')
       }
-      //       validate: {
-      //         trigger: ['input', 'blur'],
-      //         required: false,
-      //         validator(validate: any, value: string) {
-      //           if (!value) {
-      //             return new Error(t('project.node.jupyter_kernel_tips'))
-      //           }
-      //         }
-      //       }
     },
     {
       type: 'input',
@@ -114,15 +96,6 @@ export function useJupyter(model: { [field: string]: any }): IJsonItem[] {
       props: {
         placeholder: t('project.node.jupyter_engine_tips')
       }
-      //       validate: {
-      //         trigger: ['input', 'blur'],
-      //         required: false,
-      //         validator(validate: any, value: string) {
-      //           if (!value) {
-      //             return new Error(t('project.node.jupyter_engine_tips'))
-      //           }
-      //         }
-      //       }
     },
     {
       type: 'input',
@@ -131,15 +104,6 @@ export function useJupyter(model: { [field: string]: any }): IJsonItem[] {
       props: {
         placeholder: t('project.node.jupyter_execution_timeout_tips')
       }
-      //       validate: {
-      //         trigger: ['input', 'blur'],
-      //         required: false,
-      //         validator(validate: any, value: string) {
-      //           if (!value) {
-      //             return new Error(t('project.node.jupyter_execution_timeout_tips'))
-      //           }
-      //         }
-      //       }
     },
     {
       type: 'input',
@@ -148,15 +112,6 @@ export function useJupyter(model: { [field: string]: any }): IJsonItem[] {
       props: {
         placeholder: t('project.node.jupyter_start_timeout_tips')
       }
-      //       validate: {
-      //         trigger: ['input', 'blur'],
-      //         required: false,
-      //         validator(validate: any, value: string) {
-      //           if (!value) {
-      //             return new Error(t('project.node.jupyter_start_timeout_tips'))
-      //           }
-      //         }
-      //       }
     },
     {
       type: 'input',
@@ -165,15 +120,6 @@ export function useJupyter(model: { [field: string]: any }): IJsonItem[] {
       props: {
         placeholder: t('project.node.jupyter_others_tips')
       }
-      //       validate: {
-      //         trigger: ['input', 'blur'],
-      //         required: false,
-      //         validator(validate: any, value: string) {
-      //           if (!value) {
-      //             return new Error(t('project.node.jupyter_others_tips'))
-      //           }
-      //         }
-      //       }
     },
     ...useCustomParams({ model, field: 'localParams', isSimple: false })
   ]
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
index ed7caf05df..5cbc1324f9 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
@@ -45,15 +45,6 @@ export function useZeppelin(model: { [field: string]: any }): IJsonItem[] {
       name: t('project.node.zeppelin_paragraph_id'),
       props: {
         placeholder: t('project.node.zeppelin_paragraph_id_tips')
-      },
-      validate: {
-        trigger: ['input', 'blur'],
-        required: true,
-        validator(validate: any, value: string) {
-          if (!value) {
-            return new Error(t('project.node.zeppelin_paragraph_id_tips'))
-          }
-        }
       }
     },
     {