You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ke...@apache.org on 2022/07/15 01:26:35 UTC
[dolphinscheduler] branch dev updated: [Feature][Task Plugin] Enable users to switch endpoints in zeppelin tasks (#10925)
This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new a38fa34d43 [Feature][Task Plugin] Enable users to switch endpoints in zeppelin tasks (#10925)
a38fa34d43 is described below
commit a38fa34d43b2c39cda955bd8d73b1148b6decdd6
Author: Eric Gao <er...@gmail.com>
AuthorDate: Fri Jul 15 09:26:28 2022 +0800
[Feature][Task Plugin] Enable users to switch endpoints in zeppelin tasks (#10925)
* [Feature][Task Plugin] Enable users to switch endpoints in zeppelin tasks (#9814)
---
docs/docs/en/guide/resource/configuration.md | 3 ---
docs/docs/en/guide/task/zeppelin.md | 1 +
docs/docs/zh/guide/task/zeppelin.md | 1 +
.../src/main/resources/common.properties | 3 ---
.../plugin/task/api/TaskConstants.java | 4 ----
.../plugin/task/zeppelin/ZeppelinParameters.java | 22 +++++++++++++-----
.../plugin/task/zeppelin/ZeppelinTask.java | 26 +++++++++++-----------
.../plugin/task/zeppelin/ZeppelinTaskTest.java | 3 +++
dolphinscheduler-ui/src/locales/en_US/project.ts | 4 ++++
dolphinscheduler-ui/src/locales/zh_CN/project.ts | 6 +++--
.../task/components/node/fields/use-zeppelin.ts | 17 ++++++++++++++
.../projects/task/components/node/format-data.ts | 1 +
.../views/projects/task/components/node/types.ts | 2 ++
13 files changed, 62 insertions(+), 31 deletions(-)
diff --git a/docs/docs/en/guide/resource/configuration.md b/docs/docs/en/guide/resource/configuration.md
index 4263354197..5614d3c359 100644
--- a/docs/docs/en/guide/resource/configuration.md
+++ b/docs/docs/en/guide/resource/configuration.md
@@ -121,9 +121,6 @@ development.state=false
# rpc port
alert.rpc.port=50052
-# Url endpoint for zeppelin RESTful API
-zeppelin.rest.url=http://localhost:8080
-
# set path of conda.sh
conda.path=/opt/anaconda3/etc/profile.d/conda.sh
diff --git a/docs/docs/en/guide/task/zeppelin.md b/docs/docs/en/guide/task/zeppelin.md
index fd40b2fc49..c88c0b92b8 100644
--- a/docs/docs/en/guide/task/zeppelin.md
+++ b/docs/docs/en/guide/task/zeppelin.md
@@ -26,6 +26,7 @@ it will call `Zeppelin Client API` to trigger zeppelin notebook paragraph. Click
| Timeout alarm | Check Timeout Alarm and Timeout Failure. When the task exceeds the "timeout duration", an alarm email will be sent and the task execution will fail. |
| Zeppelin Note ID | The unique note id for a zeppelin notebook note. |
| Zeppelin Paragraph ID | The unique paragraph id for a zeppelin notebook paragraph. If you want to schedule a whole note at a time, leave this field blank. |
+| Zeppelin Rest Endpoint | The REST endpoint (base URL) of your zeppelin server, e.g. `http://localhost:8080`. |
| Zeppelin Parameters | Parameters in json format used for zeppelin dynamic form. |
## Task Example
diff --git a/docs/docs/zh/guide/task/zeppelin.md b/docs/docs/zh/guide/task/zeppelin.md
index 18d36dfb71..eb802d5cfb 100644
--- a/docs/docs/zh/guide/task/zeppelin.md
+++ b/docs/docs/zh/guide/task/zeppelin.md
@@ -23,6 +23,7 @@
- 前置任务:选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。
- Zeppelin Note ID:Zeppelin Note对应的唯一ID。
- Zeppelin Paragraph ID:Zeppelin Paragraph对应的唯一ID。如果你想一次性调度整个note,这一栏不填即可。
+- Zeppelin Rest Endpoint:您的Zeppelin服务的REST Endpoint,例如 `http://localhost:8080`。
- Zeppelin Parameters: 用于传入Zeppelin Dynamic Form的参数。
## Task Example
diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties
index 97355f8811..3b12d7c606 100644
--- a/dolphinscheduler-common/src/main/resources/common.properties
+++ b/dolphinscheduler-common/src/main/resources/common.properties
@@ -101,9 +101,6 @@ development.state=false
# rpc port
alert.rpc.port=50052
-# Url endpoint for zeppelin RESTful API
-zeppelin.rest.url=http://localhost:8080
-
# set path of conda.sh
conda.path=/opt/anaconda3/etc/profile.d/conda.sh
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
index 1df63f8a27..03010825fa 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
@@ -421,10 +421,6 @@ public class TaskConstants {
public static final int LOG_LINES = 500;
public static final String NAMESPACE_NAME = "name";
public static final String CLUSTER = "cluster";
- /**
- * zeppelin config
- */
- public static final String ZEPPELIN_REST_URL = "zeppelin.rest.url";
/**
* conda config used by jupyter task plugin
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
index 4ae64b69de..0c67e4f69b 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
@@ -32,11 +32,12 @@ public class ZeppelinParameters extends AbstractParameters {
*/
private String noteId;
private String paragraphId;
+ private String restEndpoint;
private String parameters;
@Override
public boolean checkParameters() {
- return StringUtils.isNotEmpty(this.noteId);
+ return StringUtils.isNotEmpty(this.noteId) && StringUtils.isNotEmpty(this.restEndpoint);
}
@Override
@@ -68,13 +69,22 @@ public class ZeppelinParameters extends AbstractParameters {
this.parameters = parameters;
}
+ public String getRestEndpoint() {
+ return restEndpoint;
+ }
+
+ public void setRestEndpoint(String restEndpoint) {
+ this.restEndpoint = restEndpoint;
+ }
+
@Override
public String toString() {
- return "ZeppelinParameters{" +
- "noteId='" + noteId + '\'' +
- ", paragraphId='" + paragraphId + '\'' +
- ", parameters='" + parameters + '\'' +
- '}';
+ return "ZeppelinParameters{"
+ + "noteId='" + noteId + '\''
+ + ", paragraphId='" + paragraphId + '\''
+ + ", restEndpoint='" + restEndpoint + '\''
+ + ", parameters='" + parameters + '\''
+ + '}';
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
index 062b56f503..ca850dcbbc 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
@@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
-import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.apache.zeppelin.client.ClientConfig;
import org.apache.zeppelin.client.NoteResult;
import org.apache.zeppelin.client.ParagraphResult;
@@ -77,9 +76,9 @@ public class ZeppelinTask extends AbstractTaskExecutor {
@Override
public void handle() throws Exception {
try {
- String noteId = this.zeppelinParameters.getNoteId();
- String paragraphId = this.zeppelinParameters.getParagraphId();
- String parameters = this.zeppelinParameters.getParameters();
+ final String noteId = this.zeppelinParameters.getNoteId();
+ final String paragraphId = this.zeppelinParameters.getParagraphId();
+ final String parameters = this.zeppelinParameters.getParameters();
Map<String, String> zeppelinParamsMap = new HashMap<>();
if (parameters != null) {
ObjectMapper mapper = new ObjectMapper();
@@ -90,8 +89,8 @@ public class ZeppelinTask extends AbstractTaskExecutor {
String resultContent;
Status status = Status.FINISHED;
if (paragraphId == null) {
- NoteResult noteResult = this.zClient.executeNote(noteId, zeppelinParamsMap);
- List<ParagraphResult> paragraphResultList = noteResult.getParagraphResultList();
+ final NoteResult noteResult = this.zClient.executeNote(noteId, zeppelinParamsMap);
+ final List<ParagraphResult> paragraphResultList = noteResult.getParagraphResultList();
StringBuilder resultContentBuilder = new StringBuilder();
for (ParagraphResult paragraphResult : paragraphResultList) {
resultContentBuilder.append(
@@ -108,7 +107,7 @@ public class ZeppelinTask extends AbstractTaskExecutor {
}
resultContent = resultContentBuilder.toString();
} else {
- ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap);
+ final ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap);
resultContent = paragraphResult.getResultInText();
status = paragraphResult.getStatus();
}
@@ -130,12 +129,12 @@ public class ZeppelinTask extends AbstractTaskExecutor {
* @return ZeppelinClient
*/
private ZeppelinClient getZeppelinClient() {
- final String zeppelinRestUrl = PropertyUtils.getString(TaskConstants.ZEPPELIN_REST_URL);
- ClientConfig clientConfig = new ClientConfig(zeppelinRestUrl);
+ final String restEndpoint = zeppelinParameters.getRestEndpoint();
+ final ClientConfig clientConfig = new ClientConfig(restEndpoint);
ZeppelinClient zClient = null;
try {
zClient = new ZeppelinClient(clientConfig);
- String zeppelinVersion = zClient.getVersion();
+ final String zeppelinVersion = zClient.getVersion();
logger.info("zeppelin version: {}", zeppelinVersion);
} catch (Exception e) {
// TODO: complete error handling
@@ -168,14 +167,15 @@ public class ZeppelinTask extends AbstractTaskExecutor {
@Override
public void cancelApplication(boolean status) throws Exception {
+ final String restEndpoint = this.zeppelinParameters.getRestEndpoint();
super.cancelApplication(status);
- String noteId = this.zeppelinParameters.getNoteId();
- String paragraphId = this.zeppelinParameters.getParagraphId();
+ final String noteId = this.zeppelinParameters.getNoteId();
+ final String paragraphId = this.zeppelinParameters.getParagraphId();
if (paragraphId == null) {
logger.info("trying terminate zeppelin task, taskId: {}, noteId: {}",
this.taskExecutionContext.getTaskInstanceId(),
noteId);
- Unirest.config().defaultBaseUrl(PropertyUtils.getString(TaskConstants.ZEPPELIN_REST_URL) + "/api");
+ Unirest.config().defaultBaseUrl(restEndpoint + "/api");
Unirest.delete("/notebook/job/{noteId}").routeParam("noteId", noteId).asJson();
logger.info("zeppelin task terminated, taskId: {}, noteId: {}",
this.taskExecutionContext.getTaskInstanceId(),
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
index 96d7466343..4670e87872 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
@@ -61,6 +61,7 @@ public class ZeppelinTaskTest {
private static final String MOCK_NOTE_ID = "2GYJR92R7";
private static final String MOCK_PARAGRAPH_ID = "paragraph_1648793472526_1771221396";
private static final String MOCK_PARAMETERS = "{\"key1\": \"value1\", \"key2\": \"value2\"}";
+ private static final String MOCK_REST_ENDPOINT = "localhost:8080";
private final ObjectMapper mapper = new ObjectMapper();
private ZeppelinClient zClient;
@@ -164,6 +165,7 @@ public class ZeppelinTaskTest {
ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
zeppelinParameters.setNoteId(MOCK_NOTE_ID);
zeppelinParameters.setParagraphId(MOCK_PARAGRAPH_ID);
+ zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT);
zeppelinParameters.setParameters(MOCK_PARAMETERS);
return JSONUtils.toJsonString(zeppelinParameters);
@@ -173,6 +175,7 @@ public class ZeppelinTaskTest {
ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
zeppelinParameters.setNoteId(MOCK_NOTE_ID);
zeppelinParameters.setParameters(MOCK_PARAMETERS);
+ zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT);
return JSONUtils.toJsonString(zeppelinParameters);
}
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts
index 3f7bc898ed..2ab410ed2d 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -617,6 +617,10 @@ export default {
zeppelin_paragraph_id: 'zeppelinParagraphId',
zeppelin_paragraph_id_tips:
'Please enter the paragraph id of your zeppelin paragraph',
+ zeppelin_parameters: 'parameters',
+ zeppelin_parameters_tips: 'Please enter the parameters for zeppelin dynamic form',
+ zeppelin_rest_endpoint: 'zeppelinRestEndpoint',
+ zeppelin_rest_endpoint_tips: 'Please enter the rest endpoint of your Zeppelin server',
jupyter_conda_env_name: 'condaEnvName',
jupyter_conda_env_name_tips:
'Please enter the conda environment name of papermill',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index 8e918a6ec1..d6a4758f77 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -605,12 +605,14 @@ export default {
emr_steps_define_json_tips: '请输入EMR步骤定义',
segment_separator: '分段执行符号',
segment_separator_tips: '请输入分段执行符号',
- zeppelin_note_id: 'zeppelin_note_id',
+ zeppelin_note_id: 'zeppelinNoteId',
zeppelin_note_id_tips: '请输入zeppelin note id',
- zeppelin_paragraph_id: 'zeppelin_paragraph_id',
+ zeppelin_paragraph_id: 'zeppelinParagraphId',
zeppelin_paragraph_id_tips: '请输入zeppelin paragraph id',
zeppelin_parameters: 'parameters',
zeppelin_parameters_tips: '请输入zeppelin dynamic form参数',
+ zeppelin_rest_endpoint: 'zeppelinRestEndpoint',
+ zeppelin_rest_endpoint_tips: '请输入zeppelin server的rest endpoint',
jupyter_conda_env_name: 'condaEnvName',
jupyter_conda_env_name_tips: '请输入papermill所在的conda环境名',
jupyter_input_note_path: 'inputNotePath',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
index 5cbc1324f9..a3077dbd2f 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
@@ -47,6 +47,23 @@ export function useZeppelin(model: { [field: string]: any }): IJsonItem[] {
placeholder: t('project.node.zeppelin_paragraph_id_tips')
}
},
+ {
+ type: 'input',
+ field: 'zeppelinRestEndpoint',
+ name: t('project.node.zeppelin_rest_endpoint'),
+ props: {
+ placeholder: t('project.node.zeppelin_rest_endpoint_tips')
+ },
+ validate: {
+ trigger: ['input', 'blur'],
+ required: true,
+ validator(validate: any, value: string) {
+ if (!value) {
+ return new Error(t('project.node.zeppelin_rest_endpoint_tips'))
+ }
+ }
+ }
+ },
{
type: 'input',
field: 'parameters',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 4ab18ccd05..20209639e5 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -329,6 +329,7 @@ export function formatParams(data: INodeData): {
if (data.taskType === 'ZEPPELIN') {
taskParams.noteId = data.zeppelinNoteId
taskParams.paragraphId = data.zeppelinParagraphId
+ taskParams.restEndpoint = data.zeppelinRestEndpoint
taskParams.parameters = data.parameters
}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index 187ff7532f..ced5d43759 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -298,6 +298,8 @@ interface ITaskParams {
stepsDefineJson?: string
zeppelinNoteId?: string
zeppelinParagraphId?: string
+ zeppelinRestEndpoint?: string
+ restEndpoint?: string
noteId?: string
paragraphId?: string
condaEnvName?: string