You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/07/05 03:05:27 UTC

[dolphinscheduler] branch dev updated: [Optimization]Optimize some details of MLFlow task plugin #10740 (#10739)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7d79a2165e [Optimization]Optimize some details of MLFlow task plugin #10740 (#10739)
7d79a2165e is described below

commit 7d79a2165ee5f0bf1aa7c949c852da179d6c0cc9
Author: JieguangZhou <ji...@163.com>
AuthorDate: Tue Jul 5 11:05:20 2022 +0800

    [Optimization]Optimize some details of MLFlow task plugin #10740 (#10739)
    
    * Optimize some details of MLFlow task plugin
    
    * Update dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowParameters.java
    
    * fix some nits
    
    Co-authored-by: Jiajie Zhong <zh...@gmail.com>
---
 .../dolphinscheduler-task-mlflow/pom.xml           |  5 +++
 .../plugin/task/mlflow/MlflowConstants.java        | 14 ++++----
 .../plugin/task/mlflow/MlflowParameters.java       |  4 +++
 .../plugin/task/mlflow/MlflowTask.java             | 41 ++++++++++++++++++++--
 .../plugin/task/mlflow/MlflowTaskTest.java         | 36 ++++++++-----------
 5 files changed, 71 insertions(+), 29 deletions(-)

diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/pom.xml
index f6691e5166..7e21ccab77 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/pom.xml
@@ -34,6 +34,11 @@
             <artifactId>dolphinscheduler-spi</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.dolphinscheduler</groupId>
             <artifactId>dolphinscheduler-task-api</artifactId>
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowConstants.java
index 4b99cfa7cd..8712081ad7 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowConstants.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowConstants.java
@@ -90,7 +90,9 @@ public class MlflowConstants {
 
     public static final String DOCKER_RREMOVE_CONTAINER = "docker rm -f %s";
 
-    public static final String DOCKER_RUN = "docker run --name=%s -p=%s:8080 %s";
+    public static final String DOCKER_RUN = "docker run -d --name=%s -p=%s:8080 " +
+            "--health-cmd \"curl --fail http://127.0.0.1:8080/ping || exit 1\" --health-interval 5s --health-retries 20" +
+            " %s";
 
     public static final String DOCKER_COMPOSE_RUN = "docker-compose up -d";
 
@@ -100,10 +102,10 @@ public class MlflowConstants {
             "export DS_TASK_MLFLOW_CPU_LIMIT=%s\n" +
             "export DS_TASK_MLFLOW_MEMORY_LIMIT=%s";
 
-    public static final String DOCKER_HEALTH_CHECK_COMMAND = "for i in $(seq 1 300); " +
-            "do " +
-            "[ $(docker inspect --format \"{{json .State.Health.Status }}\" %s) = '\"healthy\"' ] " +
-            "&& exit 0  && break;sleep 1; " +
-            "done; docker-compose down; exit 1";
 
+    public static final String DOCKER_HEALTH_CHECK = "docker inspect --format \"{{json .State.Health.Status }}\" %s";
+
+    public static final int DOCKER_HEALTH_CHECK_TIMEOUT = 20;
+
+    public static final int DOCKER_HEALTH_CHECK_INTERVAL = 5000;
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowParameters.java
index a49fd051ed..4e47c8ae64 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowParameters.java
@@ -286,4 +286,8 @@ public class MlflowParameters extends AbstractParameters {
         return containerName;
     }
 
+    public boolean getIsDeployDocker(){
+        return deployType.equals(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER) || deployType.equals(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER_COMPOSE);
+    }
+
 };
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
index e1e13d17c5..88da235ff0 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
@@ -27,8 +27,10 @@ import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
 import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -84,7 +86,13 @@ public class MlflowTask extends AbstractTaskExecutor {
             // construct process
             String command = buildCommand();
             TaskResponse commandExecuteResult = shellCommandExecutor.run(command);
-            setExitStatusCode(commandExecuteResult.getExitStatusCode());
+            int exitCode = exitStatusCode;
+            if (mlflowParameters.getIsDeployDocker()){
+                exitCode = checkDockerHealth();
+            }else {
+                exitCode = getExitStatusCode();
+            }
+            setExitStatusCode(exitCode);
             setAppIds(commandExecuteResult.getAppIds());
             setProcessId(commandExecuteResult.getProcessId());
             mlflowParameters.dealOutParam(shellCommandExecutor.getVarPool());
@@ -181,10 +189,12 @@ public class MlflowTask extends AbstractTaskExecutor {
             String templatePath = getTemplatePath(MlflowConstants.TEMPLATE_DOCKER_COMPOSE);
             args.add(String.format("cp %s %s", templatePath, taskExecutionContext.getExecutePath()));
             String imageName = "mlflow/" + mlflowParameters.getModelKeyName(":");
+            String containerName = mlflowParameters.getContainerName();
+
             args.add(String.format(MlflowConstants.MLFLOW_BUILD_DOCKER, deployModelKey, imageName));
+            args.add(String.format(MlflowConstants.DOCKER_RREMOVE_CONTAINER, containerName));
             args.add(mlflowParameters.getDockerComposeEnvCommand());
             args.add(MlflowConstants.DOCKER_COMPOSE_RUN);
-            args.add(String.format(MlflowConstants.DOCKER_HEALTH_CHECK_COMMAND, mlflowParameters.getContainerName()));
         }
 
         String command = ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParamUtils.convert(paramsMap));
@@ -197,6 +207,33 @@ public class MlflowTask extends AbstractTaskExecutor {
 
     }
 
+    public int checkDockerHealth() throws Exception {
+        logger.info("checking container healthy ... ");
+        int exitCode = -1;
+        String[] command = {"sh", "-c", String.format(MlflowConstants.DOCKER_HEALTH_CHECK, mlflowParameters.getContainerName())};
+        for(int x = 0; x < MlflowConstants.DOCKER_HEALTH_CHECK_TIMEOUT; x = x+1) {
+            String status;
+            try {
+                status = OSUtils.exeShell(command).replace("\n", "").replace("\"", "");
+            } catch (Exception e) {
+                status = String.format("error --- %s", e.getMessage());
+            }
+            logger.info("container healthy status: {}", status);
+
+            if (status.equals("healthy")) {
+                exitCode = 0;
+                logger.info("container is healthy");
+                return exitCode;
+            }else {
+                logger.info("The health check has been running for {} seconds", x * MlflowConstants.DOCKER_HEALTH_CHECK_INTERVAL / 1000);
+                ThreadUtils.sleep(MlflowConstants.DOCKER_HEALTH_CHECK_INTERVAL);
+            }
+        }
+
+        logger.info("health check fail");
+        return exitCode;
+    }
+
 
     @Override
     public AbstractParameters getParameters() {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java
index 5a85abf3c0..f985666006 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java
@@ -22,6 +22,7 @@ import java.util.UUID;
 
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
 import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowConstants;
 import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowParameters;
 import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowTask;
@@ -41,7 +42,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 
-
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({
         JSONUtils.class,
@@ -129,9 +129,7 @@ public class MlflowTaskTest {
         MlflowTask mlflowTask = initTask(createModelDeplyMlflowParameters());
         Assert.assertEquals(mlflowTask.buildCommand(),
                 "export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n" +
-                        "mlflow models serve -m runs:/a272ec279fc34a8995121ae04281585f/model " +
-                        "--port 7000 " +
-                        "-h 0.0.0.0");
+                        "mlflow models serve -m models:/model/1 --port 7000 -h 0.0.0.0");
     }
 
     @Test
@@ -139,12 +137,11 @@ public class MlflowTaskTest {
         MlflowTask mlflowTask = initTask(createModelDeplyDockerParameters());
         Assert.assertEquals(mlflowTask.buildCommand(),
                 "export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n" +
-                        "mlflow models build-docker -m runs:/a272ec279fc34a8995121ae04281585f/model " +
-                        "-n mlflow/a272ec279fc34a8995121ae04281585f:model " +
-                        "--enable-mlserver\n" +
-                        "docker rm -f ds-mlflow-a272ec279fc34a8995121ae04281585f-model\n" +
-                        "docker run --name=ds-mlflow-a272ec279fc34a8995121ae04281585f-model " +
-                        "-p=7000:8080 mlflow/a272ec279fc34a8995121ae04281585f:model");
+                        "mlflow models build-docker -m models:/model/1 -n mlflow/model:1 --enable-mlserver\n" +
+                        "docker rm -f ds-mlflow-model-1\n" +
+                        "docker run -d --name=ds-mlflow-model-1 -p=7000:8080 " +
+                        "--health-cmd \"curl --fail http://127.0.0.1:8080/ping || exit 1\" --health-interval 5s --health-retries 20 " +
+                        "mlflow/model:1");
     }
 
     @Test
@@ -154,16 +151,14 @@ public class MlflowTaskTest {
                 "export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n" +
                         "cp " + mlflowTask.getTemplatePath(MlflowConstants.TEMPLATE_DOCKER_COMPOSE) +
                         " /tmp/dolphinscheduler_test\n" +
-                        "mlflow models build-docker -m models:/22222/1 -n mlflow/22222:1 --enable-mlserver\n" +
-                        "export DS_TASK_MLFLOW_IMAGE_NAME=mlflow/22222:1\n" +
-                        "export DS_TASK_MLFLOW_CONTAINER_NAME=ds-mlflow-22222-1\n" +
+                        "mlflow models build-docker -m models:/model/1 -n mlflow/model:1 --enable-mlserver\n" +
+                        "docker rm -f ds-mlflow-model-1\n" +
+                        "export DS_TASK_MLFLOW_IMAGE_NAME=mlflow/model:1\n" +
+                        "export DS_TASK_MLFLOW_CONTAINER_NAME=ds-mlflow-model-1\n" +
                         "export DS_TASK_MLFLOW_DEPLOY_PORT=7000\n" +
                         "export DS_TASK_MLFLOW_CPU_LIMIT=0.5\n" +
                         "export DS_TASK_MLFLOW_MEMORY_LIMIT=200m\n" +
-                        "docker-compose up -d\n" +
-                        "for i in $(seq 1 300); do " +
-                        "[ $(docker inspect --format \"{{json .State.Health.Status }}\" ds-mlflow-22222-1) = '\"healthy\"' ] && exit 0  && break;sleep 1; " +
-                        "done; docker-compose down; exit 1");
+                        "docker-compose up -d");
     }
 
     private MlflowTask initTask(MlflowParameters mlflowParameters) {
@@ -172,7 +167,6 @@ public class MlflowTaskTest {
         mlflowTask.init();
         mlflowTask.getParameters().setVarPool(taskExecutionContext.getVarPool());
         return mlflowTask;
-
     }
 
     private MlflowParameters createBasicAlgorithmParameters() {
@@ -218,7 +212,7 @@ public class MlflowTaskTest {
         mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_MODELS);
         mlflowParameters.setDeployType(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_MLFLOW);
         mlflowParameters.setMlflowTrackingUris("http://127.0.0.1:5000");
-        mlflowParameters.setDeployModelKey("runs:/a272ec279fc34a8995121ae04281585f/model");
+        mlflowParameters.setDeployModelKey("models:/model/1");
         mlflowParameters.setDeployPort("7000");
         return mlflowParameters;
     }
@@ -228,7 +222,7 @@ public class MlflowTaskTest {
         mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_MODELS);
         mlflowParameters.setDeployType(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER);
         mlflowParameters.setMlflowTrackingUris("http://127.0.0.1:5000");
-        mlflowParameters.setDeployModelKey("runs:/a272ec279fc34a8995121ae04281585f/model");
+        mlflowParameters.setDeployModelKey("models:/model/1");
         mlflowParameters.setDeployPort("7000");
         return mlflowParameters;
     }
@@ -238,7 +232,7 @@ public class MlflowTaskTest {
         mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_MODELS);
         mlflowParameters.setDeployType(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER_COMPOSE);
         mlflowParameters.setMlflowTrackingUris("http://127.0.0.1:5000");
-        mlflowParameters.setDeployModelKey("models:/22222/1");
+        mlflowParameters.setDeployModelKey("models:/model/1");
         mlflowParameters.setDeployPort("7000");
         mlflowParameters.setCpuLimit("0.5");
         mlflowParameters.setMemoryLimit("200m");