You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by wa...@apache.org on 2022/05/31 03:19:47 UTC
[dolphinscheduler] branch dev updated: [Feature][Task Plugin] Add support for dynamic form for zeppelin task plugin (#9977) (#10269)
This is an automated email from the ASF dual-hosted git repository.
wanggenhua pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new a8a5da367a [Feature][Task Plugin] Add support for dynamic form for zeppelin task plugin (#9977) (#10269)
a8a5da367a is described below
commit a8a5da367ad37911ee2400df894cdf8ff16fb73e
Author: Eric Gao <er...@gmail.com>
AuthorDate: Tue May 31 11:19:41 2022 +0800
[Feature][Task Plugin] Add support for dynamic form for zeppelin task plugin (#9977) (#10269)
---
docs/docs/en/guide/task/zeppelin.md | 1 +
docs/docs/zh/guide/task/zeppelin.md | 1 +
.../plugin/task/zeppelin/ZeppelinParameters.java | 13 +++++++--
.../plugin/task/zeppelin/ZeppelinTask.java | 12 +++++++-
.../plugin/task/zeppelin/ZeppelinTaskTest.java | 34 +++++++++++++++-------
dolphinscheduler-ui/src/locales/en_US/project.ts | 3 ++
dolphinscheduler-ui/src/locales/zh_CN/project.ts | 3 ++
.../task/components/node/fields/use-zeppelin.ts | 8 +++++
.../projects/task/components/node/format-data.ts | 1 +
9 files changed, 63 insertions(+), 13 deletions(-)
diff --git a/docs/docs/en/guide/task/zeppelin.md b/docs/docs/en/guide/task/zeppelin.md
index 6f5478ee75..08414e664a 100644
--- a/docs/docs/en/guide/task/zeppelin.md
+++ b/docs/docs/en/guide/task/zeppelin.md
@@ -22,6 +22,7 @@ it will call `Zeppelin Client API` to trigger zeppelin notebook paragraph. Click
- Timeout alarm: Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will be sent and the task execution will fail.
- Zeppelin Note ID: The unique note id for a zeppelin notebook note.
- Zeppelin Paragraph ID: The unique paragraph id for a zeppelin notebook paragraph.
+- Zeppelin Parameters: Parameters in JSON format used for the Zeppelin dynamic form, e.g. `{"key1": "value1", "key2": "value2"}`.
## Task Example
diff --git a/docs/docs/zh/guide/task/zeppelin.md b/docs/docs/zh/guide/task/zeppelin.md
index 9461075c54..78285a0317 100644
--- a/docs/docs/zh/guide/task/zeppelin.md
+++ b/docs/docs/zh/guide/task/zeppelin.md
@@ -23,6 +23,7 @@
- 前置任务:选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。
- Zeppelin Note ID:Zeppelin Note对应的唯一ID。
- Zeppelin Paragraph:Zeppelin Paragraph对应的唯一ID。
+- Zeppelin Parameters:用于传入Zeppelin Dynamic Form的参数(JSON格式)。
## Task Example
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
index 3617449755..b379985f5d 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
@@ -32,17 +32,16 @@ public class ZeppelinParameters extends AbstractParameters {
*/
private String noteId;
private String paragraphId;
+ private String parameters;
@Override
public boolean checkParameters() {
-
return StringUtils.isNotEmpty(this.noteId) && StringUtils.isNotEmpty(this.paragraphId);
}
@Override
public List<ResourceInfo> getResourceFilesList() {
return Collections.emptyList();
-
}
public String getNoteId() {
@@ -61,11 +60,21 @@ public class ZeppelinParameters extends AbstractParameters {
this.paragraphId = paragraphId;
}
+ public String getParameters() {
+ return parameters;
+ }
+
+ public void setParameters(String parameters) {
+ this.parameters = parameters;
+ }
+
@Override
public String toString() {
return "ZeppelinParameters{" +
"noteId='" + noteId + '\'' +
", paragraphId='" + paragraphId + '\'' +
+ ", parameters='" + parameters + '\'' +
'}';
}
+
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
index ad163bd8ab..eb32ac5996 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.zeppelin;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -28,6 +29,9 @@ import org.apache.zeppelin.client.ParagraphResult;
import org.apache.zeppelin.client.Status;
import org.apache.zeppelin.client.ZeppelinClient;
+import java.util.HashMap;
+import java.util.Map;
+
public class ZeppelinTask extends AbstractTaskExecutor {
@@ -73,9 +77,15 @@ public class ZeppelinTask extends AbstractTaskExecutor {
try {
String noteId = this.zeppelinParameters.getNoteId();
String paragraphId = this.zeppelinParameters.getParagraphId();
+ String parameters = this.zeppelinParameters.getParameters();
+ Map<String, String> zeppelinParamsMap = new HashMap<>();
+ if (parameters != null) {
+ ObjectMapper mapper = new ObjectMapper();
+ zeppelinParamsMap = mapper.readValue(parameters, Map.class);
+ }
// Submit zeppelin task
- ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId);
+ ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap);
String resultContent = paragraphResult.getResultInText();
Status status = paragraphResult.getStatus();
final int exitStatusCode = mapStatusToExitCode(status);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
index 30fb346f5d..273e5b09f1 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
@@ -27,6 +27,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.spy;
import static org.powermock.api.mockito.PowerMockito.when;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
@@ -44,17 +45,23 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.util.HashMap;
+import java.util.Map;
+
@RunWith(PowerMockRunner.class)
@PrepareForTest({
ZeppelinTask.class,
ZeppelinClient.class,
+ ObjectMapper.class,
})
@PowerMockIgnore({"javax.*"})
public class ZeppelinTaskTest {
private static final String MOCK_NOTE_ID = "2GYJR92R7";
private static final String MOCK_PARAGRAPH_ID = "paragraph_1648793472526_1771221396";
+ private static final String MOCK_PARAMETERS = "{\"key1\": \"value1\", \"key2\": \"value2\"}";
+ private final ObjectMapper mapper = new ObjectMapper();
private ZeppelinClient zClient;
private ZeppelinTask zeppelinTask;
@@ -73,7 +80,7 @@ public class ZeppelinTaskTest {
// use mocked zClient in zeppelinTask
doReturn(this.zClient).when(this.zeppelinTask, "getZeppelinClient");
- when(this.zClient.executeParagraph(any(), any())).thenReturn(this.paragraphResult);
+ when(this.zClient.executeParagraph(any(), any(), any(Map.class))).thenReturn(this.paragraphResult);
when(paragraphResult.getResultInText()).thenReturn("mock-zeppelin-paragraph-execution-result");
this.zeppelinTask.init();
}
@@ -82,7 +89,9 @@ public class ZeppelinTaskTest {
public void testHandleWithParagraphExecutionSucess() throws Exception {
when(this.paragraphResult.getStatus()).thenReturn(Status.FINISHED);
this.zeppelinTask.handle();
- Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, MOCK_PARAGRAPH_ID);
+ Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
+ MOCK_PARAGRAPH_ID,
+ (Map<String, String>) mapper.readValue(MOCK_PARAMETERS, Map.class));
Mockito.verify(this.paragraphResult).getResultInText();
Mockito.verify(this.paragraphResult).getStatus();
Assert.assertEquals(EXIT_CODE_SUCCESS, this.zeppelinTask.getExitStatusCode());
@@ -92,7 +101,9 @@ public class ZeppelinTaskTest {
public void testHandleWithParagraphExecutionAborted() throws Exception {
when(this.paragraphResult.getStatus()).thenReturn(Status.ABORT);
this.zeppelinTask.handle();
- Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, MOCK_PARAGRAPH_ID);
+ Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
+ MOCK_PARAGRAPH_ID,
+ (Map<String, String>) mapper.readValue(MOCK_PARAMETERS, Map.class));
Mockito.verify(this.paragraphResult).getResultInText();
Mockito.verify(this.paragraphResult).getStatus();
Assert.assertEquals(EXIT_CODE_KILL, this.zeppelinTask.getExitStatusCode());
@@ -102,7 +113,9 @@ public class ZeppelinTaskTest {
public void testHandleWithParagraphExecutionError() throws Exception {
when(this.paragraphResult.getStatus()).thenReturn(Status.ERROR);
this.zeppelinTask.handle();
- Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, MOCK_PARAGRAPH_ID);
+ Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
+ MOCK_PARAGRAPH_ID,
+ (Map<String, String>) mapper.readValue(MOCK_PARAMETERS, Map.class));
Mockito.verify(this.paragraphResult).getResultInText();
Mockito.verify(this.paragraphResult).getStatus();
Assert.assertEquals(EXIT_CODE_FAILURE, this.zeppelinTask.getExitStatusCode());
@@ -110,11 +123,13 @@ public class ZeppelinTaskTest {
@Test
public void testHandleWithParagraphExecutionException() throws Exception {
- when(this.zClient.executeParagraph(any(), any())).
+ when(this.zClient.executeParagraph(any(), any(), any(Map.class))).
thenThrow(new Exception("Something wrong happens from zeppelin side"));
// when(this.paragraphResult.getStatus()).thenReturn(Status.ERROR);
this.zeppelinTask.handle();
- Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, MOCK_PARAGRAPH_ID);
+ Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
+ MOCK_PARAGRAPH_ID,
+ (Map<String, String>) mapper.readValue(MOCK_PARAMETERS, Map.class));
Mockito.verify(this.paragraphResult, Mockito.times(0)).getResultInText();
Mockito.verify(this.paragraphResult, Mockito.times(0)).getStatus();
Assert.assertEquals(EXIT_CODE_FAILURE, this.zeppelinTask.getExitStatusCode());
@@ -122,10 +137,9 @@ public class ZeppelinTaskTest {
private String buildZeppelinTaskParameters() {
ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
- String noteId = MOCK_NOTE_ID;
- String paragraphId = MOCK_PARAGRAPH_ID;
- zeppelinParameters.setNoteId(noteId);
- zeppelinParameters.setParagraphId(paragraphId);
+ zeppelinParameters.setNoteId(MOCK_NOTE_ID);
+ zeppelinParameters.setParagraphId(MOCK_PARAGRAPH_ID);
+ zeppelinParameters.setParameters(MOCK_PARAMETERS);
return JSONUtils.toJsonString(zeppelinParameters);
}
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts
index 381de4232c..d0e7aa2bd7 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -606,6 +606,9 @@ export default {
zeppelin_paragraph_id: 'zeppelinParagraphId',
zeppelin_paragraph_id_tips:
'Please enter the paragraph id of your zeppelin paragraph',
+ zeppelin_parameters: 'parameters',
+ zeppelin_parameters_tips:
+ 'Please enter the parameters for zeppelin dynamic form',
jupyter_conda_env_name: 'condaEnvName',
jupyter_conda_env_name_tips:
'Please enter the conda environment name of papermill',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index ccc93958a9..27e3119470 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -599,6 +599,9 @@ export default {
zeppelin_note_id_tips: '请输入zeppelin note id',
zeppelin_paragraph_id: 'zeppelin_paragraph_id',
zeppelin_paragraph_id_tips: '请输入zeppelin paragraph id',
+ zeppelin_parameters: 'parameters',
+ zeppelin_parameters_tips:
+ '请输入zeppelin dynamic form参数',
jupyter_conda_env_name: 'condaEnvName',
jupyter_conda_env_name_tips: '请输入papermill所在的conda环境名',
jupyter_input_note_path: 'inputNotePath',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
index 5cbe70f15a..ed7caf05df 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
@@ -56,6 +56,14 @@ export function useZeppelin(model: { [field: string]: any }): IJsonItem[] {
}
}
},
+ {
+ type: 'input',
+ field: 'parameters',
+ name: t('project.node.zeppelin_parameters'),
+ props: {
+ placeholder: t('project.node.zeppelin_parameters_tips')
+ }
+ },
...useCustomParams({ model, field: 'localParams', isSimple: false })
]
}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 2ea47c72cb..7c87c22a60 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -316,6 +316,7 @@ export function formatParams(data: INodeData): {
if (data.taskType === 'ZEPPELIN') {
taskParams.noteId = data.zeppelinNoteId
taskParams.paragraphId = data.zeppelinParagraphId
+ taskParams.parameters = data.parameters
}
if (data.taskType === 'K8S') {