You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/03/21 02:33:31 UTC
[iotdb] 05/07: Fragment schedule develop
This is an automated email from the ASF dual-hosted git repository.
ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8fb27a2194a7f2078539fd21ca34795ade5d6a34
Author: ericpai <er...@hotmail.com>
AuthorDate: Tue Mar 15 17:43:12 2022 +0800
Fragment schedule develop
---
.../mpp/execution/FragmentInstanceManager.java | 89 ++++++++++++++-
...utor.java => FragmentInstanceTaskCallback.java} | 32 +-----
.../execution/FragmentInstanceTaskExecutor.java | 9 +-
.../execution/FragmentInstanceTimeoutSentinel.java | 43 ++++++-
.../mpp/execution/IFragmentInstanceManager.java | 3 +-
.../mpp/execution/queue/IndexedBlockingQueue.java | 45 +++++++-
.../iotdb/mpp/execution/queue/L1PriorityQueue.java | 12 +-
.../iotdb/mpp/execution/queue/L2PriorityQueue.java | 17 ++-
.../mpp/execution/task/FragmentInstanceTask.java | 13 +++
.../mpp/execution/queue/L1PriorityQueueTest.java | 115 +++++++++++++++++++
.../mpp/execution/queue/L2PriorityQueueTest.java | 124 +++++++++++++++++++++
.../iotdb/mpp/execution/queue/QueueElement.java | 80 +++++++++++++
12 files changed, 530 insertions(+), 52 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
index 4c7c157..8917dd4 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.mpp.execution.queue.L1PriorityQueue;
import org.apache.iotdb.mpp.execution.queue.L2PriorityQueue;
import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTaskStatus;
import java.util.List;
import java.util.Map;
@@ -40,7 +41,7 @@ public class FragmentInstanceManager implements IFragmentInstanceManager, IServi
private final IndexedBlockingQueue<FragmentInstanceTask> readyQueue;
private final IndexedBlockingQueue<FragmentInstanceTask> timeoutQueue;
- private final Map<String, List<FragmentInstanceID>> queryMap;
+ private final Map<String, List<FragmentInstanceTask>> queryMap;
private static final int MAX_CAPACITY = 1000; // TODO: load from config files
private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files
@@ -65,7 +66,9 @@ public class FragmentInstanceManager implements IFragmentInstanceManager, IServi
for (int i = 0; i < WORKER_THREAD_NUM; i++) {
new FragmentInstanceTaskExecutor("Worker-Thread-" + i, workerGroups, readyQueue).start();
}
- new FragmentInstanceTimeoutSentinel("Sentinel-Thread", workerGroups, timeoutQueue).start();
+ new FragmentInstanceTimeoutSentinel(
+ "Sentinel-Thread", workerGroups, timeoutQueue, this::abortFragmentInstanceTask)
+ .start();
}
@Override
@@ -79,15 +82,89 @@ public class FragmentInstanceManager implements IFragmentInstanceManager, IServi
}
@Override
- public void submitFragmentInstance() {}
+ public void submitFragmentInstance() {
+ // TODO: pass a real task
+ FragmentInstanceTask task = new FragmentInstanceTask();
+
+ task.lock();
+ try {
+ timeoutQueue.push(task);
+ // TODO: if no upstream deps, set to ready
+ task.setStatus(FragmentInstanceTaskStatus.READY);
+ readyQueue.push(task);
+ } finally {
+ task.unlock();
+ }
+ }
@Override
public void inputBlockAvailable(
- FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId) {}
+ FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId) {
+ FragmentInstanceTask task = timeoutQueue.get(instanceID);
+ if (task == null) {
+ return;
+ }
+ task.lock();
+ try {
+ if (task.getStatus() != FragmentInstanceTaskStatus.BLOCKED) {
+ return;
+ }
+ task.inputReady(instanceID);
+ if (task.getStatus() == FragmentInstanceTaskStatus.READY) {
+ readyQueue.push(task);
+ }
+ } finally {
+ task.unlock();
+ }
+ }
@Override
- public void outputBlockAvailable(
- FragmentInstanceID instanceID, FragmentInstanceID downstreamInstanceId) {}
+ public void outputBlockAvailable(FragmentInstanceID instanceID) {
+ FragmentInstanceTask task = timeoutQueue.get(instanceID);
+ if (task == null) {
+ return;
+ }
+ task.lock();
+ try {
+ if (task.getStatus() != FragmentInstanceTaskStatus.BLOCKED) {
+ return;
+ }
+ task.outputReady();
+ if (task.getStatus() == FragmentInstanceTaskStatus.READY) {
+ readyQueue.push(task);
+ }
+ } finally {
+ task.unlock();
+ }
+ }
+
+ /** abort a {@link FragmentInstanceTask} */
+ void abortFragmentInstanceTask(FragmentInstanceTask task) {
+ List<FragmentInstanceTask> queryRelatedTasks = queryMap.remove(task.getId().getQueryId());
+ clearFragmentInstanceTask(task);
+ if (queryRelatedTasks != null) {
+ // if queryRelatedTask is not null, it means that the clean request comes from this node, not
+ // coordinator.
+ // TODO: tell coordinator
+ for (FragmentInstanceTask otherTask : queryRelatedTasks) {
+ clearFragmentInstanceTask(otherTask);
+ }
+ }
+ // TODO: call LocalMemoryManager to release resources
+ }
+
+ private void clearFragmentInstanceTask(FragmentInstanceTask task) {
+ task.lock();
+ try {
+ if (task.getStatus() != FragmentInstanceTaskStatus.FINISHED) {
+ task.setStatus(FragmentInstanceTaskStatus.ABORTED);
+ }
+ readyQueue.remove(task.getId());
+ timeoutQueue.remove(task.getId());
+ } finally {
+ task.unlock();
+ }
+ }
@Override
public void abortQuery(String queryId) {}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskCallback.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
copy to server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskCallback.java
index 5c704db..6eee1ad 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskCallback.java
@@ -18,34 +18,10 @@
*/
package org.apache.iotdb.mpp.execution;
-import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** the worker thread of {@link FragmentInstanceTask} */
-public class FragmentInstanceTaskExecutor extends Thread {
-
- private static final Logger logger = LoggerFactory.getLogger(FragmentInstanceTaskExecutor.class);
-
- private final IndexedBlockingQueue<FragmentInstanceTask> queue;
-
- public FragmentInstanceTaskExecutor(
- String workerId, ThreadGroup tg, IndexedBlockingQueue<FragmentInstanceTask> queue) {
- super(tg, workerId);
- this.queue = queue;
- }
-
- @Override
- public void run() {
- try {
- while (true) {
- FragmentInstanceTask next = queue.poll();
- // do logic here
- }
- } catch (InterruptedException e) {
- logger.info("{} is interrupted.", this.getName());
- }
- }
+/** A common interface for {@link FragmentInstanceTask} business logic callback */
+@FunctionalInterface
+public interface FragmentInstanceTaskCallback {
+ void call(FragmentInstanceTask task) throws Exception;
}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
index 5c704db..b35a6f7 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
@@ -39,13 +39,14 @@ public class FragmentInstanceTaskExecutor extends Thread {
@Override
public void run() {
- try {
- while (true) {
+ while (true) {
+ try {
FragmentInstanceTask next = queue.poll();
// do logic here
+ } catch (InterruptedException e) {
+ logger.info("{} is interrupted.", this.getName());
+ break;
}
- } catch (InterruptedException e) {
- logger.info("{} is interrupted.", this.getName());
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
index 7b352ba..4171f26 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.mpp.execution;
import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTaskStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,22 +32,52 @@ public class FragmentInstanceTimeoutSentinel extends Thread {
LoggerFactory.getLogger(FragmentInstanceTimeoutSentinel.class);
private final IndexedBlockingQueue<FragmentInstanceTask> queue;
+ private final FragmentInstanceTaskCallback timeoutCallback;
+ // the check interval in milliseconds if the queue head remains the same.
+ private static final int CHECK_INTERVAL = 100;
public FragmentInstanceTimeoutSentinel(
- String workerId, ThreadGroup tg, IndexedBlockingQueue<FragmentInstanceTask> queue) {
+ String workerId,
+ ThreadGroup tg,
+ IndexedBlockingQueue<FragmentInstanceTask> queue,
+ FragmentInstanceTaskCallback timeoutCallback) {
super(tg, workerId);
this.queue = queue;
+ this.timeoutCallback = timeoutCallback;
}
@Override
public void run() {
- try {
- while (true) {
+ while (true) {
+ try {
FragmentInstanceTask next = queue.poll();
- // do logic here
+ next.lock();
+ try {
+ // if this task is already in an end state, it means that the resource releasing will be
+ // handled by other threads, we don't care anymore.
+ if (next.isEndState()) {
+ continue;
+ }
+ // if this task is not in end state and not timeout, we should push it back to the queue.
+ if (next.getDDL() > System.currentTimeMillis()) {
+ queue.push(next);
+ Thread.sleep(CHECK_INTERVAL);
+ continue;
+ }
+ next.setStatus(FragmentInstanceTaskStatus.ABORTED);
+ } finally {
+ next.unlock();
+ }
+ try {
+ // Or we should do something to abort
+ timeoutCallback.call(next);
+ } catch (Exception e) {
+ logger.error("Abort instance " + next.getId() + " failed", e);
+ }
+ } catch (InterruptedException e) {
+ logger.info("{} is interrupted.", this.getName());
+ break;
}
- } catch (InterruptedException e) {
- logger.info("{} is interrupted.", this.getName());
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
index 3bda549..a27db32 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
@@ -39,9 +39,8 @@ public interface IFragmentInstanceManager {
* downstream data has been consumed.
*
* @param instanceID the fragment instance to be notified.
- * @param downstreamInstanceId the downstream instance id.
*/
- void outputBlockAvailable(FragmentInstanceID instanceID, FragmentInstanceID downstreamInstanceId);
+ void outputBlockAvailable(FragmentInstanceID instanceID);
/**
* abort all the instances in this query
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
index bcabb5c..6ddc610 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
@@ -29,7 +29,9 @@ package org.apache.iotdb.mpp.execution.queue;
* <p>3. Can push a non-null element to queue. When the queue is beyond the max size, an exception
* will be thrown.
*
- * <p>4. Can remove an element by a long type id.
+ * <p>4. Can remove an element by a type of {@link ID}.
+ *
+ * <p>5. Each element has the different ID.
*/
public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
@@ -79,15 +81,17 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
if (element == null) {
throw new NullPointerException("pushed element is null");
}
- if (size + 1 > MAX_CAPACITY) {
+ int sizeDelta = contains(element) ? 0 : 1;
+ if (size + sizeDelta > MAX_CAPACITY) {
throw new IllegalStateException("the queue is full");
}
pushToQueue(element);
+ size += sizeDelta;
this.notifyAll();
}
/**
- * Remove and return the element by a long id. It returns null if it doesn't exist.
+ * Remove and return the element by id. It returns null if it doesn't exist.
*
* @param id the id of the element to be removed.
* @return the removed element.
@@ -103,6 +107,17 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
}
/**
+ * Get the element by id. It returns null if it doesn't exist.
+ *
+ * @param id the id of the element.
+ * @return the removed element.
+ */
+ public synchronized E get(ID id) {
+ queryHolder.setId(id);
+ return get(queryHolder);
+ }
+
+ /**
* Get the current queue size.
*
* @return the current queue size.
@@ -139,10 +154,32 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
protected abstract void pushToQueue(E element);
/**
- * Remove and return the element by a long id. It returns null if it doesn't exist.
+ * Remove and return the element by its ID. It returns null if it doesn't exist.
+ *
+ * <p>This implementation needn't be thread-safe.
*
* @param element the element to be removed.
* @return the removed element.
*/
protected abstract E remove(E element);
+
+ /**
+ * Check whether an element with the same ID exists.
+ *
+ * <p>This implementation needn't be thread-safe.
+ *
+ * @param element the element to be checked.
+ * @return true if an element with the same ID exists, otherwise false.
+ */
+ protected abstract boolean contains(E element);
+
+ /**
+ * Return the element with the same id of the input, null if it doesn't exist.
+ *
+ * <p>This implementation needn't be thread-safe.
+ *
+ * @param element the element to be queried.
+ * @return the element with the same id in the queue. Null if it doesn't exist.
+ */
+ protected abstract E get(E element);
}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
index a5ccf14..a9cad83 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
@@ -28,10 +28,10 @@ import java.util.TreeMap;
* <p>The time complexity of operations are:
*
* <ul>
- * <li><b>{@link #size()}: </b> O(1).
* <li><b>{@link #remove(IDIndexedAccessible)} ()}: </b> O(logN).
* <li><b>{@link #push(IDIndexedAccessible)}: </b> O(logN).
* <li><b>{@link #poll()}: </b> O(logN).
+ * <li><b>{@link #get(ID)}}: </b> O(1).
* </ul>
*/
public class L1PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlockingQueue<E> {
@@ -73,4 +73,14 @@ public class L1PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlock
protected E remove(E element) {
return elements.remove(element);
}
+
+ @Override
+ protected boolean contains(E element) {
+ return elements.containsKey(element);
+ }
+
+ @Override
+ protected E get(E element) {
+ return elements.get(element);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
index 3b8a9e6..c23e74a 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
@@ -30,10 +30,10 @@ import java.util.TreeMap;
* <p>The time complexity of operations are:
*
* <ul>
- * <li><b>{@link #size()}: </b> O(1).
* <li><b>{@link #remove(IDIndexedAccessible)} ()}: </b> O(logN).
* <li><b>{@link #push(IDIndexedAccessible)}: </b> O(logN).
* <li><b>{@link #poll()}: </b> O(logN).
+ * <li><b>{@link #get(ID)}}: </b> O(1).
* </ul>
*/
public class L2PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlockingQueue<E> {
@@ -75,6 +75,7 @@ public class L2PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlock
@Override
protected void pushToQueue(E element) {
+ workingElements.remove(element);
idleElements.put(element, element);
}
@@ -86,4 +87,18 @@ public class L2PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlock
}
return e;
}
+
+ @Override
+ protected boolean contains(E element) {
+ return workingElements.containsKey(element) || idleElements.containsKey(element);
+ }
+
+ @Override
+ protected E get(E element) {
+ E e = workingElements.get(element);
+ if (e != null) {
+ return e;
+ }
+ return idleElements.get(element);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
index 7bd7205..1836c74 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
@@ -70,6 +70,19 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
return status;
}
+ public boolean isEndState() {
+ return status == FragmentInstanceTaskStatus.ABORTED
+ || status == FragmentInstanceTaskStatus.FINISHED;
+ }
+
+ public void inputReady(FragmentInstanceID inputId) {
+ throw new UnsupportedOperationException("unsupported");
+ }
+
+ public void outputReady() {
+ throw new UnsupportedOperationException("unsupported");
+ }
+
public void setStatus(FragmentInstanceTaskStatus status) {
this.status = status;
}
diff --git a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueueTest.java b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueueTest.java
new file mode 100644
index 0000000..3809d4a
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueueTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class L1PriorityQueueTest {
+
+ @Test
+ public void testPollBlocked() throws InterruptedException {
+ IndexedBlockingQueue<QueueElement> queue =
+ new L1PriorityQueue<>(
+ 10,
+ (o1, o2) -> {
+ if (o1.equals(o2)) {
+ return 0;
+ }
+ return Integer.compare(o1.getValue(), o2.getValue());
+ },
+ new QueueElement(new QueueElement.QueueElementID(0), 0));
+ List<QueueElement> res = new ArrayList<>();
+ Thread t1 =
+ new Thread(
+ () -> {
+ try {
+ QueueElement e = queue.poll();
+ res.add(e);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ });
+ t1.start();
+ Thread.sleep(10);
+ Assert.assertEquals(Thread.State.WAITING, t1.getState());
+ QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(1), 1);
+ queue.push(e2);
+ Thread.sleep(10);
+ Assert.assertEquals(Thread.State.TERMINATED, t1.getState());
+ Assert.assertEquals(1, res.size());
+ Assert.assertEquals(e2.getId().toString(), res.get(0).getId().toString());
+ }
+
+ @Test
+ public void testPushExceedCapacity() {
+ IndexedBlockingQueue<QueueElement> queue =
+ new L1PriorityQueue<>(
+ 1,
+ (o1, o2) -> {
+ if (o1.equals(o2)) {
+ return 0;
+ }
+ return Integer.compare(o1.getValue(), o2.getValue());
+ },
+ new QueueElement(new QueueElement.QueueElementID(0), 0));
+ QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(1), 1);
+ queue.push(e2);
+ QueueElement e2e = new QueueElement(new QueueElement.QueueElementID(1), 10);
+ queue.push(e2e);
+ QueueElement e3 = new QueueElement(new QueueElement.QueueElementID(2), 2);
+ try {
+ queue.push(e3);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ // ignore;
+ }
+ }
+
+ @Test
+ public void testPushAndPoll() throws InterruptedException {
+ IndexedBlockingQueue<QueueElement> queue =
+ new L1PriorityQueue<>(
+ 10,
+ (o1, o2) -> {
+ if (o1.equals(o2)) {
+ return 0;
+ }
+ return Integer.compare(o1.getValue(), o2.getValue());
+ },
+ new QueueElement(new QueueElement.QueueElementID(0), 0));
+ QueueElement e1 = new QueueElement(new QueueElement.QueueElementID(1), 10);
+ queue.push(e1);
+ QueueElement e1e = new QueueElement(new QueueElement.QueueElementID(1), 20);
+ queue.push(e1e);
+ // only 1 element with the same id can be put into
+ Assert.assertEquals(1, queue.size());
+ QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(2), 5);
+ queue.push(e2);
+ Assert.assertEquals(2, queue.size());
+ Assert.assertEquals(e2.getId().toString(), queue.poll().getId().toString());
+ Assert.assertEquals(1, queue.size());
+ Assert.assertEquals(e1e.getId().toString(), queue.poll().getId().toString());
+ Assert.assertEquals(0, queue.size());
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueueTest.java b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueueTest.java
new file mode 100644
index 0000000..5d34bb7
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueueTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class L2PriorityQueueTest {
+ @Test
+ public void testPollBlocked() throws InterruptedException {
+ IndexedBlockingQueue<QueueElement> queue =
+ new L2PriorityQueue<>(
+ 10,
+ (o1, o2) -> {
+ if (o1.equals(o2)) {
+ return 0;
+ }
+ return Integer.compare(o1.getValue(), o2.getValue());
+ },
+ new QueueElement(new QueueElement.QueueElementID(0), 0));
+ List<QueueElement> res = new ArrayList<>();
+ Thread t1 =
+ new Thread(
+ () -> {
+ try {
+ QueueElement e = queue.poll();
+ res.add(e);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ });
+ t1.start();
+ Thread.sleep(10);
+ Assert.assertEquals(Thread.State.WAITING, t1.getState());
+ QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(1), 1);
+ queue.push(e2);
+ Thread.sleep(10);
+ Assert.assertEquals(Thread.State.TERMINATED, t1.getState());
+ Assert.assertEquals(1, res.size());
+ Assert.assertEquals(e2.getId().toString(), res.get(0).getId().toString());
+ }
+
+ @Test
+ public void testPushExceedCapacity() {
+ IndexedBlockingQueue<QueueElement> queue =
+ new L2PriorityQueue<>(
+ 1,
+ (o1, o2) -> {
+ if (o1.equals(o2)) {
+ return 0;
+ }
+ return Integer.compare(o1.getValue(), o2.getValue());
+ },
+ new QueueElement(new QueueElement.QueueElementID(0), 0));
+ QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(1), 1);
+ queue.push(e2);
+ QueueElement e2e = new QueueElement(new QueueElement.QueueElementID(1), 10);
+ queue.push(e2e);
+ QueueElement e3 = new QueueElement(new QueueElement.QueueElementID(2), 2);
+ try {
+ queue.push(e3);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ // ignore;
+ }
+ }
+
+ @Test
+ public void testPushAndPoll() throws InterruptedException {
+ IndexedBlockingQueue<QueueElement> queue =
+ new L2PriorityQueue<>(
+ 10,
+ (o1, o2) -> {
+ if (o1.equals(o2)) {
+ return 0;
+ }
+ int res = Integer.compare(o1.getValue(), o2.getValue());
+ if (res != 0) {
+ return res;
+ }
+ return String.CASE_INSENSITIVE_ORDER.compare(
+ o1.getId().toString(), o2.getId().toString());
+ },
+ new QueueElement(new QueueElement.QueueElementID(0), 0));
+ QueueElement e1 = new QueueElement(new QueueElement.QueueElementID(1), 10);
+ queue.push(e1);
+ QueueElement e1e = new QueueElement(new QueueElement.QueueElementID(1), 20);
+ queue.push(e1e);
+ // only 1 element with the same id can be put into
+ Assert.assertEquals(1, queue.size());
+ QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(2), 5);
+ queue.push(e2);
+ Assert.assertEquals(2, queue.size());
+ Assert.assertEquals(e2.getId().toString(), queue.poll().getId().toString());
+ Assert.assertEquals(1, queue.size());
+ // L1: 5 -> 20 L2: 10
+ QueueElement e3 = new QueueElement(new QueueElement.QueueElementID(3), 10);
+ queue.push(e3);
+ Assert.assertEquals(e1e.getId().toString(), queue.poll().getId().toString());
+ Assert.assertEquals(1, queue.size());
+ Assert.assertEquals(e3.getId().toString(), queue.poll().getId().toString());
+ Assert.assertEquals(0, queue.size());
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/QueueElement.java b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/QueueElement.java
new file mode 100644
index 0000000..4aed3a5
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/QueueElement.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+public class QueueElement implements IDIndexedAccessible {
+ private QueueElementID id;
+ private final int value;
+
+ public QueueElement(QueueElementID id, int value) {
+ this.id = id;
+ this.value = value;
+ }
+
+ public int getValue() {
+ return this.value;
+ }
+
+ @Override
+ public ID getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(ID id) {
+ this.id = (QueueElementID) id;
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return o instanceof QueueElement && ((QueueElement) o).getId().equals(this.id);
+ }
+
+ public static class QueueElementID implements ID {
+ private final int id;
+
+ public QueueElementID(int id) {
+ this.id = id;
+ }
+
+ public int getId() {
+ return this.id;
+ }
+
+ @Override
+ public int hashCode() {
+ return Integer.hashCode(id);
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(id);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return o instanceof QueueElementID && ((QueueElementID) o).getId() == this.id;
+ }
+ }
+}