You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/12/15 06:59:59 UTC

[GitHub] [dolphinscheduler] caishunfeng commented on a diff in pull request #13194: [Feature][Master] Add task caching mechanism to improve the running speed of repetitive tasks

caishunfeng commented on code in PR #13194:
URL: https://github.com/apache/dolphinscheduler/pull/13194#discussion_r1049262110


##########
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/IsCache.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.enums;
+
+import com.baomidou.mybatisplus.annotation.EnumValue;
+
+public enum IsCache {

Review Comment:
   You can use the `Flag` enum. No need to recreate a similar one.



##########
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java:
##########
@@ -22,5 +22,7 @@ public enum TaskEventType {
     DELAY,
     RUNNING,
     RESULT,
-    WORKER_REJECT
+    WORKER_REJECT,
+
+    CACHE,

Review Comment:
   ```suggestion
       WORKER_REJECT,
       CACHE,
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java:
##########
@@ -319,4 +327,38 @@ public TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long
         }
         return taskInstance;
     }
+
+    @Override
+    public TaskInstanceRemoveCacheResponse removeTaskInstanceCache(User loginUser, long projectCode,
+                                                                   Integer taskInstanceId) {
+        Result result = new Result();
+
+        Project project = projectMapper.queryByCode(projectCode);
+        projectService.checkProjectAndAuthThrowException(loginUser, project, INSTANCE_UPDATE);
+
+        TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstanceId);
+        if (taskInstance == null) {
+            logger.error("Task definition can not be found, projectCode:{}, taskInstanceId:{}.", projectCode,
+                    taskInstanceId);
+            putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
+            return new TaskInstanceRemoveCacheResponse(result);
+        }
+        String tagCacheKey = taskInstance.getCacheKey();
+        String cacheKey = TaskCacheUtils.revertCacheKey(tagCacheKey);
+        List<Integer> cacheTaskInstanceIds = new ArrayList<>();
+        while (true) {
+            TaskInstance cacheTaskInstance = taskInstanceDao.findTaskInstanceByCacheKey(cacheKey);

Review Comment:
   Maybe this does not need to be a loop.



##########
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.utils;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class TaskCacheUtils {
+
+    final static String MERGE_TAG = "-";
+
+    public static String generateCacheKey(TaskInstance taskInstance, TaskExecutionContext context) {
+        List<String> keyElements = new ArrayList<>();
+        keyElements.add(String.valueOf(taskInstance.getTaskCode()));
+        keyElements.add(String.valueOf(taskInstance.getTaskDefinitionVersion()));
+        keyElements.add(String.valueOf(taskInstance.getIsCache().getCode()));
+        keyElements.add(getTaskInputVarPool(taskInstance, context));
+        String data = StringUtils.join(keyElements, "_");
+        String cacheKey = md5(data);
+        System.out.println("cacheKey: " + cacheKey + ", data: " + data);

Review Comment:
   This debug print should be removed.



##########
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql:
##########
@@ -894,7 +898,8 @@ CREATE TABLE `t_ds_task_instance` (
  `test_flag`  tinyint(4) DEFAULT null COMMENT 'test flag：0 normal, 1 test run',
   PRIMARY KEY (`id`),
   KEY `process_instance_id` (`process_instance_id`) USING BTREE,
-  KEY `idx_code_version` (`task_code`, `task_definition_version`) USING BTREE
+  KEY `idx_code_version` (`task_code`, `task_definition_version`) USING BTREE,
+  KEY `cache_key` (`cache_key`) USING BTREE,

Review Comment:
   I think a unique index would be better. WDYT?



##########
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java:
##########
@@ -187,6 +189,19 @@ public class TaskInstance implements Serializable {
      */
     private Flag flag;
 
+    /**
+     * task is cache: yes/no
+     */
+    private IsCache isCache;
+
+    /**
+     * task is cache: yes/no
+     */
+    @TableField(updateStrategy = FieldStrategy.IGNORED)
+    private String cacheKey;
+
+    @TableField(exist = false)
+    private String tmpCacheKey;

Review Comment:
   Please remove this field if it doesn't belong to this entity.



##########
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java:
##########
@@ -187,6 +189,19 @@ public class TaskInstance implements Serializable {
      */
     private Flag flag;
 
+    /**
+     * task is cache: yes/no
+     */
+    private IsCache isCache;

Review Comment:
   Same here



##########
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java:
##########
@@ -187,6 +189,19 @@ public class TaskInstance implements Serializable {
      */
     private Flag flag;
 
+    /**
+     * task is cache: yes/no
+     */
+    private IsCache isCache;
+
+    /**
+     * task is cache: yes/no
+     */

Review Comment:
   Please use the correct comment for this field; it currently duplicates the `isCache` comment.



##########
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java:
##########
@@ -187,6 +189,19 @@ public class TaskInstance implements Serializable {
      */
     private Flag flag;
 
+    /**
+     * task is cache: yes/no
+     */
+    private IsCache isCache;
+
+    /**
+     * task is cache: yes/no
+     */
+    @TableField(updateStrategy = FieldStrategy.IGNORED)

Review Comment:
   Why add the ignored update strategy for it?



##########
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java:
##########
@@ -115,6 +116,11 @@ public class TaskDefinition {
      */
     private Flag flag;
 
+    /**
+     * task is cache: yes/no
+     */
+    private IsCache isCache;

Review Comment:
   ```suggestion
       private Flag isCache;
   ```



##########
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.utils;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class TaskCacheUtils {
+
+    final static String MERGE_TAG = "-";
+
+    public static String generateCacheKey(TaskInstance taskInstance, TaskExecutionContext context) {
+        List<String> keyElements = new ArrayList<>();
+        keyElements.add(String.valueOf(taskInstance.getTaskCode()));
+        keyElements.add(String.valueOf(taskInstance.getTaskDefinitionVersion()));
+        keyElements.add(String.valueOf(taskInstance.getIsCache().getCode()));
+        keyElements.add(getTaskInputVarPool(taskInstance, context));
+        String data = StringUtils.join(keyElements, "_");
+        String cacheKey = md5(data);
+        System.out.println("cacheKey: " + cacheKey + ", data: " + data);
+        return cacheKey;
+    }
+
+    public static String generateTagCacheKey(Integer sourceTaskId, String cacheKey) {
+        return sourceTaskId + MERGE_TAG + cacheKey;
+    }
+
+    public static String revertCacheKey(String tagCacheKey) {
+        if (tagCacheKey == null) {
+            return "";
+        }
+        if (tagCacheKey.contains(MERGE_TAG)) {
+            return tagCacheKey.split(MERGE_TAG)[1];
+        } else {
+            return tagCacheKey;
+        }
+    }
+
+    public static String md5(String data) {
+        try {
+            MessageDigest md = MessageDigest.getInstance("MD5");
+            byte[] md5 = md.digest(data.getBytes(StandardCharsets.UTF_8));
+
+            StringBuilder sb = new StringBuilder();
+            for (byte b : md5) {
+                sb.append(String.format("%02x", b));
+            }
+            return sb.toString();
+        } catch (NoSuchAlgorithmException e) {
+            e.printStackTrace();
+        }

Review Comment:
   ```suggestion
           } catch (NoSuchAlgorithmException ignore) {
               
           }
   ```



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java:
##########
@@ -276,4 +287,30 @@ private boolean isTaskNeedToCheck(TaskPriority taskPriority) {
         }
         return false;
     }
+
+    private boolean checkIsCacheExecution(TaskInstance taskInstance, TaskExecutionContext context) {
+        if (taskInstance.getIsCache().equals(IsCache.YES)) {
+            String cacheKey = TaskCacheUtils.generateCacheKey(taskInstance, context);
+            TaskInstance cacheTaskInstance = taskInstanceDao.findTaskInstanceByCacheKey(cacheKey);
+            if (cacheTaskInstance != null) {
+                logger.info("Task {} is cache, no need to dispatch, task instance id: {}",
+                        taskInstance.getName(), taskInstance.getId());
+                addCacheEvent(taskInstance, cacheTaskInstance);
+                taskInstance.setTmpCacheKey(TaskCacheUtils.generateTagCacheKey(cacheTaskInstance.getId(), cacheKey));
+                return true;
+            } else {
+                taskInstance.setTmpCacheKey(cacheKey);
+            }
+        }
+        return false;

Review Comment:
   ```suggestion
           if (taskInstance.getIsCache().equals(IsCache.NO)) {
               return false;
           }
             String cacheKey = TaskCacheUtils.generateCacheKey(taskInstance, context);
             TaskInstance cacheTaskInstance = taskInstanceDao.findTaskInstanceByCacheKey(cacheKey);
             if (cacheTaskInstance != null) {
                 logger.info("Task {} is cache, no need to dispatch, task instance id: {}",
                         taskInstance.getName(), taskInstance.getId());
                 addCacheEvent(taskInstance, cacheTaskInstance);
                 taskInstance.setTmpCacheKey(TaskCacheUtils.generateTagCacheKey(cacheTaskInstance.getId(), cacheKey));
                 return true;
             } 
           taskInstance.setTmpCacheKey(cacheKey);
           return false;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org