You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/06/18 05:34:35 UTC
[dolphinscheduler] branch dev updated: [feature][task] Enable zeppelin schedule a whole zeppelin note (#10434)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 4be3b877c3 [feature][task] Enable zeppelin schedule a whole zeppelin note (#10434)
4be3b877c3 is described below
commit 4be3b877c3c143e2b5c58c017b641fd927c0dff5
Author: Eric Gao <er...@gmail.com>
AuthorDate: Sat Jun 18 13:34:24 2022 +0800
[feature][task] Enable zeppelin schedule a whole zeppelin note (#10434)
---
docs/docs/en/guide/task/zeppelin.md | 2 +-
docs/docs/zh/guide/task/zeppelin.md | 2 +-
.../plugin/task/zeppelin/ZeppelinParameters.java | 2 +-
.../plugin/task/zeppelin/ZeppelinTask.java | 65 +++++++++++++++++-----
.../plugin/task/zeppelin/ZeppelinTaskTest.java | 37 +++++++++++-
.../task/components/node/fields/use-jupyter.ts | 54 ------------------
.../task/components/node/fields/use-zeppelin.ts | 9 ---
7 files changed, 89 insertions(+), 82 deletions(-)
diff --git a/docs/docs/en/guide/task/zeppelin.md b/docs/docs/en/guide/task/zeppelin.md
index ea2d203394..7dae3c4dca 100644
--- a/docs/docs/en/guide/task/zeppelin.md
+++ b/docs/docs/en/guide/task/zeppelin.md
@@ -21,7 +21,7 @@ it will call `Zeppelin Client API` to trigger zeppelin notebook paragraph. Click
- Failed retry interval: The time interval for resubmitting the task after a failed task. It supports drop-down and hand-filling.
- Timeout alarm: Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will send and the task execution will fail.
- Zeppelin Note ID: The unique note id for a zeppelin notebook note.
-- Zeppelin Paragraph ID: The unique paragraph id for a zeppelin notebook paragraph.
+- Zeppelin Paragraph ID: The unique paragraph id for a zeppelin notebook paragraph. If you want to schedule a whole note at a time, leave this field blank.
- Zeppelin Parameters: Parameters in json format used for zeppelin dynamic form.
## Task Example
diff --git a/docs/docs/zh/guide/task/zeppelin.md b/docs/docs/zh/guide/task/zeppelin.md
index fa2477a7dc..18d36dfb71 100644
--- a/docs/docs/zh/guide/task/zeppelin.md
+++ b/docs/docs/zh/guide/task/zeppelin.md
@@ -22,7 +22,7 @@
- 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
- 前置任务:选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。
- Zeppelin Note ID:Zeppelin Note对应的唯一ID。
-- Zepplin Paragraph:Zeppelin Paragraph对应的唯一ID。
+- Zeppelin Paragraph ID:Zeppelin Paragraph对应的唯一ID。如果你想一次性调度整个note,这一栏不填即可。
- Zeppelin Parameters: 用于传入Zeppelin Dynamic Form的参数。
## Task Example
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
index b379985f5d..4ae64b69de 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
@@ -36,7 +36,7 @@ public class ZeppelinParameters extends AbstractParameters {
@Override
public boolean checkParameters() {
- return StringUtils.isNotEmpty(this.noteId) && StringUtils.isNotEmpty(this.paragraphId);
+ return StringUtils.isNotEmpty(this.noteId);
}
@Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
index eb32ac5996..062b56f503 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.plugin.task.zeppelin;
import com.fasterxml.jackson.databind.ObjectMapper;
+import kong.unirest.Unirest;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -25,11 +26,12 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.NoteResult;
import org.apache.zeppelin.client.ParagraphResult;
import org.apache.zeppelin.client.Status;
import org.apache.zeppelin.client.ZeppelinClient;
-
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
@@ -85,11 +87,34 @@ public class ZeppelinTask extends AbstractTaskExecutor {
}
// Submit zeppelin task
- ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap);
- String resultContent = paragraphResult.getResultInText();
- Status status = paragraphResult.getStatus();
- final int exitStatusCode = mapStatusToExitCode(status);
+ String resultContent;
+ Status status = Status.FINISHED;
+ if (paragraphId == null) {
+ NoteResult noteResult = this.zClient.executeNote(noteId, zeppelinParamsMap);
+ List<ParagraphResult> paragraphResultList = noteResult.getParagraphResultList();
+ StringBuilder resultContentBuilder = new StringBuilder();
+ for (ParagraphResult paragraphResult : paragraphResultList) {
+ resultContentBuilder.append(
+ String.format(
+ "paragraph_id: %s, paragraph_result: %s\n",
+ paragraphResult.getParagraphId(),
+ paragraphResult.getResultInText()));
+ status = paragraphResult.getStatus();
+ // we treat note execution as failure if any paragraph in the note fails
+ // status will be further processed in method mapStatusToExitCode below
+ if (status != Status.FINISHED) {
+ break;
+ }
+ }
+ resultContent = resultContentBuilder.toString();
+ } else {
+ ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap);
+ resultContent = paragraphResult.getResultInText();
+ status = paragraphResult.getStatus();
+ }
+
// Use noteId-paragraph-Id as app id
+ final int exitStatusCode = mapStatusToExitCode(status);
setAppIds(String.format("%s-%s", noteId, paragraphId));
setExitStatusCode(exitStatusCode);
logger.info("zeppelin task finished with results: {}", resultContent);
@@ -146,15 +171,27 @@ public class ZeppelinTask extends AbstractTaskExecutor {
super.cancelApplication(status);
String noteId = this.zeppelinParameters.getNoteId();
String paragraphId = this.zeppelinParameters.getParagraphId();
- logger.info("trying terminate zeppelin task, taskId: {}, noteId: {}, paragraphId: {}",
- this.taskExecutionContext.getTaskInstanceId(),
- noteId,
- paragraphId);
- this.zClient.cancelParagraph(noteId, paragraphId);
- logger.info("zeppelin task terminated, taskId: {}, noteId: {}, paragraphId: {}",
- this.taskExecutionContext.getTaskInstanceId(),
- noteId,
- paragraphId);
+ if (paragraphId == null) {
+ logger.info("trying terminate zeppelin task, taskId: {}, noteId: {}",
+ this.taskExecutionContext.getTaskInstanceId(),
+ noteId);
+ Unirest.config().defaultBaseUrl(PropertyUtils.getString(TaskConstants.ZEPPELIN_REST_URL) + "/api");
+ Unirest.delete("/notebook/job/{noteId}").routeParam("noteId", noteId).asJson();
+ logger.info("zeppelin task terminated, taskId: {}, noteId: {}",
+ this.taskExecutionContext.getTaskInstanceId(),
+ noteId);
+ } else {
+ logger.info("trying terminate zeppelin task, taskId: {}, noteId: {}, paragraphId: {}",
+ this.taskExecutionContext.getTaskInstanceId(),
+ noteId,
+ paragraphId);
+ this.zClient.cancelParagraph(noteId, paragraphId);
+ logger.info("zeppelin task terminated, taskId: {}, noteId: {}, paragraphId: {}",
+ this.taskExecutionContext.getTaskInstanceId(),
+ noteId,
+ paragraphId);
+ }
+
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
index 273e5b09f1..96d7466343 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.zeppelin.client.ParagraphResult;
+import org.apache.zeppelin.client.NoteResult;
import org.apache.zeppelin.client.Status;
import org.apache.zeppelin.client.ZeppelinClient;
import org.junit.Assert;
@@ -45,7 +46,6 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import java.util.HashMap;
import java.util.Map;
@@ -66,6 +66,7 @@ public class ZeppelinTaskTest {
private ZeppelinClient zClient;
private ZeppelinTask zeppelinTask;
private ParagraphResult paragraphResult;
+ private NoteResult noteResult;
@Before
public void before() throws Exception {
@@ -86,7 +87,7 @@ public class ZeppelinTaskTest {
}
@Test
- public void testHandleWithParagraphExecutionSucess() throws Exception {
+ public void testHandleWithParagraphExecutionSuccess() throws Exception {
when(this.paragraphResult.getStatus()).thenReturn(Status.FINISHED);
this.zeppelinTask.handle();
Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
@@ -135,6 +136,30 @@ public class ZeppelinTaskTest {
Assert.assertEquals(EXIT_CODE_FAILURE, this.zeppelinTask.getExitStatusCode());
}
+ @Test
+ public void testHandleWithNoteExecutionSuccess() throws Exception {
+ String zeppelinParametersWithNoParagraphId = buildZeppelinTaskParametersWithNoParagraphId();
+ TaskExecutionContext taskExecutionContext= PowerMockito.mock(TaskExecutionContext.class);
+ when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinParametersWithNoParagraphId);
+ this.zeppelinTask = spy(new ZeppelinTask(taskExecutionContext));
+
+ // mock zClient and note result
+ this.zClient = mock(ZeppelinClient.class);
+ this.noteResult = mock(NoteResult.class);
+
+ // use mocked zClient in zeppelinTask
+ doReturn(this.zClient).when(this.zeppelinTask, "getZeppelinClient");
+ when(this.zClient.executeNote(any(), any(Map.class))).thenReturn(this.noteResult);
+ when(paragraphResult.getResultInText()).thenReturn("mock-zeppelin-paragraph-execution-result");
+ this.zeppelinTask.init();
+ when(this.paragraphResult.getStatus()).thenReturn(Status.FINISHED);
+ this.zeppelinTask.handle();
+ Mockito.verify(this.zClient).executeNote(MOCK_NOTE_ID,
+ (Map<String, String>) mapper.readValue(MOCK_PARAMETERS, Map.class));
+ Mockito.verify(this.noteResult).getParagraphResultList();
+ Assert.assertEquals(EXIT_CODE_SUCCESS, this.zeppelinTask.getExitStatusCode());
+ }
+
private String buildZeppelinTaskParameters() {
ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
zeppelinParameters.setNoteId(MOCK_NOTE_ID);
@@ -143,4 +168,12 @@ public class ZeppelinTaskTest {
return JSONUtils.toJsonString(zeppelinParameters);
}
+
+ private String buildZeppelinTaskParametersWithNoParagraphId() {
+ ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
+ zeppelinParameters.setNoteId(MOCK_NOTE_ID);
+ zeppelinParameters.setParameters(MOCK_PARAMETERS);
+
+ return JSONUtils.toJsonString(zeppelinParameters);
+ }
}
\ No newline at end of file
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-jupyter.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-jupyter.ts
index a35510840f..c9512bb673 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-jupyter.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-jupyter.ts
@@ -80,15 +80,6 @@ export function useJupyter(model: { [field: string]: any }): IJsonItem[] {
props: {
placeholder: t('project.node.jupyter_parameters_tips')
}
- // validate: {
- // trigger: ['input', 'blur'],
- // required: false,
- // validator(validate: any, value: string) {
- // if (!value) {
- // return new Error(t('project.node.jupyter_parameters_tips'))
- // }
- // }
- // }
},
{
type: 'input',
@@ -97,15 +88,6 @@ export function useJupyter(model: { [field: string]: any }): IJsonItem[] {
props: {
placeholder: t('project.node.jupyter_kernel_tips')
}
- // validate: {
- // trigger: ['input', 'blur'],
- // required: false,
- // validator(validate: any, value: string) {
- // if (!value) {
- // return new Error(t('project.node.jupyter_kernel_tips'))
- // }
- // }
- // }
},
{
type: 'input',
@@ -114,15 +96,6 @@ export function useJupyter(model: { [field: string]: any }): IJsonItem[] {
props: {
placeholder: t('project.node.jupyter_engine_tips')
}
- // validate: {
- // trigger: ['input', 'blur'],
- // required: false,
- // validator(validate: any, value: string) {
- // if (!value) {
- // return new Error(t('project.node.jupyter_engine_tips'))
- // }
- // }
- // }
},
{
type: 'input',
@@ -131,15 +104,6 @@ export function useJupyter(model: { [field: string]: any }): IJsonItem[] {
props: {
placeholder: t('project.node.jupyter_execution_timeout_tips')
}
- // validate: {
- // trigger: ['input', 'blur'],
- // required: false,
- // validator(validate: any, value: string) {
- // if (!value) {
- // return new Error(t('project.node.jupyter_execution_timeout_tips'))
- // }
- // }
- // }
},
{
type: 'input',
@@ -148,15 +112,6 @@ export function useJupyter(model: { [field: string]: any }): IJsonItem[] {
props: {
placeholder: t('project.node.jupyter_start_timeout_tips')
}
- // validate: {
- // trigger: ['input', 'blur'],
- // required: false,
- // validator(validate: any, value: string) {
- // if (!value) {
- // return new Error(t('project.node.jupyter_start_timeout_tips'))
- // }
- // }
- // }
},
{
type: 'input',
@@ -165,15 +120,6 @@ export function useJupyter(model: { [field: string]: any }): IJsonItem[] {
props: {
placeholder: t('project.node.jupyter_others_tips')
}
- // validate: {
- // trigger: ['input', 'blur'],
- // required: false,
- // validator(validate: any, value: string) {
- // if (!value) {
- // return new Error(t('project.node.jupyter_others_tips'))
- // }
- // }
- // }
},
...useCustomParams({ model, field: 'localParams', isSimple: false })
]
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
index ed7caf05df..5cbc1324f9 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
@@ -45,15 +45,6 @@ export function useZeppelin(model: { [field: string]: any }): IJsonItem[] {
name: t('project.node.zeppelin_paragraph_id'),
props: {
placeholder: t('project.node.zeppelin_paragraph_id_tips')
- },
- validate: {
- trigger: ['input', 'blur'],
- required: true,
- validator(validate: any, value: string) {
- if (!value) {
- return new Error(t('project.node.zeppelin_paragraph_id_tips'))
- }
- }
}
},
{