You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/05/23 07:01:03 UTC

[dolphinscheduler] 14/33: [BUG][TaskGroup] Task group does not take effect (#10093)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch 3.0.0-beta-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit b016037a6f75686898b4a831be61fe11c5769e29
Author: BaoLiang <29...@users.noreply.github.com>
AuthorDate: Wed May 18 18:40:36 2022 +0800

    [BUG][TaskGroup] Task group does not take effect (#10093)
    
    * fix 10092: Task group does not take effect
    
    * fix 10092: Task group does not take effect
    
    * fix 10092: Task group does not take effect
    
    (cherry picked from commit ee2b855ced2e46bcdaf3ae7068941ac7e1c03c17)
---
 .../apache/dolphinscheduler/common/Constants.java  |  4 ++
 .../dolphinscheduler/dao/entity/TaskInstance.java  |  2 +
 .../master/runner/task/CommonTaskProcessor.java    |  2 +-
 .../consumer/TaskPriorityQueueConsumerTest.java    | 12 ++---
 .../queue/PeerTaskInstancePriorityQueue.java       |  5 ++
 .../service/queue/TaskPriority.java                | 33 ++++++++++--
 .../queue/PeerTaskInstancePriorityQueueTest.java   | 57 ++++++++++++++++----
 .../service/queue/TaskPriorityQueueImplTest.java   | 62 ++++++++++++++++------
 8 files changed, 140 insertions(+), 37 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 57b5c92970..19fbdb90ca 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -66,6 +66,8 @@ public final class Constants {
 
     public static final String BUCKET_NAME = "dolphinscheduler-test";
 
+    public static final String EMPTY_STRING = "";
+
     /**
      * fs.defaultFS
      */
@@ -422,6 +424,8 @@ public final class Constants {
      */
     public static final int DEFINITION_FAILURE = -1;
 
+    public static final int OPPOSITE_VALUE = -1;
+
     /**
      * process or task definition first version
      */
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 6ab0c340f1..4ec1dfde2b 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -713,6 +713,8 @@ public class TaskInstance implements Serializable {
                 + ", executorName='" + executorName + '\''
                 + ", delayTime=" + delayTime
                 + ", dryRun=" + dryRun
+                + ", taskGroupId=" + taskGroupId
+                + ", taskGroupPriority=" + taskGroupPriority
                 + '}';
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 5833bc56c1..ffeb89a0d2 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -114,7 +114,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
 
             TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(),
                     processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(),
-                    taskInstance.getId(), org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
+                    taskInstance.getId(), taskInstance.getTaskGroupPriority(), org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
 
             TaskExecutionContext taskExecutionContext = getTaskExecutionContext(taskInstance);
             if (taskExecutionContext == null) {
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index ec3f58aca0..05590e4541 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -98,7 +98,7 @@ public class TaskPriorityQueueConsumerTest {
         processDefinition.setUserId(2);
         taskInstance.setProcessDefine(processDefinition);
 
-        TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
+        TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "default");
         taskPriorityQueue.put(taskPriority);
 
         TimeUnit.SECONDS.sleep(10);
@@ -125,7 +125,7 @@ public class TaskPriorityQueueConsumerTest {
         ProcessDefinition processDefinition = new ProcessDefinition();
         processDefinition.setUserId(2);
         taskInstance.setProcessDefine(processDefinition);
-        TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
+        TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "default");
         taskPriorityQueue.put(taskPriority);
 
         DataSource dataSource = new DataSource();
@@ -166,7 +166,7 @@ public class TaskPriorityQueueConsumerTest {
         ProcessDefinition processDefinition = new ProcessDefinition();
         processDefinition.setUserId(2);
         taskInstance.setProcessDefine(processDefinition);
-        TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
+        TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "default");
         taskPriorityQueue.put(taskPriority);
 
         DataSource dataSource = new DataSource();
@@ -205,7 +205,7 @@ public class TaskPriorityQueueConsumerTest {
         ProcessDefinition processDefinition = new ProcessDefinition();
         processDefinition.setUserId(2);
         taskInstance.setProcessDefine(processDefinition);
-        TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
+        TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "default");
         taskPriorityQueue.put(taskPriority);
 
         DataSource dataSource = new DataSource();
@@ -266,7 +266,7 @@ public class TaskPriorityQueueConsumerTest {
 
         Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
 
-        TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup");
+        TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "NoWorkGroup");
         taskPriorityQueue.put(taskPriority);
 
         TimeUnit.SECONDS.sleep(10);
@@ -335,7 +335,7 @@ public class TaskPriorityQueueConsumerTest {
 
         Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
 
-        TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup");
+        TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "NoWorkGroup");
         taskPriorityQueue.put(taskPriority);
 
         taskPriorityQueueConsumer.run();
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
index 7502607bcf..231fd2a20f 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.service.queue;
 
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
 
@@ -168,6 +169,10 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
          */
         @Override
         public int compare(TaskInstance o1, TaskInstance o2) {
+            if(o1.getTaskInstancePriority().equals(o2.getTaskInstancePriority())){
+                // larger number, higher priority
+                return Constants.OPPOSITE_VALUE * Integer.compare(o1.getTaskGroupPriority(),o2.getTaskGroupPriority());
+            }
             return o1.getTaskInstancePriority().compareTo(o2.getTaskInstancePriority());
         }
     }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
index 12a7258d6c..0aeec4609d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
@@ -17,7 +17,9 @@
 
 package org.apache.dolphinscheduler.service.queue;
 
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
 import java.util.Map;
 import java.util.Objects;
@@ -67,6 +69,8 @@ public class TaskPriority implements Comparable<TaskPriority> {
      */
     private long checkpoint;
 
+    private int taskGroupPriority;
+
     public TaskPriority() {
         this.checkpoint = System.currentTimeMillis();
     }
@@ -74,11 +78,13 @@ public class TaskPriority implements Comparable<TaskPriority> {
     public TaskPriority(int processInstancePriority,
                         int processInstanceId,
                         int taskInstancePriority,
-                        int taskId, String groupName) {
+                        int taskId,
+                        int taskGroupPriority, String groupName) {
         this.processInstancePriority = processInstancePriority;
         this.processInstanceId = processInstanceId;
         this.taskInstancePriority = taskInstancePriority;
         this.taskId = taskId;
+        this.taskGroupPriority = taskGroupPriority;
         this.groupName = groupName;
         this.checkpoint = System.currentTimeMillis();
     }
@@ -147,6 +153,14 @@ public class TaskPriority implements Comparable<TaskPriority> {
         this.checkpoint = checkpoint;
     }
 
+    public int getTaskGroupPriority() {
+        return taskGroupPriority;
+    }
+
+    public void setTaskGroupPriority(int taskGroupPriority) {
+        this.taskGroupPriority = taskGroupPriority;
+    }
+
     @Override
     public int compareTo(TaskPriority other) {
         if (this.getProcessInstancePriority() > other.getProcessInstancePriority()) {
@@ -169,15 +183,22 @@ public class TaskPriority implements Comparable<TaskPriority> {
         if (this.getTaskInstancePriority() < other.getTaskInstancePriority()) {
             return -1;
         }
-
+        if(this.getTaskGroupPriority() != other.getTaskGroupPriority()){
+            // larger number, higher priority
+            return Constants.OPPOSITE_VALUE * Integer.compare(this.getTaskGroupPriority(), other.getTaskGroupPriority());
+        }
         if (this.getTaskId() > other.getTaskId()) {
             return 1;
         }
         if (this.getTaskId() < other.getTaskId()) {
             return -1;
         }
-
-        return this.getGroupName().compareTo(other.getGroupName());
+        String thisGroupName = StringUtils.isNotBlank(this.getGroupName()) ? this.getGroupName() : Constants.EMPTY_STRING;
+        String otherGroupName = StringUtils.isNotBlank(other.getGroupName()) ? other.getGroupName() : Constants.EMPTY_STRING;
+        if(!thisGroupName.equals(otherGroupName)){
+            return thisGroupName.compareTo(otherGroupName);
+        }
+        return Long.compare(this.getCheckpoint(), other.getCheckpoint());
     }
 
     @Override
@@ -193,11 +214,13 @@ public class TaskPriority implements Comparable<TaskPriority> {
                 && processInstanceId == that.processInstanceId
                 && taskInstancePriority == that.taskInstancePriority
                 && taskId == that.taskId
+                && taskGroupPriority == that.taskGroupPriority
                 && Objects.equals(groupName, that.groupName);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(processInstancePriority, processInstanceId, taskInstancePriority, taskId, groupName);
+        return Objects.hash(processInstancePriority, processInstanceId, taskInstancePriority, taskId, taskGroupPriority, groupName);
     }
+
 }
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
index 7feb4c676b..8da3a6c194 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
@@ -31,8 +31,8 @@ public class PeerTaskInstancePriorityQueueTest {
     @Test
     public void put() throws TaskPriorityQueueException {
         PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
-        TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH);
-        TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM);
+        TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
+        TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
         queue.put(taskInstanceHigPriority);
         queue.put(taskInstanceMediumPriority);
         Assert.assertEquals(2, queue.size());
@@ -60,10 +60,46 @@ public class PeerTaskInstancePriorityQueueTest {
     public void peek() throws Exception {
         PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
         int peekBeforeLength = queue.size();
-        queue.peek();
         Assert.assertEquals(peekBeforeLength, queue.size());
     }
 
+    @Test
+    public void peekTaskGroupPriority() throws Exception{
+        PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
+        TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 2);
+        TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.HIGH, 1);
+        queue.put(taskInstanceMediumPriority);
+        queue.put(taskInstanceHigPriority);
+        TaskInstance taskInstance = queue.peek();
+        queue.clear();
+        Assert.assertEquals(taskInstance.getName(), "high");
+
+        taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
+        taskInstanceMediumPriority = createTaskInstance("medium", Priority.HIGH, 2);
+        queue.put(taskInstanceMediumPriority);
+        queue.put(taskInstanceHigPriority);
+        taskInstance = queue.peek();
+        queue.clear();
+        Assert.assertEquals(taskInstance.getName(), "medium");
+
+        taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
+        taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 2);
+        queue.put(taskInstanceMediumPriority);
+        queue.put(taskInstanceHigPriority);
+        taskInstance = queue.peek();
+        queue.clear();
+        Assert.assertEquals(taskInstance.getName(), "high");
+
+        taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
+        taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
+        queue.put(taskInstanceMediumPriority);
+        queue.put(taskInstanceHigPriority);
+        taskInstance = queue.peek();
+        queue.clear();
+        Assert.assertEquals(taskInstance.getName(), "high");
+
+    }
+
     @Test
     public void size() throws Exception {
         Assert.assertEquals(2, getPeerTaskInstancePriorityQueue().size());
@@ -72,7 +108,7 @@ public class PeerTaskInstancePriorityQueueTest {
     @Test
     public void contains() throws Exception {
         PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
-        TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM);
+        TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
         queue.put(taskInstanceMediumPriority);
         Assert.assertTrue(queue.contains(taskInstanceMediumPriority));
     }
@@ -80,7 +116,7 @@ public class PeerTaskInstancePriorityQueueTest {
     @Test
     public void remove() throws Exception {
         PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
-        TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM);
+        TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
         queue.put(taskInstanceMediumPriority);
         int peekBeforeLength = queue.size();
         queue.remove(taskInstanceMediumPriority);
@@ -95,10 +131,12 @@ public class PeerTaskInstancePriorityQueueTest {
      */
     private PeerTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception {
         PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
-        TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH);
-        TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM);
-        queue.put(taskInstanceHigPriority);
+        TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
+        TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
+        taskInstanceHigPriority.setTaskGroupPriority(3);
+        taskInstanceMediumPriority.setTaskGroupPriority(2);
         queue.put(taskInstanceMediumPriority);
+        queue.put(taskInstanceHigPriority);
         return queue;
     }
 
@@ -109,10 +147,11 @@ public class PeerTaskInstancePriorityQueueTest {
      * @param priority priority
      * @return
      */
-    private TaskInstance createTaskInstance(String name, Priority priority) {
+    private TaskInstance createTaskInstance(String name, Priority priority, int taskGroupPriority) {
         TaskInstance taskInstance = new TaskInstance();
         taskInstance.setName(name);
         taskInstance.setTaskInstancePriority(priority);
+        taskInstance.setTaskGroupPriority(taskGroupPriority);
         return taskInstance;
     }
 }
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java
index 3888d3d93c..c2fb14acfa 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java
@@ -31,9 +31,9 @@ public class TaskPriorityQueueImplTest {
 
     @Test
     public void testSort() {
-        TaskPriority priorityOne = new TaskPriority(1, 0, 0, 0, "default");
-        TaskPriority priorityTwo = new TaskPriority(2, 0, 0, 0, "default");
-        TaskPriority priorityThree = new TaskPriority(3, 0, 0, 0, "default");
+        TaskPriority priorityOne = new TaskPriority(1, 0, 0, 0, 1, "default");
+        TaskPriority priorityTwo = new TaskPriority(2, 0, 0, 0, 1, "default");
+        TaskPriority priorityThree = new TaskPriority(3, 0, 0, 0, 1, "default");
         List<TaskPriority> taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
         Collections.sort(taskPrioritys);
         Assert.assertEquals(
@@ -41,9 +41,9 @@ public class TaskPriorityQueueImplTest {
             taskPrioritys
         );
 
-        priorityOne = new TaskPriority(0, 1, 0, 0, "default");
-        priorityTwo = new TaskPriority(0, 2, 0, 0, "default");
-        priorityThree = new TaskPriority(0, 3, 0, 0, "default");
+        priorityOne = new TaskPriority(0, 1, 0, 0, 1, "default");
+        priorityTwo = new TaskPriority(0, 2, 0, 0, 1, "default");
+        priorityThree = new TaskPriority(0, 3, 0, 0, 1, "default");
         taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
         Collections.sort(taskPrioritys);
         Assert.assertEquals(
@@ -51,9 +51,9 @@ public class TaskPriorityQueueImplTest {
             taskPrioritys
         );
 
-        priorityOne = new TaskPriority(0, 0, 1, 0, "default");
-        priorityTwo = new TaskPriority(0, 0, 2, 0, "default");
-        priorityThree = new TaskPriority(0, 0, 3, 0, "default");
+        priorityOne = new TaskPriority(0, 0, 1, 0, 1, "default");
+        priorityTwo = new TaskPriority(0, 0, 2, 0, 1, "default");
+        priorityThree = new TaskPriority(0, 0, 3, 0, 1, "default");
         taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
         Collections.sort(taskPrioritys);
         Assert.assertEquals(
@@ -61,9 +61,9 @@ public class TaskPriorityQueueImplTest {
             taskPrioritys
         );
 
-        priorityOne = new TaskPriority(0, 0, 0, 1, "default");
-        priorityTwo = new TaskPriority(0, 0, 0, 2, "default");
-        priorityThree = new TaskPriority(0, 0, 0, 3, "default");
+        priorityOne = new TaskPriority(0, 0, 0, 1, 1, "default");
+        priorityTwo = new TaskPriority(0, 0, 0, 2, 1, "default");
+        priorityThree = new TaskPriority(0, 0, 0, 3, 1, "default");
         taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
         Collections.sort(taskPrioritys);
         Assert.assertEquals(
@@ -71,15 +71,45 @@ public class TaskPriorityQueueImplTest {
             taskPrioritys
         );
 
-        priorityOne = new TaskPriority(0, 0, 0, 0, "default_1");
-        priorityTwo = new TaskPriority(0, 0, 0, 0, "default_2");
-        priorityThree = new TaskPriority(0, 0, 0, 0, "default_3");
+        priorityOne = new TaskPriority(0, 0, 0, 0, 1, "default_1");
+        priorityTwo = new TaskPriority(0, 0, 0, 0, 1,"default_2");
+        priorityThree = new TaskPriority(0, 0, 0, 0, 1, "default_3");
         taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
         Collections.sort(taskPrioritys);
         Assert.assertEquals(
             Arrays.asList(priorityOne, priorityTwo, priorityThree),
             taskPrioritys
         );
+
+        priorityOne = new TaskPriority(0, 0, 0, 0, 2, "default_1");
+        priorityTwo = new TaskPriority(0, 0, 0, 0, 1,"default_2");
+        priorityThree = new TaskPriority(0, 0, 0, 0, 3, "default_3");
+        taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
+        Collections.sort(taskPrioritys);
+        Assert.assertEquals(
+                Arrays.asList(priorityThree, priorityOne, priorityTwo),
+                taskPrioritys
+        );
+
+        priorityOne = new TaskPriority(0, 0, 0, 0, 1, "default_1");
+        priorityTwo = new TaskPriority(0, 0, 0, 0, 1,"default_2");
+        priorityThree = new TaskPriority(0, 0, 0, 0, 3, "default_3");
+        taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
+        Collections.sort(taskPrioritys);
+        Assert.assertEquals(
+                Arrays.asList(priorityThree, priorityOne, priorityTwo),
+                taskPrioritys
+        );
+
+        priorityTwo = new TaskPriority(0, 0, 0, 0, 1,"default_1");
+        priorityOne = new TaskPriority(0, 0, 0, 0, 1, "default_1");
+        priorityThree = new TaskPriority(0, 0, 0, 0, 3, "default_1");
+        taskPrioritys = Arrays.asList(priorityTwo, priorityOne, priorityThree);
+        Collections.sort(taskPrioritys);
+        Assert.assertEquals(
+                Arrays.asList(priorityThree, priorityTwo, priorityOne),
+                taskPrioritys
+        );
     }
 
     @Test
@@ -134,7 +164,7 @@ public class TaskPriorityQueueImplTest {
      * @return
      */
     private TaskPriority createTaskPriority(Integer priority, Integer processInstanceId) {
-        TaskPriority priorityOne = new TaskPriority(priority, processInstanceId, 0, 0, "default");
+        TaskPriority priorityOne = new TaskPriority(priority, processInstanceId, 0, 0, 1, "default");
         return priorityOne;
     }
 }
\ No newline at end of file