You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ke...@apache.org on 2022/07/15 01:26:35 UTC

[dolphinscheduler] branch dev updated: [Feature][Task Plugin] Enable users to switch endpoints in zeppelin tasks (#10925)

This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new a38fa34d43 [Feature][Task Plugin] Enable users to switch endpoints in zeppelin tasks (#10925)
a38fa34d43 is described below

commit a38fa34d43b2c39cda955bd8d73b1148b6decdd6
Author: Eric Gao <er...@gmail.com>
AuthorDate: Fri Jul 15 09:26:28 2022 +0800

    [Feature][Task Plugin] Enable users to switch endpoints in zeppelin tasks (#10925)
    
    * [Feature][Task Plugin] Enable users to switch endpoints in zeppelin tasks (#9814)
---
 docs/docs/en/guide/resource/configuration.md       |  3 ---
 docs/docs/en/guide/task/zeppelin.md                |  1 +
 docs/docs/zh/guide/task/zeppelin.md                |  1 +
 .../src/main/resources/common.properties           |  3 ---
 .../plugin/task/api/TaskConstants.java             |  4 ----
 .../plugin/task/zeppelin/ZeppelinParameters.java   | 22 +++++++++++++-----
 .../plugin/task/zeppelin/ZeppelinTask.java         | 26 +++++++++++-----------
 .../plugin/task/zeppelin/ZeppelinTaskTest.java     |  3 +++
 dolphinscheduler-ui/src/locales/en_US/project.ts   |  4 ++++
 dolphinscheduler-ui/src/locales/zh_CN/project.ts   |  6 +++--
 .../task/components/node/fields/use-zeppelin.ts    | 17 ++++++++++++++
 .../projects/task/components/node/format-data.ts   |  1 +
 .../views/projects/task/components/node/types.ts   |  2 ++
 13 files changed, 62 insertions(+), 31 deletions(-)

diff --git a/docs/docs/en/guide/resource/configuration.md b/docs/docs/en/guide/resource/configuration.md
index 4263354197..5614d3c359 100644
--- a/docs/docs/en/guide/resource/configuration.md
+++ b/docs/docs/en/guide/resource/configuration.md
@@ -121,9 +121,6 @@ development.state=false
 # rpc port
 alert.rpc.port=50052
 
-# Url endpoint for zeppelin RESTful API
-zeppelin.rest.url=http://localhost:8080
-
 # set path of conda.sh
 conda.path=/opt/anaconda3/etc/profile.d/conda.sh
 
diff --git a/docs/docs/en/guide/task/zeppelin.md b/docs/docs/en/guide/task/zeppelin.md
index fd40b2fc49..c88c0b92b8 100644
--- a/docs/docs/en/guide/task/zeppelin.md
+++ b/docs/docs/en/guide/task/zeppelin.md
@@ -26,6 +26,7 @@ it will call `Zeppelin Client API` to trigger zeppelin notebook paragraph. Click
 | Timeout alarm | Check Timeout Alarm and Timeout Failure. When the task exceeds the "timeout duration", an alarm email will be sent and the task execution will fail. |
 | Zeppelin Note ID | The unique note id for a zeppelin notebook note. |
 | Zeppelin Paragraph ID | The unique paragraph id for a zeppelin notebook paragraph. If you want to schedule a whole note at a time, leave this field blank. |
+| Zeppelin Rest Endpoint | The REST endpoint of your zeppelin server |
 | Zeppelin Parameters | Parameters in json format used for zeppelin dynamic form. |
 
 ## Task Example
diff --git a/docs/docs/zh/guide/task/zeppelin.md b/docs/docs/zh/guide/task/zeppelin.md
index 18d36dfb71..eb802d5cfb 100644
--- a/docs/docs/zh/guide/task/zeppelin.md
+++ b/docs/docs/zh/guide/task/zeppelin.md
@@ -23,6 +23,7 @@
 - 前置任务:选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。
 - Zeppelin Note ID:Zeppelin Note对应的唯一ID。
 - Zeppelin Paragraph ID:Zeppelin Paragraph对应的唯一ID。如果你想一次性调度整个note,这一栏不填即可。
+- Zeppelin Rest Endpoint:您的Zeppelin服务的REST Endpoint。
 - Zeppelin Parameters: 用于传入Zeppelin Dynamic Form的参数。
 
 ## Task Example
diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties
index 97355f8811..3b12d7c606 100644
--- a/dolphinscheduler-common/src/main/resources/common.properties
+++ b/dolphinscheduler-common/src/main/resources/common.properties
@@ -101,9 +101,6 @@ development.state=false
 # rpc port
 alert.rpc.port=50052
 
-# Url endpoint for zeppelin RESTful API
-zeppelin.rest.url=http://localhost:8080
-
 # set path of conda.sh
 conda.path=/opt/anaconda3/etc/profile.d/conda.sh
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
index 1df63f8a27..03010825fa 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
@@ -421,10 +421,6 @@ public class TaskConstants {
     public static final int LOG_LINES = 500;
     public static final String NAMESPACE_NAME = "name";
     public static final String CLUSTER = "cluster";
-    /**
-     * zeppelin config
-     */
-    public static final String ZEPPELIN_REST_URL = "zeppelin.rest.url";
 
     /**
      * conda config used by jupyter task plugin
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
index 4ae64b69de..0c67e4f69b 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
@@ -32,11 +32,12 @@ public class ZeppelinParameters extends AbstractParameters {
      */
     private String noteId;
     private String paragraphId;
+    private String restEndpoint;
     private String parameters;
 
     @Override
     public boolean checkParameters() {
-        return StringUtils.isNotEmpty(this.noteId);
+        return StringUtils.isNotEmpty(this.noteId) && StringUtils.isNotEmpty(this.restEndpoint);
     }
 
     @Override
@@ -68,13 +69,22 @@ public class ZeppelinParameters extends AbstractParameters {
         this.parameters = parameters;
     }
 
+    public String getRestEndpoint() {
+        return restEndpoint;
+    }
+
+    public void setRestEndpoint(String restEndpoint) {
+        this.restEndpoint = restEndpoint;
+    }
+
     @Override
     public String toString() {
-        return "ZeppelinParameters{" +
-                "noteId='" + noteId + '\'' +
-                ", paragraphId='" + paragraphId + '\'' +
-                ", parameters='" + parameters + '\'' +
-                '}';
+        return "ZeppelinParameters{"
+               + "noteId='" + noteId + '\''
+               + ", paragraphId='" + paragraphId + '\''
+               + ", restEndpoint='" + restEndpoint + '\''
+               + ", parameters='" + parameters + '\''
+               + '}';
     }
 
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
index 062b56f503..ca850dcbbc 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
@@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
-import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
 import org.apache.zeppelin.client.ClientConfig;
 import org.apache.zeppelin.client.NoteResult;
 import org.apache.zeppelin.client.ParagraphResult;
@@ -77,9 +76,9 @@ public class ZeppelinTask extends AbstractTaskExecutor {
     @Override
     public void handle() throws Exception {
         try {
-            String noteId = this.zeppelinParameters.getNoteId();
-            String paragraphId = this.zeppelinParameters.getParagraphId();
-            String parameters = this.zeppelinParameters.getParameters();
+            final String noteId = this.zeppelinParameters.getNoteId();
+            final String paragraphId = this.zeppelinParameters.getParagraphId();
+            final String parameters = this.zeppelinParameters.getParameters();
             Map<String, String> zeppelinParamsMap = new HashMap<>();
             if (parameters != null) {
                 ObjectMapper mapper = new ObjectMapper();
@@ -90,8 +89,8 @@ public class ZeppelinTask extends AbstractTaskExecutor {
             String resultContent;
             Status status = Status.FINISHED;
             if (paragraphId == null) {
-                NoteResult noteResult = this.zClient.executeNote(noteId, zeppelinParamsMap);
-                List<ParagraphResult> paragraphResultList = noteResult.getParagraphResultList();
+                final NoteResult noteResult = this.zClient.executeNote(noteId, zeppelinParamsMap);
+                final List<ParagraphResult> paragraphResultList = noteResult.getParagraphResultList();
                 StringBuilder resultContentBuilder = new StringBuilder();
                 for (ParagraphResult paragraphResult : paragraphResultList) {
                     resultContentBuilder.append(
@@ -108,7 +107,7 @@ public class ZeppelinTask extends AbstractTaskExecutor {
                 }
                 resultContent = resultContentBuilder.toString();
             } else {
-                ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap);
+                final ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap);
                 resultContent = paragraphResult.getResultInText();
                 status = paragraphResult.getStatus();
             }
@@ -130,12 +129,12 @@ public class ZeppelinTask extends AbstractTaskExecutor {
      * @return ZeppelinClient
      */
     private ZeppelinClient getZeppelinClient() {
-        final String zeppelinRestUrl = PropertyUtils.getString(TaskConstants.ZEPPELIN_REST_URL);
-        ClientConfig clientConfig = new ClientConfig(zeppelinRestUrl);
+        final String restEndpoint = zeppelinParameters.getRestEndpoint();
+        final ClientConfig clientConfig = new ClientConfig(restEndpoint);
         ZeppelinClient zClient = null;
         try {
             zClient = new ZeppelinClient(clientConfig);
-            String zeppelinVersion = zClient.getVersion();
+            final String zeppelinVersion = zClient.getVersion();
             logger.info("zeppelin version: {}", zeppelinVersion);
         } catch (Exception e) {
             // TODO: complete error handling
@@ -168,14 +167,15 @@ public class ZeppelinTask extends AbstractTaskExecutor {
 
     @Override
     public void cancelApplication(boolean status) throws Exception {
+        final String restEndpoint = this.zeppelinParameters.getRestEndpoint();
         super.cancelApplication(status);
-        String noteId = this.zeppelinParameters.getNoteId();
-        String paragraphId = this.zeppelinParameters.getParagraphId();
+        final String noteId = this.zeppelinParameters.getNoteId();
+        final String paragraphId = this.zeppelinParameters.getParagraphId();
         if (paragraphId == null) {
             logger.info("trying terminate zeppelin task, taskId: {}, noteId: {}",
                     this.taskExecutionContext.getTaskInstanceId(),
                     noteId);
-            Unirest.config().defaultBaseUrl(PropertyUtils.getString(TaskConstants.ZEPPELIN_REST_URL) + "/api");
+            Unirest.config().defaultBaseUrl(restEndpoint + "/api");
             Unirest.delete("/notebook/job/{noteId}").routeParam("noteId", noteId).asJson();
             logger.info("zeppelin task terminated, taskId: {}, noteId: {}",
                     this.taskExecutionContext.getTaskInstanceId(),
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
index 96d7466343..4670e87872 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
@@ -61,6 +61,7 @@ public class ZeppelinTaskTest {
     private static final String MOCK_NOTE_ID = "2GYJR92R7";
     private static final String MOCK_PARAGRAPH_ID = "paragraph_1648793472526_1771221396";
     private static final String MOCK_PARAMETERS = "{\"key1\": \"value1\", \"key2\": \"value2\"}";
+    private static final String MOCK_REST_ENDPOINT = "localhost:8080";
     private final ObjectMapper mapper = new ObjectMapper();
 
     private ZeppelinClient zClient;
@@ -164,6 +165,7 @@ public class ZeppelinTaskTest {
         ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
         zeppelinParameters.setNoteId(MOCK_NOTE_ID);
         zeppelinParameters.setParagraphId(MOCK_PARAGRAPH_ID);
+        zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT);
         zeppelinParameters.setParameters(MOCK_PARAMETERS);
 
         return JSONUtils.toJsonString(zeppelinParameters);
@@ -173,6 +175,7 @@ public class ZeppelinTaskTest {
         ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
         zeppelinParameters.setNoteId(MOCK_NOTE_ID);
         zeppelinParameters.setParameters(MOCK_PARAMETERS);
+        zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT);
 
         return JSONUtils.toJsonString(zeppelinParameters);
     }
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts
index 3f7bc898ed..2ab410ed2d 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -617,6 +617,10 @@ export default {
     zeppelin_paragraph_id: 'zeppelinParagraphId',
     zeppelin_paragraph_id_tips:
       'Please enter the paragraph id of your zeppelin paragraph',
+    zeppelin_parameters: 'parameters',
+    zeppelin_parameters_tips: 'Please enter the parameters for zeppelin dynamic form',
+    zeppelin_rest_endpoint: 'zeppelinRestEndpoint',
+    zeppelin_rest_endpoint_tips: 'Please enter the rest endpoint of your Zeppelin server',
     jupyter_conda_env_name: 'condaEnvName',
     jupyter_conda_env_name_tips:
       'Please enter the conda environment name of papermill',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index 8e918a6ec1..d6a4758f77 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -605,12 +605,14 @@ export default {
     emr_steps_define_json_tips: '请输入EMR步骤定义',
     segment_separator: '分段执行符号',
     segment_separator_tips: '请输入分段执行符号',
-    zeppelin_note_id: 'zeppelin_note_id',
+    zeppelin_note_id: 'zeppelinNoteId',
     zeppelin_note_id_tips: '请输入zeppelin note id',
-    zeppelin_paragraph_id: 'zeppelin_paragraph_id',
+    zeppelin_paragraph_id: 'zeppelinParagraphId',
     zeppelin_paragraph_id_tips: '请输入zeppelin paragraph id',
     zeppelin_parameters: 'parameters',
     zeppelin_parameters_tips: '请输入zeppelin dynamic form参数',
+    zeppelin_rest_endpoint: 'zeppelinRestEndpoint',
+    zeppelin_rest_endpoint_tips: '请输入zeppelin server的rest endpoint',
     jupyter_conda_env_name: 'condaEnvName',
     jupyter_conda_env_name_tips: '请输入papermill所在的conda环境名',
     jupyter_input_note_path: 'inputNotePath',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
index 5cbc1324f9..a3077dbd2f 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
@@ -47,6 +47,23 @@ export function useZeppelin(model: { [field: string]: any }): IJsonItem[] {
         placeholder: t('project.node.zeppelin_paragraph_id_tips')
       }
     },
+    {
+      type: 'input',
+      field: 'zeppelinRestEndpoint',
+      name: t('project.node.zeppelin_rest_endpoint'),
+      props: {
+        placeholder: t('project.node.zeppelin_rest_endpoint_tips')
+      },
+      validate: {
+        trigger: ['input', 'blur'],
+        required: true,
+        validator(validate: any, value: string) {
+          if (!value) {
+            return new Error(t('project.node.zeppelin_rest_endpoint_tips'))
+          }
+        }
+      }
+    },
     {
       type: 'input',
       field: 'parameters',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 4ab18ccd05..20209639e5 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -329,6 +329,7 @@ export function formatParams(data: INodeData): {
   if (data.taskType === 'ZEPPELIN') {
     taskParams.noteId = data.zeppelinNoteId
     taskParams.paragraphId = data.zeppelinParagraphId
+    taskParams.restEndpoint = data.zeppelinRestEndpoint
     taskParams.parameters = data.parameters
   }
 
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index 187ff7532f..ced5d43759 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -298,6 +298,8 @@ interface ITaskParams {
   stepsDefineJson?: string
   zeppelinNoteId?: string
   zeppelinParagraphId?: string
+  zeppelinRestEndpoint?: string
+  restEndpoint?: string
   noteId?: string
   paragraphId?: string
   condaEnvName?: string