You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/12/22 07:32:48 UTC

[incubator-dolphinscheduler] branch dev updated: [Improvement] Refactor code to support distributed tracing (#4270)

This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3a28a71  [Improvement] Refactor code to support distributed tracing (#4270)
3a28a71 is described below

commit 3a28a71b007a26e7613d2f23094b8b4cbfe111ff
Author: hailin0 <ha...@foxmail.com>
AuthorDate: Tue Dec 22 15:32:41 2020 +0800

    [Improvement] Refactor code to support distributed tracing (#4270)
    
    * Refactor code to support tracing
    
    * Extension network protocol, support context and version
    * Extension master asynchronous queue support context
    * Extract scan task method from MasterSchedulerService for tracing
    
    * fix
    
    * fix
    
    * add test case
    
    * fix
    
    * fix
    
    Co-authored-by: hailin0 <ha...@yeah.net>
---
 .../remote/codec/NettyDecoder.java                 | 33 ++++++++
 .../remote/codec/NettyEncoder.java                 |  7 ++
 .../dolphinscheduler/remote/command/Command.java   | 14 ++++
 .../{CommandHeader.java => CommandContext.java}    | 48 +++++------
 .../remote/command/CommandHeader.java              | 26 ++++++
 .../master/consumer/TaskPriorityQueueConsumer.java | 20 ++---
 .../master/runner/MasterBaseTaskExecThread.java    | 37 +++------
 .../master/runner/MasterSchedulerService.java      | 77 +++++++++--------
 .../consumer/TaskPriorityQueueConsumerTest.java    | 27 +++---
 .../service/queue}/TaskPriority.java               | 97 ++++++++++++++--------
 .../service/queue/TaskPriorityQueueImpl.java       | 46 +---------
 .../src/test/java/queue/TaskPriorityTest.java      | 83 ++++++++++++++++++
 .../src/test/java/queue/TaskUpdateQueueTest.java   | 20 ++---
 13 files changed, 338 insertions(+), 197 deletions(-)

diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
index 179ae1b..343e8c6 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
@@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ReplayingDecoder;
 import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandContext;
 import org.apache.dolphinscheduler.remote.command.CommandHeader;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.slf4j.Logger;
@@ -54,16 +55,34 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
         switch (state()){
             case MAGIC:
                 checkMagic(in.readByte());
+                checkpoint(State.VERSION);
+                // fallthru
+            case VERSION:
+                checkVersion(in.readByte());
                 checkpoint(State.COMMAND);
+                // fallthru
             case COMMAND:
                 commandHeader.setType(in.readByte());
                 checkpoint(State.OPAQUE);
+                // fallthru
             case OPAQUE:
                 commandHeader.setOpaque(in.readLong());
+                checkpoint(State.CONTEXT_LENGTH);
+                // fallthru
+            case CONTEXT_LENGTH:
+                commandHeader.setContextLength(in.readInt());
+                checkpoint(State.CONTEXT);
+                // fallthru
+            case CONTEXT:
+                byte[] context = new byte[commandHeader.getContextLength()];
+                in.readBytes(context);
+                commandHeader.setContext(context);
                 checkpoint(State.BODY_LENGTH);
+                // fallthru
             case BODY_LENGTH:
                 commandHeader.setBodyLength(in.readInt());
                 checkpoint(State.BODY);
+                // fallthru
             case BODY:
                 byte[] body = new byte[commandHeader.getBodyLength()];
                 in.readBytes(body);
@@ -71,6 +90,7 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
                 Command packet = new Command();
                 packet.setType(commandType(commandHeader.getType()));
                 packet.setOpaque(commandHeader.getOpaque());
+                packet.setContext(CommandContext.valueOf(commandHeader.getContext()));
                 packet.setBody(body);
                 out.add(packet);
                 //
@@ -105,10 +125,23 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
         }
     }
 
+    /**
+     *  check version
+     * @param version
+     */
+    private void checkVersion(byte version) {
+        if (version != Command.VERSION) {
+            throw new IllegalArgumentException("illegal protocol [version]" + version);
+        }
+    }
+
     enum State{
         MAGIC,
+        VERSION,
         COMMAND,
         OPAQUE,
+        CONTEXT_LENGTH,
+        CONTEXT,
         BODY_LENGTH,
         BODY;
     }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java
index 4e9836a..785ee5a 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java
@@ -42,11 +42,18 @@ public class NettyEncoder extends MessageToByteEncoder<Command> {
             throw new Exception("encode msg is null");
         }
         out.writeByte(Command.MAGIC);
+        out.writeByte(Command.VERSION);
         out.writeByte(msg.getType().ordinal());
         out.writeLong(msg.getOpaque());
+        writeContext(msg, out);
         out.writeInt(msg.getBody().length);
         out.writeBytes(msg.getBody());
     }
 
+    private void writeContext(Command msg, ByteBuf out) {
+        byte[] headerBytes = msg.getContext().toBytes();
+        out.writeInt(headerBytes.length);
+        out.writeBytes(headerBytes);
+    }
 }
 
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
index ed46e1f..9baa321 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
@@ -28,6 +28,7 @@ public class Command implements Serializable {
     private static final AtomicLong REQUEST_ID = new AtomicLong(1);
 
     public static final byte MAGIC = (byte) 0xbabe;
+    public static final byte VERSION = 0;
 
     public Command(){
         this.opaque = REQUEST_ID.getAndIncrement();
@@ -48,6 +49,11 @@ public class Command implements Serializable {
     private long opaque;
 
     /**
+     * request context
+     */
+    private CommandContext context = new CommandContext();
+
+    /**
      *  data body
      */
     private byte[] body;
@@ -76,6 +82,14 @@ public class Command implements Serializable {
         this.body = body;
     }
 
+    public CommandContext getContext() {
+        return context;
+    }
+
+    public void setContext(CommandContext context) {
+        this.context = context;
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandContext.java
similarity index 55%
copy from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandContext.java
index 78948a5..c9febee 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandContext.java
@@ -14,51 +14,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.remote.command;
 
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
 import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 /**
- *  command header
+ *  command context
  */
-public class CommandHeader implements Serializable {
-
-    /**
-     * type
-     */
-    private byte type;
-
-    /**
-     * request unique identification
-     */
-    private long opaque;
+public class CommandContext implements Serializable {
 
-    /**
-     *  body length
-     */
-    private int bodyLength;
+    private Map<String, String> items = new LinkedHashMap<>();
 
-    public int getBodyLength() {
-        return bodyLength;
+    public Map<String, String> getItems() {
+        return items;
     }
 
-    public void setBodyLength(int bodyLength) {
-        this.bodyLength = bodyLength;
+    public void setItems(Map<String, String> items) {
+        this.items = items;
     }
 
-    public byte getType() {
-        return type;
+    public void put(String key, String value) {
+        items.put(key, value);
     }
 
-    public void setType(byte type) {
-        this.type = type;
+    public String get(String key) {
+        return items.get(key);
     }
 
-    public long getOpaque() {
-        return opaque;
+    public byte[] toBytes() {
+        return JSONUtils.toJsonByteArray(this);
     }
 
-    public void setOpaque(long opaque) {
-        this.opaque = opaque;
+    public static CommandContext valueOf(byte[] src) {
+        return JSONUtils.parseObject(src, CommandContext.class);
     }
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
index 78948a5..9e83a42 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
@@ -34,6 +34,16 @@ public class CommandHeader implements Serializable {
     private long opaque;
 
     /**
+     *  context length
+     */
+    private int contextLength;
+
+    /**
+     *  context
+     */
+    private byte[] context;
+
+    /**
      *  body length
      */
     private int bodyLength;
@@ -61,4 +71,20 @@ public class CommandHeader implements Serializable {
     public void setOpaque(long opaque) {
         this.opaque = opaque;
     }
+
+    public int getContextLength() {
+        return contextLength;
+    }
+
+    public void setContextLength(int contextLength) {
+        this.contextLength = contextLength;
+    }
+
+    public byte[] getContext() {
+        return context;
+    }
+
+    public void setContext(byte[] context) {
+        this.context = context;
+    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index a407e4e..2325508 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -50,13 +50,13 @@ import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
-import org.apache.dolphinscheduler.server.entity.TaskPriority;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.TaskPriority;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
 
 import java.util.ArrayList;
@@ -90,7 +90,7 @@ public class TaskPriorityQueueConsumer extends Thread {
      * taskUpdateQueue
      */
     @Autowired
-    private TaskPriorityQueue<String> taskPriorityQueue;
+    private TaskPriorityQueue<TaskPriority> taskPriorityQueue;
 
     /**
      * processService
@@ -119,7 +119,7 @@ public class TaskPriorityQueueConsumer extends Thread {
 
     @Override
     public void run() {
-        List<String> failedDispatchTasks = new ArrayList<>();
+        List<TaskPriority> failedDispatchTasks = new ArrayList<>();
         while (Stopper.isRunning()) {
             try {
                 int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();
@@ -130,15 +130,14 @@ public class TaskPriorityQueueConsumer extends Thread {
                         continue;
                     }
                     // if not task , blocking here
-                    String taskPriorityInfo = taskPriorityQueue.take();
-                    TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
-                    boolean dispatchResult = dispatch(taskPriority.getTaskId());
+                    TaskPriority taskPriority = taskPriorityQueue.take();
+                    boolean dispatchResult = dispatch(taskPriority);
                     if (!dispatchResult) {
-                        failedDispatchTasks.add(taskPriorityInfo);
+                        failedDispatchTasks.add(taskPriority);
                     }
                 }
                 if (!failedDispatchTasks.isEmpty()) {
-                    for (String dispatchFailedTask : failedDispatchTasks) {
+                    for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
                         taskPriorityQueue.put(dispatchFailedTask);
                     }
                     // If there are tasks in a cycle that cannot find the worker group,
@@ -157,12 +156,13 @@ public class TaskPriorityQueueConsumer extends Thread {
     /**
      * dispatch task
      *
-     * @param taskInstanceId taskInstanceId
+     * @param taskPriority taskPriority
      * @return result
      */
-    protected boolean dispatch(int taskInstanceId) {
+    protected boolean dispatch(TaskPriority taskPriority) {
         boolean result = false;
         try {
+            int taskInstanceId = taskPriority.getTaskId();
             TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
             ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index f5c3708..fcff67f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -16,8 +16,6 @@
  */
 package org.apache.dolphinscheduler.server.master.runner;
 
-import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
-
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.common.model.TaskNode;
@@ -27,17 +25,15 @@ import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.service.queue.TaskPriority;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
 
-import java.util.concurrent.Callable;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import static org.apache.dolphinscheduler.common.Constants.*;
 
 import java.util.Date;
 import java.util.concurrent.Callable;
@@ -217,14 +213,14 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
             logger.info("task ready to submit: {}", taskInstance);
 
             /**
-             *  taskPriorityInfo
+             *  taskPriority
              */
-            String taskPriorityInfo = buildTaskPriorityInfo(processInstance.getProcessInstancePriority().getCode(),
+            TaskPriority taskPriority = buildTaskPriority(processInstance.getProcessInstancePriority().getCode(),
                     processInstance.getId(),
                     taskInstance.getProcessInstancePriority().getCode(),
                     taskInstance.getId(),
                     org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
-            taskUpdateQueue.put(taskPriorityInfo);
+            taskUpdateQueue.put(taskPriority);
             logger.info(String.format("master submit success, task : %s", taskInstance.getName()) );
             return true;
         }catch (Exception e){
@@ -235,29 +231,22 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
     }
 
     /**
-     * buildTaskPriorityInfo
+     * buildTaskPriority
      *
      * @param processInstancePriority processInstancePriority
      * @param processInstanceId processInstanceId
      * @param taskInstancePriority taskInstancePriority
      * @param taskInstanceId taskInstanceId
      * @param workerGroup workerGroup
-     * @return TaskPriorityInfo
+     * @return TaskPriority
      */
-    private String buildTaskPriorityInfo(int processInstancePriority,
-                                         int processInstanceId,
-                                         int taskInstancePriority,
-                                         int taskInstanceId,
-                                         String workerGroup) {
-        return processInstancePriority +
-                UNDERLINE +
-                processInstanceId +
-                UNDERLINE +
-                taskInstancePriority +
-                UNDERLINE +
-                taskInstanceId +
-                UNDERLINE +
-                workerGroup;
+    private TaskPriority buildTaskPriority(int processInstancePriority,
+                                           int processInstanceId,
+                                           int taskInstancePriority,
+                                           int taskInstanceId,
+                                           String workerGroup) {
+        return new TaskPriority(processInstancePriority, processInstanceId,
+                taskInstancePriority, taskInstanceId, workerGroup);
     }
 
     /**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 30dd0f9..b0e0528 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -124,52 +124,57 @@ public class MasterSchedulerService extends Thread {
     public void run() {
         logger.info("master scheduler started");
         while (Stopper.isRunning()){
-            InterProcessMutex mutex = null;
             try {
                 boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
-                if(!runCheckFlag) {
+                if (!runCheckFlag) {
                     Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                     continue;
                 }
                 if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
+                    scheduleProcess();
+                }
+            } catch (Exception e) {
+                logger.error("master scheduler thread error", e);
+            }
+        }
+    }
 
-                    mutex = zkMasterClient.blockAcquireMutex();
-
-                    int activeCount = masterExecService.getActiveCount();
-                    // make sure to scan and delete command  table in one transaction
-                    Command command = processService.findOneCommand();
-                    if (command != null) {
-                        logger.info("find one command: id: {}, type: {}", command.getId(),command.getCommandType());
-
-                        try{
-
-                            ProcessInstance processInstance = processService.handleCommand(logger,
-                                    getLocalAddress(),
-                                    this.masterConfig.getMasterExecThreads() - activeCount, command);
-                            if (processInstance != null) {
-                                logger.info("start master exec thread , split DAG ...");
-                                masterExecService.execute(
-                                        new MasterExecThread(
-                                                processInstance
-                                                , processService
-                                                , nettyRemotingClient
-                                                , alertManager
-                                                , masterConfig));
-                            }
-                        }catch (Exception e){
-                            logger.error("scan command error ", e);
-                            processService.moveToErrorCommand(command, e.toString());
-                        }
-                    } else{
-                        //indicate that no command ,sleep for 1s
-                        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+    private void scheduleProcess() throws Exception {
+        InterProcessMutex mutex = null;
+        try {
+            mutex = zkMasterClient.blockAcquireMutex();
+
+            int activeCount = masterExecService.getActiveCount();
+            // make sure to scan and delete command  table in one transaction
+            Command command = processService.findOneCommand();
+            if (command != null) {
+                logger.info("find one command: id: {}, type: {}", command.getId(),command.getCommandType());
+
+                try {
+
+                    ProcessInstance processInstance = processService.handleCommand(logger,
+                            getLocalAddress(),
+                            this.masterConfig.getMasterExecThreads() - activeCount, command);
+                    if (processInstance != null) {
+                        logger.info("start master exec thread , split DAG ...");
+                        masterExecService.execute(
+                                new MasterExecThread(
+                                        processInstance
+                                        , processService
+                                        , nettyRemotingClient
+                                        , alertManager
+                                        , masterConfig));
                     }
+                } catch (Exception e) {
+                    logger.error("scan command error ", e);
+                    processService.moveToErrorCommand(command, e.toString());
                 }
-            } catch (Exception e){
-                logger.error("master scheduler thread error",e);
-            } finally{
-                zkMasterClient.releaseMutex(mutex);
+            } else {
+                //indicate that no command ,sleep for 1s
+                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
             }
+        } finally {
+            zkMasterClient.releaseMutex(mutex);
         }
     }
 
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index 049e30e..8c2321d 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -42,6 +42,7 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.apache.dolphinscheduler.server.zk.SpringZKServer;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.TaskPriority;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
 import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
 import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
@@ -72,7 +73,7 @@ public class TaskPriorityQueueConsumerTest {
 
 
     @Autowired
-    private TaskPriorityQueue taskPriorityQueue;
+    private TaskPriorityQueue<TaskPriority> taskPriorityQueue;
 
     @Autowired
     private TaskPriorityQueueConsumer taskPriorityQueueConsumer;
@@ -142,9 +143,8 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefine(processDefinition);
 
         Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
-        Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
-
-        taskPriorityQueue.put("2_1_2_1_default");
+        TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
+        taskPriorityQueue.put(taskPriority);
 
         TimeUnit.SECONDS.sleep(10);
 
@@ -180,7 +180,8 @@ public class TaskPriorityQueueConsumerTest {
         processDefinition.setProjectId(1);
         taskInstance.setProcessDefine(processDefinition);
         Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
-        taskPriorityQueue.put("2_1_2_1_default");
+        TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
+        taskPriorityQueue.put(taskPriority);
 
         DataSource dataSource = new DataSource();
         dataSource.setId(1);
@@ -243,7 +244,8 @@ public class TaskPriorityQueueConsumerTest {
         processDefinition.setProjectId(1);
         taskInstance.setProcessDefine(processDefinition);
         Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
-        taskPriorityQueue.put("2_1_2_1_default");
+        TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
+        taskPriorityQueue.put(taskPriority);
 
         DataSource dataSource = new DataSource();
         dataSource.setId(80);
@@ -310,7 +312,8 @@ public class TaskPriorityQueueConsumerTest {
         processDefinition.setProjectId(1);
         taskInstance.setProcessDefine(processDefinition);
         Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
-        taskPriorityQueue.put("2_1_2_1_default");
+        TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
+        taskPriorityQueue.put(taskPriority);
 
         DataSource dataSource = new DataSource();
         dataSource.setId(1);
@@ -402,7 +405,8 @@ public class TaskPriorityQueueConsumerTest {
         Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
         Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
 
-        taskPriorityQueue.put("2_1_2_1_NoWorkGroup");
+        TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup");
+        taskPriorityQueue.put(taskPriority);
 
         TimeUnit.SECONDS.sleep(10);
 
@@ -455,7 +459,9 @@ public class TaskPriorityQueueConsumerTest {
         Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
         Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
 
-        boolean res  = taskPriorityQueueConsumer.dispatch(1);
+        TaskPriority taskPriority = new TaskPriority();
+        taskPriority.setTaskId(1);
+        boolean res  = taskPriorityQueueConsumer.dispatch(taskPriority);
 
         Assert.assertFalse(res);
     }
@@ -649,7 +655,8 @@ public class TaskPriorityQueueConsumerTest {
         Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
         Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
 
-        taskPriorityQueue.put("2_1_2_1_NoWorkGroup");
+        TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup");
+        taskPriorityQueue.put(taskPriority);
 
         taskPriorityQueueConsumer.run();
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
similarity index 56%
rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
index 991eeed..a872f6d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
@@ -15,14 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.entity;
+package org.apache.dolphinscheduler.service.queue;
 
-import static org.apache.dolphinscheduler.common.Constants.*;
+import java.util.Map;
+import java.util.Objects;
 
 /**
  *  task priority info
  */
-public class TaskPriority {
+public class TaskPriority implements Comparable<TaskPriority> {
 
     /**
      * processInstancePriority
@@ -50,9 +51,9 @@ public class TaskPriority {
     private String groupName;
 
     /**
-     *   ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName}
+     * context
      */
-    private String taskPriorityInfo;
+    private Map<String, String> context;
 
     public TaskPriority(){}
 
@@ -65,15 +66,6 @@ public class TaskPriority {
         this.taskInstancePriority = taskInstancePriority;
         this.taskId = taskId;
         this.groupName = groupName;
-        this.taskPriorityInfo = this.processInstancePriority +
-                UNDERLINE +
-                this.processInstanceId +
-                UNDERLINE +
-                this.taskInstancePriority +
-                UNDERLINE +
-                this.taskId +
-                UNDERLINE +
-                this.groupName;
     }
 
     public int getProcessInstancePriority() {
@@ -104,6 +96,10 @@ public class TaskPriority {
         return taskId;
     }
 
+    public Map<String, String> getContext() {
+        return context;
+    }
+
     public void setTaskId(int taskId) {
         this.taskId = taskId;
     }
@@ -116,32 +112,61 @@ public class TaskPriority {
         this.groupName = groupName;
     }
 
-    public String getTaskPriorityInfo() {
-        return taskPriorityInfo;
+    public void setContext(Map<String, String> context) {
+        this.context = context;
     }
 
-    public void setTaskPriorityInfo(String taskPriorityInfo) {
-        this.taskPriorityInfo = taskPriorityInfo;
-    }
+    @Override
+    public int compareTo(TaskPriority other) {
+        if (this.getProcessInstancePriority() > other.getProcessInstancePriority()) {
+            return 1;
+        }
+        if (this.getProcessInstancePriority() < other.getProcessInstancePriority()) {
+            return -1;
+        }
 
-    /**
-     * taskPriorityInfo convert taskPriority
-     *
-     * @param taskPriorityInfo taskPriorityInfo
-     * @return TaskPriority
-     */
-    public static TaskPriority of(String taskPriorityInfo){
-        String[] parts = taskPriorityInfo.split(UNDERLINE);
+        if (this.getProcessInstanceId() > other.getProcessInstanceId()) {
+            return 1;
+        }
+        if (this.getProcessInstanceId() < other.getProcessInstanceId()) {
+            return -1;
+        }
+
+        if (this.getTaskInstancePriority() > other.getTaskInstancePriority()) {
+            return 1;
+        }
+        if (this.getTaskInstancePriority() < other.getTaskInstancePriority()) {
+            return -1;
+        }
+
+        if (this.getTaskId() > other.getTaskId()) {
+            return 1;
+        }
+        if (this.getTaskId() < other.getTaskId()) {
+            return -1;
+        }
 
-        if (parts.length != 5) {
-            throw new IllegalArgumentException(String.format("TaskPriority : %s illegal.", taskPriorityInfo));
+        return this.getGroupName().compareTo(other.getGroupName());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
         }
-        TaskPriority taskPriority = new TaskPriority(
-                Integer.parseInt(parts[0]),
-                Integer.parseInt(parts[1]),
-                Integer.parseInt(parts[2]),
-                Integer.parseInt(parts[3]),
-                parts[4]);
-        return taskPriority;
+        TaskPriority that = (TaskPriority) o;
+        return processInstancePriority == that.processInstancePriority
+                &&  processInstanceId == that.processInstanceId
+                && taskInstancePriority == that.taskInstancePriority
+                && taskId == that.taskId
+                && Objects.equals(groupName, that.groupName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(processInstancePriority, processInstanceId, taskInstancePriority, taskId, groupName);
     }
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
index aefad84..694d4c4 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
@@ -17,24 +17,18 @@
 
 package org.apache.dolphinscheduler.service.queue;
 
-import static org.apache.dolphinscheduler.common.Constants.TASK_INFO_LENGTH;
-import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
-
 import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
 
-import java.util.Comparator;
 import java.util.concurrent.PriorityBlockingQueue;
 
 import org.springframework.stereotype.Service;
 
-
-
 /**
  * A singleton of a task queue implemented with zookeeper
  * tasks queue implementation
  */
 @Service
-public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
+public class TaskPriorityQueueImpl implements TaskPriorityQueue<TaskPriority> {
     /**
      * queue size
      */
@@ -43,7 +37,7 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
     /**
      * queue
      */
-    private PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
+    private PriorityBlockingQueue<TaskPriority> queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE);
 
     /**
      * put task takePriorityInfo
@@ -52,7 +46,7 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
      * @throws TaskPriorityQueueException
      */
     @Override
-    public void put(String taskPriorityInfo) throws TaskPriorityQueueException {
+    public void put(TaskPriority taskPriorityInfo) throws TaskPriorityQueueException {
         queue.put(taskPriorityInfo);
     }
 
@@ -63,7 +57,7 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
      * @throws TaskPriorityQueueException
      */
     @Override
-    public String take() throws TaskPriorityQueueException, InterruptedException {
+    public TaskPriority take() throws TaskPriorityQueueException, InterruptedException {
         return queue.take();
     }
 
@@ -77,36 +71,4 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
     public int size() throws TaskPriorityQueueException {
         return queue.size();
     }
-
-    /**
-     * TaskInfoComparator
-     */
-    private class TaskInfoComparator implements Comparator<String> {
-
-        /**
-         * compare o1 o2
-         *
-         * @param o1 o1
-         * @param o2 o2
-         * @return compare result
-         */
-        @Override
-        public int compare(String o1, String o2) {
-            String s1 = o1;
-            String s2 = o2;
-            String[] s1Array = s1.split(UNDERLINE);
-            if (s1Array.length > TASK_INFO_LENGTH) {
-                // warning: if this length > 5, need to be changed
-                s1 = s1.substring(0, s1.lastIndexOf(UNDERLINE));
-            }
-
-            String[] s2Array = s2.split(UNDERLINE);
-            if (s2Array.length > TASK_INFO_LENGTH) {
-                // warning: if this length > 5, need to be changed
-                s2 = s2.substring(0, s2.lastIndexOf(UNDERLINE));
-            }
-
-            return s1.compareTo(s2);
-        }
-    }
 }
diff --git a/dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java b/dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java
new file mode 100644
index 0000000..1511770
--- /dev/null
+++ b/dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package queue;
+
+import org.apache.dolphinscheduler.service.queue.TaskPriority;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TaskPriorityTest {
+
+    @Test
+    public void testSort() {
+        TaskPriority priorityOne = new TaskPriority(1, 0, 0, 0, "default");
+        TaskPriority priorityTwo = new TaskPriority(2, 0, 0, 0, "default");
+        TaskPriority priorityThree = new TaskPriority(3, 0, 0, 0, "default");
+        List<TaskPriority> taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
+        Collections.sort(taskPrioritys);
+        Assert.assertEquals(
+                Arrays.asList(priorityOne, priorityTwo, priorityThree),
+                taskPrioritys
+        );
+
+        priorityOne = new TaskPriority(0, 1, 0, 0, "default");
+        priorityTwo = new TaskPriority(0, 2, 0, 0, "default");
+        priorityThree = new TaskPriority(0, 3, 0, 0, "default");
+        taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
+        Collections.sort(taskPrioritys);
+        Assert.assertEquals(
+                Arrays.asList(priorityOne, priorityTwo, priorityThree),
+                taskPrioritys
+        );
+
+        priorityOne = new TaskPriority(0, 0, 1, 0, "default");
+        priorityTwo = new TaskPriority(0, 0, 2, 0, "default");
+        priorityThree = new TaskPriority(0, 0, 3, 0, "default");
+        taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
+        Collections.sort(taskPrioritys);
+        Assert.assertEquals(
+                Arrays.asList(priorityOne, priorityTwo, priorityThree),
+                taskPrioritys
+        );
+
+        priorityOne = new TaskPriority(0, 0, 0, 1, "default");
+        priorityTwo = new TaskPriority(0, 0, 0, 2, "default");
+        priorityThree = new TaskPriority(0, 0, 0, 3, "default");
+        taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
+        Collections.sort(taskPrioritys);
+        Assert.assertEquals(
+                Arrays.asList(priorityOne, priorityTwo, priorityThree),
+                taskPrioritys
+        );
+
+        priorityOne = new TaskPriority(0, 0, 0, 0, "default_1");
+        priorityTwo = new TaskPriority(0, 0, 0, 0, "default_2");
+        priorityThree = new TaskPriority(0, 0, 0, 0, "default_3");
+        taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
+        Collections.sort(taskPrioritys);
+        Assert.assertEquals(
+                Arrays.asList(priorityOne, priorityTwo, priorityThree),
+                taskPrioritys
+        );
+    }
+}
diff --git a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java
index ca6c083..2c13afa 100644
--- a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java
+++ b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java
@@ -17,6 +17,7 @@
 
 package queue;
 
+import org.apache.dolphinscheduler.service.queue.TaskPriority;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
 import org.junit.Test;
@@ -31,19 +32,16 @@ public class TaskUpdateQueueTest {
     @Test
     public void testQueue() throws Exception{
 
-        // ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName}
-
         /**
          * 1_1_2_1_default
          * 1_1_2_2_default
          * 1_1_0_3_default
          * 1_1_0_4_default
          */
-
-        String taskInfo1 = "1_1_2_1_default";
-        String taskInfo2 = "1_1_2_2_default";
-        String taskInfo3 = "1_1_0_3_default";
-        String taskInfo4 = "1_1_0_4_default";
+        TaskPriority taskInfo1 = new TaskPriority(1, 1, 2, 1, "default");
+        TaskPriority taskInfo2 = new TaskPriority(1, 1, 2, 2, "default");
+        TaskPriority taskInfo3 = new TaskPriority(1, 1, 0, 3, "default");
+        TaskPriority taskInfo4 = new TaskPriority(1, 1, 0, 4, "default");
 
         TaskPriorityQueue queue = new TaskPriorityQueueImpl();
         queue.put(taskInfo1);
@@ -51,9 +49,9 @@ public class TaskUpdateQueueTest {
         queue.put(taskInfo3);
         queue.put(taskInfo4);
 
-        assertEquals("1_1_0_3_default", queue.take());
-        assertEquals("1_1_0_4_default", queue.take());
-        assertEquals("1_1_2_1_default",queue.take());
-        assertEquals("1_1_2_2_default",queue.take());
+        assertEquals(taskInfo3, queue.take());
+        assertEquals(taskInfo4, queue.take());
+        assertEquals(taskInfo1, queue.take());
+        assertEquals(taskInfo2, queue.take());
     }
 }