You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by wa...@apache.org on 2022/07/20 06:16:53 UTC

[dolphinscheduler] branch dev updated: [Feature][Task Plugin] Increase zeppelin task stability in production (#11010)

This is an automated email from the ASF dual-hosted git repository.

wanggenhua pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new f689220290 [Feature][Task Plugin] Increase zeppelin task stability in production (#11010)
f689220290 is described below

commit f689220290a13d5ce3b451e65a475c0444e7a047
Author: Eric Gao <er...@gmail.com>
AuthorDate: Wed Jul 20 14:16:47 2022 +0800

    [Feature][Task Plugin] Increase zeppelin task stability in production (#11010)
    
    * [Feature][Task Plugin] Increase zeppelin task stability in production (#10584)
    
    * Add front-end and update docs for the production mode of zeppelin task
    
    * Fix minor front-end bug of zeppelin task plugin
    
    * Refactor ZeppelinParameters with lombok
    
    * Fix formatting
    
    * Replace @Data with @Getter, @Setter, @ToString to avoid decrease in test coverage
---
 docs/docs/en/guide/task/zeppelin.md                | 12 +++++
 docs/docs/zh/guide/task/zeppelin.md                | 11 +++++
 .../plugin/task/zeppelin/ZeppelinParameters.java   | 49 +++-----------------
 .../plugin/task/zeppelin/ZeppelinTask.java         | 22 ++++++++-
 .../plugin/task/zeppelin/ZeppelinTaskTest.java     | 52 ++++++++++++++++++++--
 dolphinscheduler-ui/src/locales/en_US/project.ts   |  2 +
 dolphinscheduler-ui/src/locales/zh_CN/project.ts   |  2 +
 .../task/components/node/fields/use-zeppelin.ts    |  8 ++++
 .../projects/task/components/node/format-data.ts   |  1 +
 .../views/projects/task/components/node/types.ts   |  2 +
 10 files changed, 115 insertions(+), 46 deletions(-)

diff --git a/docs/docs/en/guide/task/zeppelin.md b/docs/docs/en/guide/task/zeppelin.md
index c88c0b92b8..8c8de9dae6 100644
--- a/docs/docs/en/guide/task/zeppelin.md
+++ b/docs/docs/en/guide/task/zeppelin.md
@@ -26,9 +26,21 @@ it will call `Zeppelin Client API` to trigger zeppelin notebook paragraph. Click
 | Timeout alarm | Check Timeout Alarm and Timeout Failure. When the task exceeds the "timeout duration", an alarm email will be sent and the task execution will fail. |
 | Zeppelin Note ID | The unique note id for a zeppelin notebook note. |
 | Zeppelin Paragraph ID | The unique paragraph id for a zeppelin notebook paragraph. If you want to schedule a whole note at a time, leave this field blank. |
+| Zeppelin Production Note Directory | The directory where the cloned note is stored in production mode. Leave this field blank to run the original note. |
 | Zeppelin Rest Endpoint | The REST endpoint of your zeppelin server |
 | Zeppelin Parameters | Parameters in json format used for zeppelin dynamic form. |
 
+## Production (Clone) Mode
+
+- Fill in the optional `Zeppelin Production Note Directory` parameter to enable `Production Mode`.
+- In `Production Mode`, the target note gets copied to the `Zeppelin Production Note Directory` you choose. 
+`Zeppelin Task Plugin` will execute the cloned note instead of the original one. Once execution is done, 
+`Zeppelin Task Plugin` will delete the cloned note automatically. 
+This increases stability, because modifying the original note while `DolphinScheduler` is running the task 
+will not affect the production task.
+- If you leave the `Zeppelin Production Note Directory` empty, `Zeppelin Task Plugin` will execute the original note.
+- `Zeppelin Production Note Directory` should both start and end with a slash, e.g. `/production_note_directory/`.
+
 ## Task Example
 
 ### Zeppelin Paragraph Task Example
diff --git a/docs/docs/zh/guide/task/zeppelin.md b/docs/docs/zh/guide/task/zeppelin.md
index eb802d5cfb..ac3bebc690 100644
--- a/docs/docs/zh/guide/task/zeppelin.md
+++ b/docs/docs/zh/guide/task/zeppelin.md
@@ -24,8 +24,19 @@
 - Zeppelin Note ID:Zeppelin Note对应的唯一ID。
 - Zeppelin Paragraph ID:Zeppelin Paragraph对应的唯一ID。如果你想一次性调度整个note,这一栏不填即可。
 - Zeppelin Rest Endpoint:您的Zeppelin服务的REST Endpoint。
+- Zeppelin Production Note Directory:生产模式下存放克隆note的目录。
 - Zeppelin Parameters: 用于传入Zeppelin Dynamic Form的参数。
 
+## 生产(克隆)模式
+
+- 填写`Zeppelin Production Note Directory`参数以启用`生产模式`。
+- 在`生产模式`下,目标note会被克隆到您所填的`Zeppelin Production Note Directory`目录下。 
+`Zeppelin任务插件`将会执行克隆出来的note,并在执行结束后自动清除它。 
+因为在此模式下,如果您不小心修改了正在被`DolphinScheduler`调度的note,也不会影响到生产任务的执行,
+从而提高了稳定性。 
+- 如果您选择不填`Zeppelin Production Note Directory`这个参数,`Zeppelin任务插件`将会执行您的原始note。
+- `Zeppelin Production Note Directory`参数在格式上应该以`斜杠`开头和结尾,例如 `/production_note_directory/`。
+
 ## Task Example
 
 ### Zeppelin Paragraph Task Example
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
index 0c67e4f69b..b95c93971f 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
@@ -17,6 +17,9 @@
 
 package org.apache.dolphinscheduler.plugin.task.zeppelin;
 
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
@@ -24,6 +27,9 @@ import org.apache.dolphinscheduler.spi.utils.StringUtils;
 import java.util.Collections;
 import java.util.List;
 
+@Getter
+@Setter
+@ToString
 public class ZeppelinParameters extends AbstractParameters {
 
     /**
@@ -33,6 +39,7 @@ public class ZeppelinParameters extends AbstractParameters {
     private String noteId;
     private String paragraphId;
     private String restEndpoint;
+    private String productionNoteDirectory;
     private String parameters;
 
     @Override
@@ -45,46 +52,4 @@ public class ZeppelinParameters extends AbstractParameters {
         return Collections.emptyList();
     }
 
-    public String getNoteId() {
-        return noteId;
-    }
-
-    public void setNoteId(String noteId) {
-        this.noteId = noteId;
-    }
-
-    public String getParagraphId() {
-        return paragraphId;
-    }
-
-    public void setParagraphId(String paragraphId) {
-        this.paragraphId = paragraphId;
-    }
-
-    public String getParameters() {
-        return parameters;
-    }
-
-    public void setParameters(String parameters) {
-        this.parameters = parameters;
-    }
-
-    public String getRestEndpoint() {
-        return restEndpoint;
-    }
-
-    public void setRestEndpoint(String restEndpoint) {
-        this.restEndpoint = restEndpoint;
-    }
-
-    @Override
-    public String toString() {
-        return "ZeppelinParameters{"
-               + "noteId='" + noteId + '\''
-               + ", paragraphId='" + paragraphId + '\''
-               + ", restEndpoint='" + restEndpoint + '\''
-               + ", parameters='" + parameters + '\''
-               + '}';
-    }
-
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
index ca850dcbbc..4fe0120e24 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.spi.utils.DateUtils;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.zeppelin.client.ClientConfig;
 import org.apache.zeppelin.client.NoteResult;
@@ -76,9 +77,11 @@ public class ZeppelinTask extends AbstractTaskExecutor {
     @Override
     public void handle() throws Exception {
         try {
-            final String noteId = this.zeppelinParameters.getNoteId();
             final String paragraphId = this.zeppelinParameters.getParagraphId();
+            final String productionNoteDirectory = this.zeppelinParameters.getProductionNoteDirectory();
             final String parameters = this.zeppelinParameters.getParameters();
+            // noteId may be replaced with cloned noteId
+            String noteId = this.zeppelinParameters.getNoteId();
             Map<String, String> zeppelinParamsMap = new HashMap<>();
             if (parameters != null) {
                 ObjectMapper mapper = new ObjectMapper();
@@ -88,6 +91,16 @@ public class ZeppelinTask extends AbstractTaskExecutor {
             // Submit zeppelin task
             String resultContent;
             Status status = Status.FINISHED;
+            // If in production, clone the note and run the cloned one for stability
+            if (productionNoteDirectory != null) {
+                final String cloneNotePath = String.format(
+                        "%s%s_%s",
+                        productionNoteDirectory,
+                        noteId,
+                        DateUtils.getTimestampString());
+                noteId = this.zClient.cloneNote(noteId, cloneNotePath);
+            }
+
             if (paragraphId == null) {
                 final NoteResult noteResult = this.zClient.executeNote(noteId, zeppelinParamsMap);
                 final List<ParagraphResult> paragraphResultList = noteResult.getParagraphResultList();
@@ -105,6 +118,7 @@ public class ZeppelinTask extends AbstractTaskExecutor {
                         break;
                     }
                 }
+
                 resultContent = resultContentBuilder.toString();
             } else {
                 final ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap);
@@ -112,6 +126,11 @@ public class ZeppelinTask extends AbstractTaskExecutor {
                 status = paragraphResult.getStatus();
             }
 
+            // Delete cloned note
+            if (productionNoteDirectory != null) {
+                this.zClient.deleteNote(noteId);
+            }
+
             // Use noteId-paragraph-Id as app id
             final int exitStatusCode = mapStatusToExitCode(status);
             setAppIds(String.format("%s-%s", noteId, paragraphId));
@@ -121,6 +140,7 @@ public class ZeppelinTask extends AbstractTaskExecutor {
             setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
             logger.error("zeppelin task submit failed with error", e);
         }
+
     }
 
     /**
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
index 4670e87872..397d68347e 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
@@ -29,6 +29,7 @@ import static org.powermock.api.mockito.PowerMockito.when;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.utils.DateUtils;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 
 
@@ -51,9 +52,10 @@ import java.util.Map;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({
-        ZeppelinTask.class,
-        ZeppelinClient.class,
-        ObjectMapper.class,
+    ZeppelinTask.class,
+    ZeppelinClient.class,
+    ObjectMapper.class,
+    DateUtils.class
 })
 @PowerMockIgnore({"javax.*"})
 public class ZeppelinTaskTest {
@@ -62,6 +64,8 @@ public class ZeppelinTaskTest {
     private static final String MOCK_PARAGRAPH_ID = "paragraph_1648793472526_1771221396";
     private static final String MOCK_PARAMETERS = "{\"key1\": \"value1\", \"key2\": \"value2\"}";
     private static final String MOCK_REST_ENDPOINT = "localhost:8080";
+    private static final String MOCK_CLONE_NOTE_ID = "3GYJR92R8";
+    private static final String MOCK_PRODUCTION_DIRECTORY = "/prod/";
     private final ObjectMapper mapper = new ObjectMapper();
 
     private ZeppelinClient zClient;
@@ -161,6 +165,37 @@ public class ZeppelinTaskTest {
         Assert.assertEquals(EXIT_CODE_SUCCESS, this.zeppelinTask.getExitStatusCode());
     }
 
+    @Test
+    public void testHandleWithNoteExecutionSuccessWithProductionSetting() throws Exception {
+        String zeppelinParametersWithNoParagraphId = buildZeppelinTaskParametersWithProductionSetting();
+        TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
+        PowerMockito.mockStatic(DateUtils.class);
+        when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinParametersWithNoParagraphId);
+        this.zeppelinTask = spy(new ZeppelinTask(taskExecutionContext));
+
+        // mock zClient and note result
+        this.zClient = mock(ZeppelinClient.class);
+        this.noteResult = mock(NoteResult.class);
+
+        // use mocked zClient in zeppelinTask
+        doReturn(this.zClient).when(this.zeppelinTask, "getZeppelinClient");
+        when(this.zClient.cloneNote(any(String.class), any(String.class))).thenReturn(MOCK_CLONE_NOTE_ID);
+        when(this.zClient.executeNote(any(), any(Map.class))).thenReturn(this.noteResult);
+        when(paragraphResult.getResultInText()).thenReturn("mock-zeppelin-paragraph-execution-result");
+        this.zeppelinTask.init();
+        when(this.paragraphResult.getStatus()).thenReturn(Status.FINISHED);
+        when(DateUtils.getTimestampString()).thenReturn("123456789");
+        this.zeppelinTask.handle();
+        Mockito.verify(this.zClient).cloneNote(
+                MOCK_NOTE_ID,
+                String.format("%s%s_%s", MOCK_PRODUCTION_DIRECTORY, MOCK_NOTE_ID, "123456789"));
+        Mockito.verify(this.zClient).executeNote(MOCK_CLONE_NOTE_ID,
+                (Map<String, String>) mapper.readValue(MOCK_PARAMETERS, Map.class));
+        Mockito.verify(this.noteResult).getParagraphResultList();
+        Mockito.verify(this.zClient).deleteNote(MOCK_CLONE_NOTE_ID);
+        Assert.assertEquals(EXIT_CODE_SUCCESS, this.zeppelinTask.getExitStatusCode());
+    }
+
     private String buildZeppelinTaskParameters() {
         ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
         zeppelinParameters.setNoteId(MOCK_NOTE_ID);
@@ -179,4 +214,15 @@ public class ZeppelinTaskTest {
 
         return JSONUtils.toJsonString(zeppelinParameters);
     }
+
+    private String buildZeppelinTaskParametersWithProductionSetting() {
+        ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
+        zeppelinParameters.setNoteId(MOCK_NOTE_ID);
+        zeppelinParameters.setParameters(MOCK_PARAMETERS);
+        zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT);
+        zeppelinParameters.setProductionNoteDirectory(MOCK_PRODUCTION_DIRECTORY);
+
+        return JSONUtils.toJsonString(zeppelinParameters);
+    }
+
 }
\ No newline at end of file
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts
index 2ab410ed2d..dc7257420c 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -621,6 +621,8 @@ export default {
     zeppelin_parameters_tips: 'Please enter the parameters for zeppelin dynamic form',
     zeppelin_rest_endpoint: 'zeppelinRestEndpoint',
     zeppelin_rest_endpoint_tips: 'Please enter the rest endpoint of your Zeppelin server',
+    zeppelin_production_note_directory: 'Directory for cloned zeppelin note in production mode',
+    zeppelin_production_note_directory_tips: 'Please enter the production note directory to enable production mode',
     jupyter_conda_env_name: 'condaEnvName',
     jupyter_conda_env_name_tips:
       'Please enter the conda environment name of papermill',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index d6a4758f77..5be44bf3c2 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -608,6 +608,8 @@ export default {
     zeppelin_note_id: 'zeppelinNoteId',
     zeppelin_note_id_tips: '请输入zeppelin note id',
     zeppelin_paragraph_id: 'zeppelinParagraphId',
+    zeppelin_production_note_directory: '生产模式下存放克隆note的目录',
+    zeppelin_production_note_directory_tips: '请输入生产环境note目录以启用生产模式',
     zeppelin_paragraph_id_tips: '请输入zeppelin paragraph id',
     zeppelin_parameters: 'parameters',
     zeppelin_parameters_tips: '请输入zeppelin dynamic form参数',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
index a3077dbd2f..db16b9bbe1 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
@@ -64,6 +64,14 @@ export function useZeppelin(model: { [field: string]: any }): IJsonItem[] {
         }
       }
     },
+    {
+      type: 'input',
+      field: 'zeppelinProductionNoteDirectory',
+      name: t('project.node.zeppelin_production_note_directory'),
+      props: {
+        placeholder: t('project.node.zeppelin_production_note_directory_tips')
+      }
+    },
     {
       type: 'input',
       field: 'parameters',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 20209639e5..7a3878bc58 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -330,6 +330,7 @@ export function formatParams(data: INodeData): {
     taskParams.noteId = data.zeppelinNoteId
     taskParams.paragraphId = data.zeppelinParagraphId
     taskParams.restEndpoint = data.zeppelinRestEndpoint
+    taskParams.productionNoteDirectory = data.zeppelinProductionNoteDirectory
     taskParams.parameters = data.parameters
   }
 
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index ced5d43759..268d8635b8 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -300,6 +300,8 @@ interface ITaskParams {
   zeppelinParagraphId?: string
   zeppelinRestEndpoint?: string
   restEndpoint?: string
+  zeppelinProductionNoteDirectory?: string
+  productionNoteDirectory?: string
   noteId?: string
   paragraphId?: string
   condaEnvName?: string