Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/06/14 02:29:58 UTC

[GitHub] [dolphinscheduler] zhongjiajie commented on a diff in pull request #10407: [Feature][Task Plugin] Add DVC task plugin for MLops scenario (#10372)

zhongjiajie commented on code in PR #10407:
URL: https://github.com/apache/dolphinscheduler/pull/10407#discussion_r896315911


##########
docs/docs/en/guide/task/dvc.md:
##########
@@ -0,0 +1,125 @@
+# DVC Node
+
+## Overview
+
+[DVC (Data Version Control)](https://dvc.org) is an excellent open-source version control system for machine learning projects.
+
+The DVC plugin brings DVC's data version management to DolphinScheduler, making it easy for users to manage data versions.
+
+The plugin provides the following three functions:
+
+- Init DVC: Initialize a Git repository as a DVC repository and bind the remote storage address where the actual data is stored.
+- Upload: Add or update specific data to the repository and record the version tag.
+- Download: Download a specific version of data from the repository.
+
+## Create Task
+
+- Click `Project -> Management-Project -> Name-Workflow Definition`, and click the "Create Workflow" button to enter the
+  DAG editing page.
+- Drag the <img src="../../../../img/tasks/icons/dvc.png" width="15"/> task node from the toolbar to the canvas.
+
+## Task Example
+
+First, here are some general parameters of DolphinScheduler:
+
+- **Node name**: The node name in a workflow definition is unique.
+- **Run flag**: Identifies whether this node is scheduled normally. If it does not need to execute, select
+  `prohibition execution`.
+- **Descriptive information**: Describe the function of the node.
+- **Task priority**: When the number of worker threads is insufficient, execute in the order of priority from high
+  to low, and tasks with the same priority will execute in a first-in first-out order.
+- **Worker grouping**: Tasks are assigned to the machines of the selected worker group for execution. If `Default`
+  is selected, a worker machine is chosen at random.
+- **Environment Name**: Configure the environment in which to run the script.
+- **Times of failed retry attempts**: The number of times a failed task is resubmitted.
+- **Failed retry interval**: The interval, in minutes, before a failed task is resubmitted.
+- **Delayed execution time**: The time, in minutes, by which task execution is delayed.
+- **Timeout alarm**: Check the timeout alarm and timeout failure. When the task runs longer than the "timeout", an
+  alarm email is sent and the task execution fails.
+- **Predecessor task**: Selecting a predecessor task for the current task sets the selected predecessor task as
+  upstream of the current task.
+
+Here are some specific parameters for the DVC plugin:
+
+- **DVC Task Type**: Upload, Download or Init DVC.
+- **DVC Repository**: The DVC repository address associated with the task execution.
+
+### Init DVC
+
+Initialize a Git repository as a DVC repository and add a new data remote to store the data.
+
+After the project is initialized, it is still a Git repository, but with DVC features added.
+
+The data itself is not stored in the Git repository but in the remote storage; DVC keeps track of the version and location of the data and maintains the link between them.
+
+![dvc_init](../../../../img/tasks/demo/dvc_init.png)
+
+**Task Parameter**
+
+- **Remote Store Url**: The address where the actual data is stored. See [DVC supported storage types](https://dvc.org/doc/command-reference/remote/add#supported-storage-types) for the storage types you can use.
+
+The example above shows that:
+
+Initialize repository `git@github.com:xxxx/dvc-data-repository-example.git` as a DVC project and bind the remote storage address to `~/dvc`.
+
+### Upload
+
+Upload or update data and record its version.
+
+![dvc_upload](../../../../img/tasks/demo/dvc_upload.png)
+
+**Task Parameter**
+
+- **Data Path in DVC Repository**: The data will be uploaded to this path in the repository.
+- **Data Path In Worker**: The path of the data on the worker to be uploaded.
+- **Version**: The version tag of the data; after the upload, it is saved with `git tag`.
+- **Version Message**: A description of this version.
+
+The example above shows that:
+
+Upload data `/home/lucky/xxxx/MLflow-AutoML/data/iris` to the root directory of repository `git@github.com:xxxx/dvc-data-repository-example.git`. The file or folder of data is named `iris`.

Review Comment:
   It may be better to use the placeholder `<YOUR-NAME-OR-ORG>` in the git URI; the same applies below.
   ```suggestion
   Upload data `/home/lucky/MLflow-AutoML/data/iris` to the root directory of repository `git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git`. The file or folder of data is named `iris`.
   ```
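The Init, Upload and Download steps described in the doc above can be illustrated with the plain `git` and `dvc` CLIs. The following is a minimal sketch under stated assumptions, not the plugin's implementation (its `build*Commond` methods are not part of this diff): the class `DvcCommandSketch`, the clone directory `dvc-repository` and the DVC remote name `storage` are hypothetical, and paths and versions are assumed to contain no spaces or shell metacharacters, with messages containing no single quotes (a real implementation should escape them). It uses the `<YOUR-NAME-OR-ORG>` placeholder from the suggestion above.

```java
import java.util.List;

/**
 * Hypothetical sketch: shell command sequences matching the three DVC task types
 * described in dvc.md. This is NOT the plugin's implementation.
 * Assumes trusted inputs without spaces or shell metacharacters.
 */
public class DvcCommandSketch {

    // Hypothetical local directory the Git repository is cloned into.
    static final String CLONE_DIR = "dvc-repository";

    /** Init DVC: turn a Git repository into a DVC project with a default data remote. */
    public static List<String> initCommands(String repoUrl, String remoteUrl) {
        return List.of(
                "git clone " + repoUrl + " " + CLONE_DIR,
                "cd " + CLONE_DIR,
                "dvc init",
                // -d marks "storage" (a hypothetical name) as the default remote for `dvc push`
                "dvc remote add -d storage " + remoteUrl,
                "git add -A",
                "git commit -m 'init dvc project'",
                "git push origin HEAD");
    }

    /** Upload: track the data with DVC, tag the commit with the version, push data and metadata. */
    public static List<String> uploadCommands(String repoUrl, String dataPathInRepo,
                                              String dataPathInWorker, String version, String message) {
        return List.of(
                "git clone " + repoUrl + " " + CLONE_DIR,
                "cd " + CLONE_DIR,
                "cp -r " + dataPathInWorker + " " + dataPathInRepo,
                // writes <dataPathInRepo>.dvc and git-ignores the data itself
                "dvc add " + dataPathInRepo,
                "git add -A",
                "git commit -m '" + message + "'",
                // the version is recorded as an annotated Git tag
                "git tag -a " + version + " -m '" + message + "'",
                // data goes to the DVC remote; metadata and the tag go to Git
                "dvc push",
                "git push --follow-tags origin HEAD");
    }

    /** Download: fetch one version of the data without keeping a clone of the repository. */
    public static String downloadCommand(String repoUrl, String dataPathInRepo,
                                         String dataPathInWorker, String version) {
        return "dvc get " + repoUrl + " " + dataPathInRepo
                + " -o " + dataPathInWorker + " --rev " + version;
    }

    /** Chains the steps so that `cd` applies to later commands and any failure stops the run. */
    public static String script(List<String> commands) {
        return String.join(" && ", commands);
    }

    public static void main(String[] args) {
        String repo = "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git";
        System.out.println(script(initCommands(repo, "/data/dvc-remote")));
        System.out.println(script(uploadCommands(repo, "iris",
                "/home/lucky/MLflow-AutoML/data/iris", "iris_v1", "first version of iris")));
        System.out.println(downloadCommand(repo, "iris", "/tmp/iris", "iris_v1"));
    }
}
```

Using `dvc get` for Download avoids keeping a full clone on the worker; cloning the repository, checking out the version tag and running `dvc pull` would be an alternative.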



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dvc;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.D;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DVC task
+ */
+public class DvcTask extends AbstractTaskExecutor {
+
+    /**
+     * DVC parameters
+     */
+    private DvcParameters parameters;
+
+    /**
+     * shell command executor
+     */
+    private ShellCommandExecutor shellCommandExecutor;
+
+    /**
+     * taskExecutionContext
+     */
+    private TaskExecutionContext taskExecutionContext;
+
+    /**
+     * constructor
+     *
+     * @param taskExecutionContext taskExecutionContext
+     */
+    public DvcTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+
+        this.taskExecutionContext = taskExecutionContext;
+        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, logger);
+    }
+
+    @Override
+    public void init() {
+        logger.info("DVC task params {}", taskExecutionContext.getTaskParams());
+
+        parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DvcParameters.class);
+
+        if (!parameters.checkParameters()) {
+            throw new RuntimeException("DVC task params are not valid");
+        }
+    }
+
+    @Override
+    public void handle() throws Exception {
+        try {
+            // construct process
+            String command = buildCommand();
+            TaskResponse commandExecuteResult = shellCommandExecutor.run(command);
+            setExitStatusCode(commandExecuteResult.getExitStatusCode());
+            setAppIds(commandExecuteResult.getAppIds());
+            setProcessId(commandExecuteResult.getProcessId());
+            parameters.dealOutParam(shellCommandExecutor.getVarPool());
+        } catch (Exception e) {
+            logger.error("DVC task error", e);
+            setExitStatusCode(EXIT_CODE_FAILURE);
+            throw e;
+        }
+    }
+
+    @Override
+    public void cancelApplication(boolean cancelApplication) throws Exception {
+        // cancel process
+        shellCommandExecutor.cancelApplication();
+    }
+
+    public String buildCommand() {
+        String command = "";
+        String taskType = parameters.getDvcTaskType();
+        if (taskType.equals(DvcConstants.TASK_TYPE_UPLOAD)) {
+            command = buildUploadCommond();
+        } else if (taskType.equals(DvcConstants.TASK_TYPE_DOWNLOAD)) {
+            command = buildDownCommond();
+        } else if (taskType.equals(DvcConstants.TASK_TYPE_INIT_DVC)) {
+            command = buildInitDvcCommond();
+        }
+        logger.info(command);

Review Comment:
   Should we add a more descriptive prefix to this info log, such as `Run DVC task with command: `?
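A minimal sketch of what the suggested log line could look like. It uses `java.util.logging` only so that the snippet is self-contained; with the `{}`-placeholder `logger` already used in `DvcTask`, the equivalent line would be `logger.info("Run DVC task with command: {}", command);`. The constant values and stub builder methods below are hypothetical stand-ins for `DvcConstants` and the `build*Commond` methods, which are not part of this diff. The sketch also compares constant-first, so a null task type cannot throw a `NullPointerException`, and fails fast on an unknown type instead of returning an empty command, which would likely run as a no-op and exit successfully.

```java
import java.util.logging.Logger;

/**
 * Hypothetical sketch of the reviewer's suggestion: log the built command with a
 * descriptive prefix. Not the plugin's actual code.
 */
public class DvcBuildCommandSketch {

    private static final Logger LOG = Logger.getLogger(DvcBuildCommandSketch.class.getName());

    // Assumed values: the real constants live in DvcConstants, which this diff does not show.
    static final String TASK_TYPE_UPLOAD = "Upload";
    static final String TASK_TYPE_DOWNLOAD = "Download";
    static final String TASK_TYPE_INIT_DVC = "Init DVC";

    // Hypothetical stand-ins for buildUploadCommond(), buildDownCommond() and buildInitDvcCommond().
    static String buildUploadCommand() { return "dvc add data && dvc push"; }
    static String buildDownloadCommand() { return "dvc get repo data"; }
    static String buildInitDvcCommand() { return "dvc init"; }

    public static String buildCommand(String taskType) {
        String command;
        // Constant-first equals() is null-safe, unlike taskType.equals(...).
        if (TASK_TYPE_UPLOAD.equals(taskType)) {
            command = buildUploadCommand();
        } else if (TASK_TYPE_DOWNLOAD.equals(taskType)) {
            command = buildDownloadCommand();
        } else if (TASK_TYPE_INIT_DVC.equals(taskType)) {
            command = buildInitDvcCommand();
        } else {
            // Fail fast instead of handing an empty command to the shell executor.
            throw new IllegalArgumentException("Unsupported DVC task type: " + taskType);
        }
        // Descriptive prefix, as suggested in the review comment.
        LOG.info("Run DVC task with command: " + command);
        return command;
    }

    public static void main(String[] args) {
        System.out.println(buildCommand(TASK_TYPE_INIT_DVC));
    }
}
```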



-- 
This is an automated message from the Apache Git Service.