You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/12/18 09:28:46 UTC

[incubator-dolphinscheduler] branch dev updated: [Fix-4222][Master]Add the priority queue to ensure that tasks are submitted according to priority. (#4250)

This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9fe5c37  [Fix-4222][Master]Add the priority queue to ensure that tasks are submitted according to priority. (#4250)
9fe5c37 is described below

commit 9fe5c3717ded3e42ccde854d5ba23a685c23508d
Author: lgcareer <18...@163.com>
AuthorDate: Fri Dec 18 17:28:37 2020 +0800

    [Fix-4222][Master]Add the priority queue to ensure that tasks are submitted according to priority. (#4250)
    
    * [Fix-4222][Master]Add the priority queue to ensure that tasks are submitted according to priority.
    
    * [Fix-4222][Master]Add the priority queue to ensure that tasks are submitted according to priority.
    
    * [Fix-4222][Master]Remove useless import
    
    * [Fix-4222][Master]Reformat code style
    
    * [Fix-4222][Master]Reformat code style
    
    * [Fix-4222][Master]Reformat code style
    
    * [Fix-4222][Master]add PeerTaskInstancePriorityQueueTest
    
    * [Fix-4222][Master]Fix code smell
    
    * [Fix-4222][Master]Reformat code style
    
    * [Fix-4222][Master]Fix code smell
    
    Co-authored-by: xingchun-chen <55...@users.noreply.github.com>
---
 .../master/consumer/TaskPriorityQueueConsumer.java |   2 +-
 .../server/master/runner/MasterExecThread.java     |  72 +++++++-----
 .../TaskPriorityQueueException.java}               |  36 +++---
 .../queue/PeerTaskInstancePriorityQueue.java       | 127 +++++++++++++++++++++
 .../service/queue/TaskPriorityQueue.java           |  21 ++--
 .../service/queue/TaskPriorityQueueImpl.java       |  38 +++---
 .../queue/PeerTaskInstancePriorityQueueTest.java   |  89 +++++++++++++++
 pom.xml                                            |   1 +
 8 files changed, 316 insertions(+), 70 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 89d3e97..a407e4e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -90,7 +90,7 @@ public class TaskPriorityQueueConsumer extends Thread {
      * taskUpdateQueue
      */
     @Autowired
-    private TaskPriorityQueue taskPriorityQueue;
+    private TaskPriorityQueue<String> taskPriorityQueue;
 
     /**
      * processService
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 66de086..e68f9d5 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -55,6 +55,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.utils.AlertManager;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
+import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
 
 import org.apache.commons.io.FileUtils;
 
@@ -120,9 +121,9 @@ public class MasterExecThread implements Runnable {
     private Map<String, TaskInstance> completeTaskList = new ConcurrentHashMap<>();
 
     /**
-     * ready to submit task list
+     * ready to submit task queue
      */
-    private Map<String, TaskInstance> readyToSubmitTaskList = new ConcurrentHashMap<>();
+    private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
 
     /**
      * depend failed task map
@@ -178,8 +179,8 @@ public class MasterExecThread implements Runnable {
     /**
      * constructor of MasterExecThread
      *
-     * @param processInstance processInstance
-     * @param processService processService
+     * @param processInstance     processInstance
+     * @param processService      processService
      * @param nettyRemotingClient nettyRemotingClient
      */
     public MasterExecThread(ProcessInstance processInstance
@@ -483,7 +484,7 @@ public class MasterExecThread implements Runnable {
      * encapsulation task
      *
      * @param processInstance process instance
-     * @param nodeName node name
+     * @param nodeName        node name
      * @return TaskInstance
      */
     private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName,
@@ -563,7 +564,7 @@ public class MasterExecThread implements Runnable {
         // if previous node success , post node submit
         for (TaskInstance task : taskInstances) {
 
-            if (readyToSubmitTaskList.containsKey(task.getName())) {
+            if (readyToSubmitTaskQueue.contains(task)) {
                 continue;
             }
 
@@ -699,7 +700,7 @@ public class MasterExecThread implements Runnable {
                 return true;
             }
             if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
-                return readyToSubmitTaskList.size() == 0 || activeTaskNode.size() == 0;
+                return readyToSubmitTaskQueue.size() == 0 || activeTaskNode.size() == 0;
             }
         }
         return false;
@@ -731,7 +732,7 @@ public class MasterExecThread implements Runnable {
         List<TaskInstance> pauseList = getCompleteTaskByState(ExecutionStatus.PAUSE);
         if (CollectionUtils.isNotEmpty(pauseList)
                 || !isComplementEnd()
-                || readyToSubmitTaskList.size() > 0) {
+                || readyToSubmitTaskQueue.size() > 0) {
             return ExecutionStatus.PAUSE;
         } else {
             return ExecutionStatus.SUCCESS;
@@ -782,7 +783,7 @@ public class MasterExecThread implements Runnable {
         // success
         if (state == ExecutionStatus.RUNNING_EXECUTION) {
             List<TaskInstance> killTasks = getCompleteTaskByState(ExecutionStatus.KILL);
-            if (readyToSubmitTaskList.size() > 0) {
+            if (readyToSubmitTaskQueue.size() > 0) {
                 //tasks currently pending submission, no retries, indicating that depend is waiting to complete
                 return ExecutionStatus.RUNNING_EXECUTION;
             } else if (CollectionUtils.isNotEmpty(killTasks)) {
@@ -804,8 +805,8 @@ public class MasterExecThread implements Runnable {
 
         boolean result = false;
 
-        for (String taskName : readyToSubmitTaskList.keySet()) {
-            TaskInstance task = readyToSubmitTaskList.get(taskName);
+        for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator(); iter.hasNext(); ) {
+            TaskInstance task = iter.next();
             if (task.getState().typeIsFailure()) {
                 result = true;
                 break;
@@ -872,7 +873,11 @@ public class MasterExecThread implements Runnable {
      */
     private void addTaskToStandByList(TaskInstance taskInstance) {
         logger.info("add task to stand by list: {}", taskInstance.getName());
-        readyToSubmitTaskList.putIfAbsent(taskInstance.getName(), taskInstance);
+        try {
+            readyToSubmitTaskQueue.put(taskInstance);
+        } catch (Exception e) {
+            logger.error("add task instance to readyToSubmitTaskQueue error");
+        }
     }
 
     /**
@@ -882,7 +887,11 @@ public class MasterExecThread implements Runnable {
      */
     private void removeTaskFromStandbyList(TaskInstance taskInstance) {
         logger.info("remove task from stand by list: {}", taskInstance.getName());
-        readyToSubmitTaskList.remove(taskInstance.getName());
+        try {
+            readyToSubmitTaskQueue.remove(taskInstance);
+        } catch (Exception e) {
+            logger.error("remove task instance from readyToSubmitTaskQueue error");
+        }
     }
 
     /**
@@ -891,8 +900,8 @@ public class MasterExecThread implements Runnable {
      * @return Boolean whether has retry task in standby
      */
     private boolean hasRetryTaskInStandBy() {
-        for (Map.Entry<String, TaskInstance> entry : readyToSubmitTaskList.entrySet()) {
-            if (entry.getValue().getState().typeIsFailure()) {
+        for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator(); iter.hasNext(); ) {
+            if (iter.next().getState().typeIsFailure()) {
                 return true;
             }
         }
@@ -1083,20 +1092,25 @@ public class MasterExecThread implements Runnable {
      * handling the list of tasks to be submitted
      */
     private void submitStandByTask() {
-        for (Map.Entry<String, TaskInstance> entry : readyToSubmitTaskList.entrySet()) {
-            TaskInstance task = entry.getValue();
-            DependResult dependResult = getDependResultForTask(task);
-            if (DependResult.SUCCESS == dependResult) {
-                if (retryTaskIntervalOverTime(task)) {
-                    submitTaskExec(task);
+        try {
+            int length = readyToSubmitTaskQueue.size();
+            for (int i = 0; i < length; i++) {
+                TaskInstance task = readyToSubmitTaskQueue.peek();
+                DependResult dependResult = getDependResultForTask(task);
+                if (DependResult.SUCCESS == dependResult) {
+                    if (retryTaskIntervalOverTime(task)) {
+                        submitTaskExec(task);
+                        removeTaskFromStandbyList(task);
+                    }
+                } else if (DependResult.FAILED == dependResult) {
+                    // if the dependency fails, the current node is not submitted and the state changes to failure.
+                    dependFailedTask.put(task.getName(), task);
                     removeTaskFromStandbyList(task);
+                    logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
                 }
-            } else if (DependResult.FAILED == dependResult) {
-                // if the dependency fails, the current node is not submitted and the state changes to failure.
-                dependFailedTask.put(entry.getKey(), task);
-                removeTaskFromStandbyList(task);
-                logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
             }
+        } catch (Exception e) {
+            logger.error("submit standby task error", e);
         }
     }
 
@@ -1185,9 +1199,9 @@ public class MasterExecThread implements Runnable {
      * generate flow dag
      *
      * @param processDefinitionJson process definition json
-     * @param startNodeNameList start node name list
-     * @param recoveryNodeNameList recovery node name list
-     * @param depNodeType depend node type
+     * @param startNodeNameList     start node name list
+     * @param recoveryNodeNameList  recovery node name list
+     * @param depNodeType           depend node type
      * @return ProcessDag           process dag
      * @throws Exception exception
      */
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java
similarity index 59%
copy from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
copy to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java
index 3ad9aef..30a7214 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java
@@ -14,31 +14,31 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.service.queue;
 
+package org.apache.dolphinscheduler.service.exceptions;
 
-public interface TaskPriorityQueue {
+/**
+ * task priority queue exception
+ */
+public class TaskPriorityQueueException extends Exception {
 
     /**
-     * put task info
+     * Construct a new task priority queue exception with the detail message
      *
-     * @param taskInfo taskInfo
-     * @throws Exception
+     * @param message message
      */
-    void put(String taskInfo) throws Exception;
+    public TaskPriorityQueueException(String message) {
+        super(message);
+    }
 
     /**
-     * take taskInfo
-     * @return taskInfo
-     * @throws Exception
-     */
-    String take()throws Exception;
-
-    /**
-     * size
+     * Construct a new task priority queue exception with the detail message and cause
      *
-     * @return size
-     * @throws Exception
+     * @param message   message
+     * @param cause     cause
      */
-    int size() throws Exception;
-}
\ No newline at end of file
+    public TaskPriorityQueueException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
new file mode 100644
index 0000000..d7a9025
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.queue;
+
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+/**
+ * Priority queue of task instances, ordered by task instance priority.
+ * All the task instances are in the same process instance.
+ */
+public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInstance> {
+    /**
+     * initial capacity of the backing queue; java.util.PriorityQueue is unbounded, so this is not a hard limit
+     */
+    private static final Integer QUEUE_MAX_SIZE = 3000;
+
+    /**
+     * backing queue ordered by TaskInfoComparator; java.util.PriorityQueue is not synchronized
+     */
+    private PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
+
+    /**
+     * add a task instance to the priority queue
+     *
+     * @param taskInstance task instance to add
+     * @throws TaskPriorityQueueException declared by the interface; not thrown by this implementation
+     */
+    public void put(TaskInstance taskInstance) throws TaskPriorityQueueException {
+        queue.add(taskInstance);
+    }
+
+    /**
+     * remove and return the head of the queue (uses poll, so it does not block)
+     *
+     * @return the head task instance, or null if the queue is empty
+     * @throws TaskPriorityQueueException declared by the interface; not thrown by this implementation
+     */
+    @Override
+    public TaskInstance take() throws TaskPriorityQueueException {
+        return queue.poll();
+    }
+
+    /**
+     * return the head of the queue without removing it
+     *
+     * @return the head task instance, or null if the queue is empty
+     */
+    public TaskInstance peek() {
+        return queue.peek();
+    }
+
+    /**
+     * number of task instances currently in the queue
+     *
+     * @return size
+     */
+    public int size() {
+        return queue.size();
+    }
+
+    /**
+     * whether the queue contains the task instance (matched with equals)
+     *
+     * @param taskInstance task instance
+     * @return true if the queue contains the task instance
+     */
+    public boolean contains(TaskInstance taskInstance) {
+        return queue.contains(taskInstance);
+    }
+
+    /**
+     * remove a single occurrence of the task instance (matched with equals)
+     *
+     * @param taskInstance task instance
+     * @return true if the task instance was removed
+     */
+    public boolean remove(TaskInstance taskInstance) {
+        return queue.remove(taskInstance);
+    }
+
+    /**
+     * get an iterator over the queued task instances; traversal order is not guaranteed to follow priority
+     *
+     * @return Iterator
+     */
+    public Iterator<TaskInstance> iterator() {
+        return queue.iterator();
+    }
+
+    /**
+     * orders task instances by their task instance priority
+     */
+    private class TaskInfoComparator implements Comparator<TaskInstance> {
+
+        /**
+         * compare two task instances by getTaskInstancePriority()
+         *
+         * @param o1 first task instance
+         * @param o2 second task instance
+         * @return result of compareTo on the two priorities
+         */
+        @Override
+        public int compare(TaskInstance o1, TaskInstance o2) {
+            return o1.getTaskInstancePriority().compareTo(o2.getTaskInstancePriority());
+        }
+    }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
index 3ad9aef..14c6b38 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
@@ -14,31 +14,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.service.queue;
 
+import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
 
-public interface TaskPriorityQueue {
+/**
+ * task priority queue
+ * @param <T> type of the task info stored in the queue
+ */
+public interface TaskPriorityQueue<T> {
 
     /**
      * put task info
      *
      * @param taskInfo taskInfo
-     * @throws Exception
+     * @throws TaskPriorityQueueException
      */
-    void put(String taskInfo) throws Exception;
+    void put(T taskInfo) throws TaskPriorityQueueException;
 
     /**
      * take taskInfo
+     *
      * @return taskInfo
-     * @throws Exception
+     * @throws TaskPriorityQueueException
      */
-    String take()throws Exception;
+    T take() throws TaskPriorityQueueException, InterruptedException;
 
     /**
      * size
      *
      * @return size
-     * @throws Exception
+     * @throws TaskPriorityQueueException
      */
-    int size() throws Exception;
+    int size() throws TaskPriorityQueueException;
 }
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
index 0a0fb1b..aefad84 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
@@ -14,22 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.service.queue;
 
+import static org.apache.dolphinscheduler.common.Constants.TASK_INFO_LENGTH;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
 
-import org.springframework.stereotype.Service;
+import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
 
-import java.util.*;
+import java.util.Comparator;
 import java.util.concurrent.PriorityBlockingQueue;
 
-import static org.apache.dolphinscheduler.common.Constants.*;
+import org.springframework.stereotype.Service;
+
+
 
 /**
  * A singleton of a task queue implemented with zookeeper
  * tasks queue implementation
  */
 @Service
-public class TaskPriorityQueueImpl implements TaskPriorityQueue {
+public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
     /**
      * queue size
      */
@@ -44,40 +49,43 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue {
      * put task takePriorityInfo
      *
      * @param taskPriorityInfo takePriorityInfo
-     * @throws Exception
+     * @throws TaskPriorityQueueException
      */
     @Override
-    public void put(String taskPriorityInfo) throws Exception {
+    public void put(String taskPriorityInfo) throws TaskPriorityQueueException {
         queue.put(taskPriorityInfo);
     }
 
     /**
      * take taskInfo
+     *
      * @return taskInfo
-     * @throws Exception
+     * @throws TaskPriorityQueueException
      */
     @Override
-    public String take() throws Exception {
+    public String take() throws TaskPriorityQueueException, InterruptedException {
         return queue.take();
     }
 
     /**
      * queue size
+     *
      * @return size
-     * @throws Exception
+     * @throws TaskPriorityQueueException
      */
     @Override
-    public int size() throws Exception {
+    public int size() throws TaskPriorityQueueException {
         return queue.size();
     }
 
     /**
      * TaskInfoComparator
      */
-    private class TaskInfoComparator implements Comparator<String>{
+    private class TaskInfoComparator implements Comparator<String> {
 
         /**
          * compare o1 o2
+         *
          * @param o1 o1
          * @param o2 o2
          * @return compare result
@@ -87,15 +95,15 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue {
             String s1 = o1;
             String s2 = o2;
             String[] s1Array = s1.split(UNDERLINE);
-            if(s1Array.length > TASK_INFO_LENGTH){
+            if (s1Array.length > TASK_INFO_LENGTH) {
                 // warning: if this length > 5, need to be changed
-                s1 = s1.substring(0, s1.lastIndexOf(UNDERLINE) );
+                s1 = s1.substring(0, s1.lastIndexOf(UNDERLINE));
             }
 
             String[] s2Array = s2.split(UNDERLINE);
-            if(s2Array.length > TASK_INFO_LENGTH){
+            if (s2Array.length > TASK_INFO_LENGTH) {
                 // warning: if this length > 5, need to be changed
-                s2 = s2.substring(0, s2.lastIndexOf(UNDERLINE) );
+                s2 = s2.substring(0, s2.lastIndexOf(UNDERLINE));
             }
 
             return s1.compareTo(s2);
diff --git a/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java
new file mode 100644
index 0000000..cf39d57
--- /dev/null
+++ b/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package queue;
+
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for PeerTaskInstancePriorityQueue:
+ * put, peek and take on a queue holding two task instances.
+ */
+public class PeerTaskInstancePriorityQueueTest {
+
+    @Test
+    public void testPut() throws Exception {
+        PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
+        TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH);
+        TaskInstance taskInstanceMediumPriority = createTaskInstance("high", Priority.MEDIUM);
+        queue.put(taskInstanceHigPriority);
+        queue.put(taskInstanceMediumPriority);
+        Assert.assertEquals(2,queue.size());
+    }
+
+    @Test
+    public void testPeek() throws Exception {
+        PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
+        int peekBeforeLength = queue.size();
+        queue.peek();
+        Assert.assertEquals(peekBeforeLength,queue.size());
+
+    }
+
+    @Test
+    public void testTake() throws Exception {
+        PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
+        int peekBeforeLength = queue.size();
+        queue.take();
+        Assert.assertTrue(queue.size() < peekBeforeLength);
+    }
+
+    /**
+     * build a queue holding one HIGH and one MEDIUM priority task instance
+     *
+     * @return queue with two task instances
+     * @throws Exception if putting a task instance into the queue fails
+     */
+    private PeerTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception {
+        PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
+        TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH);
+        TaskInstance taskInstanceMediumPriority = createTaskInstance("high", Priority.MEDIUM);
+        queue.put(taskInstanceHigPriority);
+        queue.put(taskInstanceMediumPriority);
+        return queue;
+    }
+
+    /**
+     * create a task instance with only its name and priority set
+     *
+     * @param name      task instance name
+     * @param priority  task instance priority
+     * @return the new task instance
+     */
+    private TaskInstance createTaskInstance(String name, Priority priority) {
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setName(name);
+        taskInstance.setTaskInstancePriority(priority);
+        return taskInstance;
+    }
+
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 5163ae7..4eda053 100644
--- a/pom.xml
+++ b/pom.xml
@@ -862,6 +862,7 @@
                         <include>**/service/zk/ZKServerTest.java</include>
                         <include>**/service/zk/CuratorZookeeperClientTest.java</include>
                         <include>**/service/queue/TaskUpdateQueueTest.java</include>
+                        <include>**/service/queue/PeerTaskInstancePriorityQueueTest.java</include>
 
                         <include>**/dao/mapper/DataSourceUserMapperTest.java</include>
                         <!--<iTaskUpdateQueueConsumerThreadnclude>**/dao/mapper/ErrorCommandMapperTest.java</iTaskUpdateQueueConsumerThreadnclude>-->