You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/12/18 09:28:46 UTC
[incubator-dolphinscheduler] branch dev updated:
[Fix-4222][Master]Add the priority queue to ensure that tasks are submitted
according to priority. (#4250)
This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 9fe5c37 [Fix-4222][Master]Add the priority queue to ensure that tasks are submitted according to priority. (#4250)
9fe5c37 is described below
commit 9fe5c3717ded3e42ccde854d5ba23a685c23508d
Author: lgcareer <18...@163.com>
AuthorDate: Fri Dec 18 17:28:37 2020 +0800
[Fix-4222][Master]Add the priority queue to ensure that tasks are submitted according to priority. (#4250)
* [Fix-4222][Master]Add the priority queue to ensure that tasks are submitted according to priority.
* [Fix-4222][Master]Add the priority queue to ensure that tasks are submitted according to priority.
* [Fix-4222][Master]Remove useless import
* [Fix-4222][Master]Reformat code style
* [Fix-4222][Master]Reformat code style
* [Fix-4222][Master]Reformat code style
* [Fix-4222][Master]add PeerTaskInstancePriorityQueueTest
* [Fix-4222][Master]Fix code smell
* [Fix-4222][Master]Reformat code style
* [Fix-4222][Master]Fix code smell
Co-authored-by: xingchun-chen <55...@users.noreply.github.com>
---
.../master/consumer/TaskPriorityQueueConsumer.java | 2 +-
.../server/master/runner/MasterExecThread.java | 72 +++++++-----
.../TaskPriorityQueueException.java} | 36 +++---
.../queue/PeerTaskInstancePriorityQueue.java | 127 +++++++++++++++++++++
.../service/queue/TaskPriorityQueue.java | 21 ++--
.../service/queue/TaskPriorityQueueImpl.java | 38 +++---
.../queue/PeerTaskInstancePriorityQueueTest.java | 89 +++++++++++++++
pom.xml | 1 +
8 files changed, 316 insertions(+), 70 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 89d3e97..a407e4e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -90,7 +90,7 @@ public class TaskPriorityQueueConsumer extends Thread {
* taskUpdateQueue
*/
@Autowired
- private TaskPriorityQueue taskPriorityQueue;
+ private TaskPriorityQueue<String> taskPriorityQueue;
/**
* processService
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 66de086..e68f9d5 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -55,6 +55,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
+import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.apache.commons.io.FileUtils;
@@ -120,9 +121,9 @@ public class MasterExecThread implements Runnable {
private Map<String, TaskInstance> completeTaskList = new ConcurrentHashMap<>();
/**
- * ready to submit task list
+ * ready to submit task queue
*/
- private Map<String, TaskInstance> readyToSubmitTaskList = new ConcurrentHashMap<>();
+ private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
/**
* depend failed task map
@@ -178,8 +179,8 @@ public class MasterExecThread implements Runnable {
/**
* constructor of MasterExecThread
*
- * @param processInstance processInstance
- * @param processService processService
+ * @param processInstance processInstance
+ * @param processService processService
* @param nettyRemotingClient nettyRemotingClient
*/
public MasterExecThread(ProcessInstance processInstance
@@ -483,7 +484,7 @@ public class MasterExecThread implements Runnable {
* encapsulation task
*
* @param processInstance process instance
- * @param nodeName node name
+ * @param nodeName node name
* @return TaskInstance
*/
private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName,
@@ -563,7 +564,7 @@ public class MasterExecThread implements Runnable {
// if previous node success , post node submit
for (TaskInstance task : taskInstances) {
- if (readyToSubmitTaskList.containsKey(task.getName())) {
+ if (readyToSubmitTaskQueue.contains(task)) {
continue;
}
@@ -699,7 +700,7 @@ public class MasterExecThread implements Runnable {
return true;
}
if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
- return readyToSubmitTaskList.size() == 0 || activeTaskNode.size() == 0;
+ return readyToSubmitTaskQueue.size() == 0 || activeTaskNode.size() == 0;
}
}
return false;
@@ -731,7 +732,7 @@ public class MasterExecThread implements Runnable {
List<TaskInstance> pauseList = getCompleteTaskByState(ExecutionStatus.PAUSE);
if (CollectionUtils.isNotEmpty(pauseList)
|| !isComplementEnd()
- || readyToSubmitTaskList.size() > 0) {
+ || readyToSubmitTaskQueue.size() > 0) {
return ExecutionStatus.PAUSE;
} else {
return ExecutionStatus.SUCCESS;
@@ -782,7 +783,7 @@ public class MasterExecThread implements Runnable {
// success
if (state == ExecutionStatus.RUNNING_EXECUTION) {
List<TaskInstance> killTasks = getCompleteTaskByState(ExecutionStatus.KILL);
- if (readyToSubmitTaskList.size() > 0) {
+ if (readyToSubmitTaskQueue.size() > 0) {
//tasks currently pending submission, no retries, indicating that depend is waiting to complete
return ExecutionStatus.RUNNING_EXECUTION;
} else if (CollectionUtils.isNotEmpty(killTasks)) {
@@ -804,8 +805,8 @@ public class MasterExecThread implements Runnable {
boolean result = false;
- for (String taskName : readyToSubmitTaskList.keySet()) {
- TaskInstance task = readyToSubmitTaskList.get(taskName);
+ for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator(); iter.hasNext(); ) {
+ TaskInstance task = iter.next();
if (task.getState().typeIsFailure()) {
result = true;
break;
@@ -872,7 +873,11 @@ public class MasterExecThread implements Runnable {
*/
private void addTaskToStandByList(TaskInstance taskInstance) {
logger.info("add task to stand by list: {}", taskInstance.getName());
- readyToSubmitTaskList.putIfAbsent(taskInstance.getName(), taskInstance);
+ try {
+ readyToSubmitTaskQueue.put(taskInstance);
+ } catch (Exception e) {
+ logger.error("add task instance to readyToSubmitTaskQueue error");
+ }
}
/**
@@ -882,7 +887,11 @@ public class MasterExecThread implements Runnable {
*/
private void removeTaskFromStandbyList(TaskInstance taskInstance) {
logger.info("remove task from stand by list: {}", taskInstance.getName());
- readyToSubmitTaskList.remove(taskInstance.getName());
+ try {
+ readyToSubmitTaskQueue.remove(taskInstance);
+ } catch (Exception e) {
+ logger.error("remove task instance from readyToSubmitTaskQueue error");
+ }
}
/**
@@ -891,8 +900,8 @@ public class MasterExecThread implements Runnable {
* @return Boolean whether has retry task in standby
*/
private boolean hasRetryTaskInStandBy() {
- for (Map.Entry<String, TaskInstance> entry : readyToSubmitTaskList.entrySet()) {
- if (entry.getValue().getState().typeIsFailure()) {
+ for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator(); iter.hasNext(); ) {
+ if (iter.next().getState().typeIsFailure()) {
return true;
}
}
@@ -1083,20 +1092,25 @@ public class MasterExecThread implements Runnable {
* handling the list of tasks to be submitted
*/
private void submitStandByTask() {
- for (Map.Entry<String, TaskInstance> entry : readyToSubmitTaskList.entrySet()) {
- TaskInstance task = entry.getValue();
- DependResult dependResult = getDependResultForTask(task);
- if (DependResult.SUCCESS == dependResult) {
- if (retryTaskIntervalOverTime(task)) {
- submitTaskExec(task);
+ try {
+ int length = readyToSubmitTaskQueue.size();
+ for (int i = 0; i < length; i++) {
+ TaskInstance task = readyToSubmitTaskQueue.peek();
+ DependResult dependResult = getDependResultForTask(task);
+ if (DependResult.SUCCESS == dependResult) {
+ if (retryTaskIntervalOverTime(task)) {
+ submitTaskExec(task);
+ removeTaskFromStandbyList(task);
+ }
+ } else if (DependResult.FAILED == dependResult) {
+ // if the dependency fails, the current node is not submitted and the state changes to failure.
+ dependFailedTask.put(task.getName(), task);
removeTaskFromStandbyList(task);
+ logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
}
- } else if (DependResult.FAILED == dependResult) {
- // if the dependency fails, the current node is not submitted and the state changes to failure.
- dependFailedTask.put(entry.getKey(), task);
- removeTaskFromStandbyList(task);
- logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
}
+ } catch (Exception e) {
+ logger.error("submit standby task error", e);
}
}
@@ -1185,9 +1199,9 @@ public class MasterExecThread implements Runnable {
* generate flow dag
*
* @param processDefinitionJson process definition json
- * @param startNodeNameList start node name list
- * @param recoveryNodeNameList recovery node name list
- * @param depNodeType depend node type
+ * @param startNodeNameList start node name list
+ * @param recoveryNodeNameList recovery node name list
+ * @param depNodeType depend node type
* @return ProcessDag process dag
* @throws Exception exception
*/
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java
similarity index 59%
copy from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
copy to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java
index 3ad9aef..30a7214 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java
@@ -14,31 +14,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.service.queue;
+package org.apache.dolphinscheduler.service.exceptions;
-public interface TaskPriorityQueue {
+/**
+ * task priority queue exception
+ */
+public class TaskPriorityQueueException extends Exception {
/**
- * put task info
+ * Construct a new checked task priority queue exception with the detail message
*
- * @param taskInfo taskInfo
- * @throws Exception
+ * @param message message
*/
- void put(String taskInfo) throws Exception;
+ public TaskPriorityQueueException(String message) {
+ super(message);
+ }
/**
- * take taskInfo
- * @return taskInfo
- * @throws Exception
- */
- String take()throws Exception;
-
- /**
- * size
+ * Construct a new checked task priority queue exception with the detail message and cause
*
- * @return size
- * @throws Exception
+ * @param message message
+ * @param cause cause
*/
- int size() throws Exception;
-}
\ No newline at end of file
+ public TaskPriorityQueueException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
new file mode 100644
index 0000000..d7a9025
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.queue;
+
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+/**
+ * Task instances priority queue implementation
+ * All the task instances are in the same process instance.
+ */
+public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInstance> {
+ /**
+ * initial capacity of the queue (the underlying PriorityQueue is unbounded and grows as needed)
+ */
+ private static final Integer QUEUE_MAX_SIZE = 3000;
+
+ /**
+ * queue
+ */
+ private PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
+
+ /**
+ * put task instance to priority queue
+ *
+ * @param taskInstance taskInstance
+ * @throws TaskPriorityQueueException
+ */
+ public void put(TaskInstance taskInstance) throws TaskPriorityQueueException {
+ queue.add(taskInstance);
+ }
+
+ /**
+ * take task instance: removes and returns the head of the queue, or null if the queue is empty (does not block)
+ *
+ * @return task instance
+ * @throws TaskPriorityQueueException
+ */
+ @Override
+ public TaskInstance take() throws TaskPriorityQueueException {
+ return queue.poll();
+ }
+
+ /**
+ * peek task instance: returns the head of the queue without removing it, or null if the queue is empty
+ *
+ * @return task instance
+ */
+ public TaskInstance peek() {
+ return queue.peek();
+ }
+
+ /**
+ * queue size
+ *
+ * @return size
+ */
+ public int size() {
+ return queue.size();
+ }
+
+ /**
+ * whether contains the task instance
+ *
+ * @param taskInstance task instance
+ * @return true if the queue contains the task instance
+ */
+ public boolean contains(TaskInstance taskInstance) {
+ return queue.contains(taskInstance);
+ }
+
+ /**
+ * remove task
+ *
+ * @param taskInstance task instance
+ * @return true if remove success
+ */
+ public boolean remove(TaskInstance taskInstance) {
+ return queue.remove(taskInstance);
+ }
+
+ /**
+ * get iterator
+ *
+ * @return Iterator
+ */
+ public Iterator<TaskInstance> iterator() {
+ return queue.iterator();
+ }
+
+ /**
+ * TaskInfoComparator: orders task instances by comparing their task instance priority
+ */
+ private class TaskInfoComparator implements Comparator<TaskInstance> {
+
+ /**
+ * compare o1 o2
+ *
+ * @param o1 o1
+ * @param o2 o2
+ * @return compare result
+ */
+ @Override
+ public int compare(TaskInstance o1, TaskInstance o2) {
+ return o1.getTaskInstancePriority().compareTo(o2.getTaskInstancePriority());
+ }
+ }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
index 3ad9aef..14c6b38 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
@@ -14,31 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.queue;
+import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
-public interface TaskPriorityQueue {
+/**
+ * task priority queue
+ * @param <T>
+ */
+public interface TaskPriorityQueue<T> {
/**
* put task info
*
* @param taskInfo taskInfo
- * @throws Exception
+ * @throws TaskPriorityQueueException
*/
- void put(String taskInfo) throws Exception;
+ void put(T taskInfo) throws TaskPriorityQueueException;
/**
* take taskInfo
+ *
* @return taskInfo
- * @throws Exception
+ * @throws TaskPriorityQueueException
*/
- String take()throws Exception;
+ T take() throws TaskPriorityQueueException, InterruptedException;
/**
* size
*
* @return size
- * @throws Exception
+ * @throws TaskPriorityQueueException
*/
- int size() throws Exception;
+ int size() throws TaskPriorityQueueException;
}
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
index 0a0fb1b..aefad84 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
@@ -14,22 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.queue;
+import static org.apache.dolphinscheduler.common.Constants.TASK_INFO_LENGTH;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
-import org.springframework.stereotype.Service;
+import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
-import java.util.*;
+import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
-import static org.apache.dolphinscheduler.common.Constants.*;
+import org.springframework.stereotype.Service;
+
+
/**
* A singleton of a task queue implemented with zookeeper
* tasks queue implementation
*/
@Service
-public class TaskPriorityQueueImpl implements TaskPriorityQueue {
+public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
/**
* queue size
*/
@@ -44,40 +49,43 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue {
* put task takePriorityInfo
*
* @param taskPriorityInfo takePriorityInfo
- * @throws Exception
+ * @throws TaskPriorityQueueException
*/
@Override
- public void put(String taskPriorityInfo) throws Exception {
+ public void put(String taskPriorityInfo) throws TaskPriorityQueueException {
queue.put(taskPriorityInfo);
}
/**
* take taskInfo
+ *
* @return taskInfo
- * @throws Exception
+ * @throws TaskPriorityQueueException
*/
@Override
- public String take() throws Exception {
+ public String take() throws TaskPriorityQueueException, InterruptedException {
return queue.take();
}
/**
* queue size
+ *
* @return size
- * @throws Exception
+ * @throws TaskPriorityQueueException
*/
@Override
- public int size() throws Exception {
+ public int size() throws TaskPriorityQueueException {
return queue.size();
}
/**
* TaskInfoComparator
*/
- private class TaskInfoComparator implements Comparator<String>{
+ private class TaskInfoComparator implements Comparator<String> {
/**
* compare o1 o2
+ *
* @param o1 o1
* @param o2 o2
* @return compare result
@@ -87,15 +95,15 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue {
String s1 = o1;
String s2 = o2;
String[] s1Array = s1.split(UNDERLINE);
- if(s1Array.length > TASK_INFO_LENGTH){
+ if (s1Array.length > TASK_INFO_LENGTH) {
// warning: if this length > 5, need to be changed
- s1 = s1.substring(0, s1.lastIndexOf(UNDERLINE) );
+ s1 = s1.substring(0, s1.lastIndexOf(UNDERLINE));
}
String[] s2Array = s2.split(UNDERLINE);
- if(s2Array.length > TASK_INFO_LENGTH){
+ if (s2Array.length > TASK_INFO_LENGTH) {
// warning: if this length > 5, need to be changed
- s2 = s2.substring(0, s2.lastIndexOf(UNDERLINE) );
+ s2 = s2.substring(0, s2.lastIndexOf(UNDERLINE));
}
return s1.compareTo(s2);
diff --git a/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java
new file mode 100644
index 0000000..cf39d57
--- /dev/null
+++ b/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package queue;
+
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for the task instances priority queue implementation
+ * All the task instances are in the same process instance.
+ */
+public class PeerTaskInstancePriorityQueueTest {
+
+ @Test
+ public void testPut() throws Exception {
+ PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
+ TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH);
+ TaskInstance taskInstanceMediumPriority = createTaskInstance("high", Priority.MEDIUM);
+ queue.put(taskInstanceHigPriority);
+ queue.put(taskInstanceMediumPriority);
+ Assert.assertEquals(2,queue.size());
+ }
+
+ @Test
+ public void testPeek() throws Exception {
+ PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
+ int peekBeforeLength = queue.size();
+ queue.peek();
+ Assert.assertEquals(peekBeforeLength,queue.size());
+
+ }
+
+ @Test
+ public void testTake() throws Exception {
+ PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
+ int peekBeforeLength = queue.size();
+ queue.take();
+ Assert.assertTrue(queue.size() < peekBeforeLength);
+ }
+
+ /**
+ * get queue
+ *
+ * @return queue
+ * @throws Exception
+ */
+ private PeerTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception {
+ PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
+ TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH);
+ TaskInstance taskInstanceMediumPriority = createTaskInstance("high", Priority.MEDIUM);
+ queue.put(taskInstanceHigPriority);
+ queue.put(taskInstanceMediumPriority);
+ return queue;
+ }
+
+ /**
+ * create task instance
+ *
+ * @param name name
+ * @param priority priority
+ * @return task instance with the given name and priority
+ */
+ private TaskInstance createTaskInstance(String name, Priority priority) {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setName(name);
+ taskInstance.setTaskInstancePriority(priority);
+ return taskInstance;
+ }
+
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 5163ae7..4eda053 100644
--- a/pom.xml
+++ b/pom.xml
@@ -862,6 +862,7 @@
<include>**/service/zk/ZKServerTest.java</include>
<include>**/service/zk/CuratorZookeeperClientTest.java</include>
<include>**/service/queue/TaskUpdateQueueTest.java</include>
+ <include>**/service/queue/PeerTaskInstancePriorityQueueTest.java</include>
<include>**/dao/mapper/DataSourceUserMapperTest.java</include>
<!--<iTaskUpdateQueueConsumerThreadnclude>**/dao/mapper/ErrorCommandMapperTest.java</iTaskUpdateQueueConsumerThreadnclude>-->