You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by wa...@apache.org on 2022/07/20 06:16:53 UTC
[dolphinscheduler] branch dev updated: [Feature][Task Plugin] Increase zeppelin task stability in production (#11010)
This is an automated email from the ASF dual-hosted git repository.
wanggenhua pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f689220290 [Feature][Task Plugin] Increase zeppelin task stability in production (#11010)
f689220290 is described below
commit f689220290a13d5ce3b451e65a475c0444e7a047
Author: Eric Gao <er...@gmail.com>
AuthorDate: Wed Jul 20 14:16:47 2022 +0800
[Feature][Task Plugin] Increase zeppelin task stability in production (#11010)
* [Feature][Task Plugin] Increase zeppelin task stability in production (#10584)
* Add front-end and update docs for the production mode of zeppelin task
* Fix minor front-end bug of zeppelin task plugin
* Refactor ZeppelinParameters with lombok
* Fix formatting
* Replace @Data with @Getter, @Setter, @ToString to avoid a decrease in test coverage
---
docs/docs/en/guide/task/zeppelin.md | 12 +++++
docs/docs/zh/guide/task/zeppelin.md | 11 +++++
.../plugin/task/zeppelin/ZeppelinParameters.java | 49 +++-----------------
.../plugin/task/zeppelin/ZeppelinTask.java | 22 ++++++++-
.../plugin/task/zeppelin/ZeppelinTaskTest.java | 52 ++++++++++++++++++++--
dolphinscheduler-ui/src/locales/en_US/project.ts | 2 +
dolphinscheduler-ui/src/locales/zh_CN/project.ts | 2 +
.../task/components/node/fields/use-zeppelin.ts | 8 ++++
.../projects/task/components/node/format-data.ts | 1 +
.../views/projects/task/components/node/types.ts | 2 +
10 files changed, 115 insertions(+), 46 deletions(-)
diff --git a/docs/docs/en/guide/task/zeppelin.md b/docs/docs/en/guide/task/zeppelin.md
index c88c0b92b8..8c8de9dae6 100644
--- a/docs/docs/en/guide/task/zeppelin.md
+++ b/docs/docs/en/guide/task/zeppelin.md
@@ -26,9 +26,21 @@ it will call `Zeppelin Client API` to trigger zeppelin notebook paragraph. Click
| Timeout alarm | Check Timeout Alarm and Timeout Failure. When the task exceeds the "timeout duration", an alarm email will be sent and the task execution will fail. |
| Zeppelin Note ID | The unique note id for a zeppelin notebook note. |
| Zeppelin Paragraph ID | The unique paragraph id for a zeppelin notebook paragraph. If you want to schedule a whole note at a time, leave this field blank. |
+| Zeppelin Production Note Directory | Optional. The directory for cloned notes in production mode; filling it in enables production mode. |
| Zeppelin Rest Endpoint | The REST endpoint of your zeppelin server |
| Zeppelin Parameters | Parameters in json format used for zeppelin dynamic form. |
+## Production (Clone) Mode
+
+- Fill in the optional `Zeppelin Production Note Directory` parameter to enable `Production Mode`.
+- In `Production Mode`, the target note is cloned into the `Zeppelin Production Note Directory` you choose.
+`Zeppelin Task Plugin` executes the cloned note instead of the original one. Once execution finishes,
+`Zeppelin Task Plugin` deletes the cloned note automatically.
+This increases stability: changes made to the original note while `DolphinScheduler` is running the task
+will not affect the production run.
+- If you leave `Zeppelin Production Note Directory` empty, `Zeppelin Task Plugin` executes the original note.
+- `Zeppelin Production Note Directory` must both start and end with a slash, e.g. `/production_note_directory/`.
+
## Task Example
### Zeppelin Paragraph Task Example
diff --git a/docs/docs/zh/guide/task/zeppelin.md b/docs/docs/zh/guide/task/zeppelin.md
index eb802d5cfb..ac3bebc690 100644
--- a/docs/docs/zh/guide/task/zeppelin.md
+++ b/docs/docs/zh/guide/task/zeppelin.md
@@ -24,8 +24,19 @@
- Zeppelin Note ID:Zeppelin Note对应的唯一ID。
- Zeppelin Paragraph ID:Zeppelin Paragraph对应的唯一ID。如果你想一次性调度整个note,这一栏不填即可。
- Zeppelin Rest Endpoint:您的Zeppelin服务的REST Endpoint。
+- Zeppelin Production Note Directory:生产模式下存放克隆note的目录。
- Zeppelin Parameters: 用于传入Zeppelin Dynamic Form的参数。
+## 生产(克隆)模式
+
+- 填上`Zeppelin Production Note Directory`参数以启用`生产模式`。
+- 在`生产模式`下,目标note会被克隆到您所填的`Zeppelin Production Note Directory`目录下。
+`Zeppelin任务插件`将会执行克隆出来的note并在执行结束后自动清除它。
+因为在此模式下,如果您不小心修改了正在被`Dolphin Scheduler`调度的note,也不会影响到生产任务的执行,
+从而提高了稳定性。
+- 如果您选择不填`Zeppelin Production Note Directory`这个参数,`Zeppelin任务插件`将会执行您的原始note。
+- `Zeppelin Production Note Directory`参数在格式上必须以`斜杠`开头和结尾,例如 `/production_note_directory/`。
+
## Task Example
### Zeppelin Paragraph Task Example
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
index 0c67e4f69b..b95c93971f 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
@@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.plugin.task.zeppelin;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
@@ -24,6 +27,9 @@ import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.Collections;
import java.util.List;
+@Getter
+@Setter
+@ToString
public class ZeppelinParameters extends AbstractParameters {
/**
@@ -33,6 +39,7 @@ public class ZeppelinParameters extends AbstractParameters {
private String noteId;
private String paragraphId;
private String restEndpoint;
+ private String productionNoteDirectory;
private String parameters;
@Override
@@ -45,46 +52,4 @@ public class ZeppelinParameters extends AbstractParameters {
return Collections.emptyList();
}
- public String getNoteId() {
- return noteId;
- }
-
- public void setNoteId(String noteId) {
- this.noteId = noteId;
- }
-
- public String getParagraphId() {
- return paragraphId;
- }
-
- public void setParagraphId(String paragraphId) {
- this.paragraphId = paragraphId;
- }
-
- public String getParameters() {
- return parameters;
- }
-
- public void setParameters(String parameters) {
- this.parameters = parameters;
- }
-
- public String getRestEndpoint() {
- return restEndpoint;
- }
-
- public void setRestEndpoint(String restEndpoint) {
- this.restEndpoint = restEndpoint;
- }
-
- @Override
- public String toString() {
- return "ZeppelinParameters{"
- + "noteId='" + noteId + '\''
- + ", paragraphId='" + paragraphId + '\''
- + ", restEndpoint='" + restEndpoint + '\''
- + ", parameters='" + parameters + '\''
- + '}';
- }
-
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
index ca850dcbbc..4fe0120e24 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.spi.utils.DateUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.zeppelin.client.ClientConfig;
import org.apache.zeppelin.client.NoteResult;
@@ -76,9 +77,11 @@ public class ZeppelinTask extends AbstractTaskExecutor {
@Override
public void handle() throws Exception {
try {
- final String noteId = this.zeppelinParameters.getNoteId();
final String paragraphId = this.zeppelinParameters.getParagraphId();
+ final String productionNoteDirectory = this.zeppelinParameters.getProductionNoteDirectory();
final String parameters = this.zeppelinParameters.getParameters();
+ // noteId may be replaced with cloned noteId
+ String noteId = this.zeppelinParameters.getNoteId();
Map<String, String> zeppelinParamsMap = new HashMap<>();
if (parameters != null) {
ObjectMapper mapper = new ObjectMapper();
@@ -88,6 +91,16 @@ public class ZeppelinTask extends AbstractTaskExecutor {
// Submit zeppelin task
String resultContent;
Status status = Status.FINISHED;
+ // If in production, clone the note and run the cloned one for stability
+ if (productionNoteDirectory != null) {
+ final String cloneNotePath = String.format(
+ "%s%s_%s",
+ productionNoteDirectory,
+ noteId,
+ DateUtils.getTimestampString());
+ noteId = this.zClient.cloneNote(noteId, cloneNotePath);
+ }
+
if (paragraphId == null) {
final NoteResult noteResult = this.zClient.executeNote(noteId, zeppelinParamsMap);
final List<ParagraphResult> paragraphResultList = noteResult.getParagraphResultList();
@@ -105,6 +118,7 @@ public class ZeppelinTask extends AbstractTaskExecutor {
break;
}
}
+
resultContent = resultContentBuilder.toString();
} else {
final ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap);
@@ -112,6 +126,11 @@ public class ZeppelinTask extends AbstractTaskExecutor {
status = paragraphResult.getStatus();
}
+ // Delete cloned note
+ if (productionNoteDirectory != null) {
+ this.zClient.deleteNote(noteId);
+ }
+
// Use noteId-paragraph-Id as app id
final int exitStatusCode = mapStatusToExitCode(status);
setAppIds(String.format("%s-%s", noteId, paragraphId));
@@ -121,6 +140,7 @@ public class ZeppelinTask extends AbstractTaskExecutor {
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
logger.error("zeppelin task submit failed with error", e);
}
+
}
/**
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
index 4670e87872..397d68347e 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
@@ -29,6 +29,7 @@ import static org.powermock.api.mockito.PowerMockito.when;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.utils.DateUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
@@ -51,9 +52,10 @@ import java.util.Map;
@RunWith(PowerMockRunner.class)
@PrepareForTest({
- ZeppelinTask.class,
- ZeppelinClient.class,
- ObjectMapper.class,
+ ZeppelinTask.class,
+ ZeppelinClient.class,
+ ObjectMapper.class,
+ DateUtils.class
})
@PowerMockIgnore({"javax.*"})
public class ZeppelinTaskTest {
@@ -62,6 +64,8 @@ public class ZeppelinTaskTest {
private static final String MOCK_PARAGRAPH_ID = "paragraph_1648793472526_1771221396";
private static final String MOCK_PARAMETERS = "{\"key1\": \"value1\", \"key2\": \"value2\"}";
private static final String MOCK_REST_ENDPOINT = "localhost:8080";
+ private static final String MOCK_CLONE_NOTE_ID = "3GYJR92R8";
+ private static final String MOCK_PRODUCTION_DIRECTORY = "/prod/";
private final ObjectMapper mapper = new ObjectMapper();
private ZeppelinClient zClient;
@@ -161,6 +165,37 @@ public class ZeppelinTaskTest {
Assert.assertEquals(EXIT_CODE_SUCCESS, this.zeppelinTask.getExitStatusCode());
}
+ @Test
+ public void testHandleWithNoteExecutionSuccessWithProductionSetting() throws Exception {
+ String zeppelinParametersWithNoParagraphId = buildZeppelinTaskParametersWithProductionSetting();
+ TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
+ PowerMockito.mockStatic(DateUtils.class);
+ when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinParametersWithNoParagraphId);
+ this.zeppelinTask = spy(new ZeppelinTask(taskExecutionContext));
+
+ // mock zClient and note result
+ this.zClient = mock(ZeppelinClient.class);
+ this.noteResult = mock(NoteResult.class);
+
+ // use mocked zClient in zeppelinTask
+ doReturn(this.zClient).when(this.zeppelinTask, "getZeppelinClient");
+ when(this.zClient.cloneNote(any(String.class), any(String.class))).thenReturn(MOCK_CLONE_NOTE_ID);
+ when(this.zClient.executeNote(any(), any(Map.class))).thenReturn(this.noteResult);
+ when(paragraphResult.getResultInText()).thenReturn("mock-zeppelin-paragraph-execution-result");
+ this.zeppelinTask.init();
+ when(this.paragraphResult.getStatus()).thenReturn(Status.FINISHED);
+ when(DateUtils.getTimestampString()).thenReturn("123456789");
+ this.zeppelinTask.handle();
+ Mockito.verify(this.zClient).cloneNote(
+ MOCK_NOTE_ID,
+ String.format("%s%s_%s", MOCK_PRODUCTION_DIRECTORY, MOCK_NOTE_ID, "123456789"));
+ Mockito.verify(this.zClient).executeNote(MOCK_CLONE_NOTE_ID,
+ (Map<String, String>) mapper.readValue(MOCK_PARAMETERS, Map.class));
+ Mockito.verify(this.noteResult).getParagraphResultList();
+ Mockito.verify(this.zClient).deleteNote(MOCK_CLONE_NOTE_ID);
+ Assert.assertEquals(EXIT_CODE_SUCCESS, this.zeppelinTask.getExitStatusCode());
+ }
+
private String buildZeppelinTaskParameters() {
ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
zeppelinParameters.setNoteId(MOCK_NOTE_ID);
@@ -179,4 +214,15 @@ public class ZeppelinTaskTest {
return JSONUtils.toJsonString(zeppelinParameters);
}
+
+ private String buildZeppelinTaskParametersWithProductionSetting() {
+ ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
+ zeppelinParameters.setNoteId(MOCK_NOTE_ID);
+ zeppelinParameters.setParameters(MOCK_PARAMETERS);
+ zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT);
+ zeppelinParameters.setProductionNoteDirectory(MOCK_PRODUCTION_DIRECTORY);
+
+ return JSONUtils.toJsonString(zeppelinParameters);
+ }
+
}
\ No newline at end of file
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts
index 2ab410ed2d..dc7257420c 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -621,6 +621,8 @@ export default {
zeppelin_parameters_tips: 'Please enter the parameters for zeppelin dynamic form',
zeppelin_rest_endpoint: 'zeppelinRestEndpoint',
zeppelin_rest_endpoint_tips: 'Please enter the rest endpoint of your Zeppelin server',
+ zeppelin_production_note_directory: 'Directory for cloned zeppelin note in production mode',
+ zeppelin_production_note_directory_tips: 'Please enter the production note directory to enable production mode',
jupyter_conda_env_name: 'condaEnvName',
jupyter_conda_env_name_tips:
'Please enter the conda environment name of papermill',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index d6a4758f77..5be44bf3c2 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -608,6 +608,8 @@ export default {
zeppelin_note_id: 'zeppelinNoteId',
zeppelin_note_id_tips: '请输入zeppelin note id',
zeppelin_paragraph_id: 'zeppelinParagraphId',
+ zeppelin_production_note_directory: '生产模式下存放克隆note的目录',
+ zeppelin_production_note_directory_tips: '请输入生产环境note目录以启用生产模式',
zeppelin_paragraph_id_tips: '请输入zeppelin paragraph id',
zeppelin_parameters: 'parameters',
zeppelin_parameters_tips: '请输入zeppelin dynamic form参数',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
index a3077dbd2f..db16b9bbe1 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
@@ -64,6 +64,14 @@ export function useZeppelin(model: { [field: string]: any }): IJsonItem[] {
}
}
},
+ {
+ type: 'input',
+ field: 'zeppelinProductionNoteDirectory',
+ name: t('project.node.zeppelin_production_note_directory'),
+ props: {
+ placeholder: t('project.node.zeppelin_production_note_directory_tips')
+ }
+ },
{
type: 'input',
field: 'parameters',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 20209639e5..7a3878bc58 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -330,6 +330,7 @@ export function formatParams(data: INodeData): {
taskParams.noteId = data.zeppelinNoteId
taskParams.paragraphId = data.zeppelinParagraphId
taskParams.restEndpoint = data.zeppelinRestEndpoint
+ taskParams.productionNoteDirectory = data.zeppelinProductionNoteDirectory
taskParams.parameters = data.parameters
}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index ced5d43759..268d8635b8 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -300,6 +300,8 @@ interface ITaskParams {
zeppelinParagraphId?: string
zeppelinRestEndpoint?: string
restEndpoint?: string
+ zeppelinProductionNoteDirectory?: string
+ productionNoteDirectory?: string
noteId?: string
paragraphId?: string
condaEnvName?: string