You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/03/21 02:33:31 UTC

[iotdb] 05/07: Fragment schedule develop

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8fb27a2194a7f2078539fd21ca34795ade5d6a34
Author: ericpai <er...@hotmail.com>
AuthorDate: Tue Mar 15 17:43:12 2022 +0800

    Fragment schedule develop
---
 .../mpp/execution/FragmentInstanceManager.java     |  89 ++++++++++++++-
 ...utor.java => FragmentInstanceTaskCallback.java} |  32 +-----
 .../execution/FragmentInstanceTaskExecutor.java    |   9 +-
 .../execution/FragmentInstanceTimeoutSentinel.java |  43 ++++++-
 .../mpp/execution/IFragmentInstanceManager.java    |   3 +-
 .../mpp/execution/queue/IndexedBlockingQueue.java  |  45 +++++++-
 .../iotdb/mpp/execution/queue/L1PriorityQueue.java |  12 +-
 .../iotdb/mpp/execution/queue/L2PriorityQueue.java |  17 ++-
 .../mpp/execution/task/FragmentInstanceTask.java   |  13 +++
 .../mpp/execution/queue/L1PriorityQueueTest.java   | 115 +++++++++++++++++++
 .../mpp/execution/queue/L2PriorityQueueTest.java   | 124 +++++++++++++++++++++
 .../iotdb/mpp/execution/queue/QueueElement.java    |  80 +++++++++++++
 12 files changed, 530 insertions(+), 52 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
index 4c7c157..8917dd4 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.mpp.execution.queue.L1PriorityQueue;
 import org.apache.iotdb.mpp.execution.queue.L2PriorityQueue;
 import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
 import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTaskStatus;
 
 import java.util.List;
 import java.util.Map;
@@ -40,7 +41,7 @@ public class FragmentInstanceManager implements IFragmentInstanceManager, IServi
 
   private final IndexedBlockingQueue<FragmentInstanceTask> readyQueue;
   private final IndexedBlockingQueue<FragmentInstanceTask> timeoutQueue;
-  private final Map<String, List<FragmentInstanceID>> queryMap;
+  private final Map<String, List<FragmentInstanceTask>> queryMap;
 
   private static final int MAX_CAPACITY = 1000; // TODO: load from config files
   private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files
@@ -65,7 +66,9 @@ public class FragmentInstanceManager implements IFragmentInstanceManager, IServi
     for (int i = 0; i < WORKER_THREAD_NUM; i++) {
       new FragmentInstanceTaskExecutor("Worker-Thread-" + i, workerGroups, readyQueue).start();
     }
-    new FragmentInstanceTimeoutSentinel("Sentinel-Thread", workerGroups, timeoutQueue).start();
+    new FragmentInstanceTimeoutSentinel(
+            "Sentinel-Thread", workerGroups, timeoutQueue, this::abortFragmentInstanceTask)
+        .start();
   }
 
   @Override
@@ -79,15 +82,89 @@ public class FragmentInstanceManager implements IFragmentInstanceManager, IServi
   }
 
   @Override
-  public void submitFragmentInstance() {}
+  public void submitFragmentInstance() {
+    // TODO: pass a real task
+    FragmentInstanceTask task = new FragmentInstanceTask();
+
+    task.lock();
+    try {
+      timeoutQueue.push(task);
+      // TODO: if no upstream deps, set to ready
+      task.setStatus(FragmentInstanceTaskStatus.READY);
+      readyQueue.push(task);
+    } finally {
+      task.unlock();
+    }
+  }
 
   @Override
   public void inputBlockAvailable(
-      FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId) {}
+      FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId) {
+    FragmentInstanceTask task = timeoutQueue.get(instanceID);
+    if (task == null) {
+      return;
+    }
+    task.lock();
+    try {
+      if (task.getStatus() != FragmentInstanceTaskStatus.BLOCKED) {
+        return;
+      }
+      task.inputReady(instanceID);
+      if (task.getStatus() == FragmentInstanceTaskStatus.READY) {
+        readyQueue.push(task);
+      }
+    } finally {
+      task.unlock();
+    }
+  }
 
   @Override
-  public void outputBlockAvailable(
-      FragmentInstanceID instanceID, FragmentInstanceID downstreamInstanceId) {}
+  public void outputBlockAvailable(FragmentInstanceID instanceID) {
+    FragmentInstanceTask task = timeoutQueue.get(instanceID);
+    if (task == null) {
+      return;
+    }
+    task.lock();
+    try {
+      if (task.getStatus() != FragmentInstanceTaskStatus.BLOCKED) {
+        return;
+      }
+      task.outputReady();
+      if (task.getStatus() == FragmentInstanceTaskStatus.READY) {
+        readyQueue.push(task);
+      }
+    } finally {
+      task.unlock();
+    }
+  }
+
+  /** Abort a {@link FragmentInstanceTask} and the other tasks of the same query. */
+  void abortFragmentInstanceTask(FragmentInstanceTask task) {
+    List<FragmentInstanceTask> queryRelatedTasks = queryMap.remove(task.getId().getQueryId());
+    clearFragmentInstanceTask(task);
+    if (queryRelatedTasks != null) {
+      // if queryRelatedTasks is not null, it means that the clean request comes from this node,
+      // not from the coordinator.
+      // TODO: tell coordinator
+      for (FragmentInstanceTask otherTask : queryRelatedTasks) {
+        clearFragmentInstanceTask(otherTask);
+      }
+    }
+    // TODO: call LocalMemoryManager to release resources
+  }
+
+  private void clearFragmentInstanceTask(FragmentInstanceTask task) {
+    task.lock();
+    try {
+      if (task.getStatus() != FragmentInstanceTaskStatus.FINISHED) {
+        task.setStatus(FragmentInstanceTaskStatus.ABORTED);
+      }
+      readyQueue.remove(task.getId());
+      timeoutQueue.remove(task.getId());
+    } finally {
+      task.unlock();
+    }
+  }
 
   @Override
   public void abortQuery(String queryId) {}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskCallback.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
copy to server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskCallback.java
index 5c704db..6eee1ad 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskCallback.java
@@ -18,34 +18,10 @@
  */
 package org.apache.iotdb.mpp.execution;
 
-import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
 import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** the worker thread of {@link FragmentInstanceTask} */
-public class FragmentInstanceTaskExecutor extends Thread {
-
-  private static final Logger logger = LoggerFactory.getLogger(FragmentInstanceTaskExecutor.class);
-
-  private final IndexedBlockingQueue<FragmentInstanceTask> queue;
-
-  public FragmentInstanceTaskExecutor(
-      String workerId, ThreadGroup tg, IndexedBlockingQueue<FragmentInstanceTask> queue) {
-    super(tg, workerId);
-    this.queue = queue;
-  }
-
-  @Override
-  public void run() {
-    try {
-      while (true) {
-        FragmentInstanceTask next = queue.poll();
-        // do logic here
-      }
-    } catch (InterruptedException e) {
-      logger.info("{} is interrupted.", this.getName());
-    }
-  }
+/** A common interface for {@link FragmentInstanceTask} business logic callback */
+@FunctionalInterface
+public interface FragmentInstanceTaskCallback {
+  void call(FragmentInstanceTask task) throws Exception;
 }
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
index 5c704db..b35a6f7 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
@@ -39,13 +39,14 @@ public class FragmentInstanceTaskExecutor extends Thread {
 
   @Override
   public void run() {
-    try {
-      while (true) {
+    while (true) {
+      try {
         FragmentInstanceTask next = queue.poll();
         // do logic here
+      } catch (InterruptedException e) {
+        logger.info("{} is interrupted.", this.getName());
+        break;
       }
-    } catch (InterruptedException e) {
-      logger.info("{} is interrupted.", this.getName());
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
index 7b352ba..4171f26 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.mpp.execution;
 
 import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
 import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTaskStatus;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,22 +32,52 @@ public class FragmentInstanceTimeoutSentinel extends Thread {
       LoggerFactory.getLogger(FragmentInstanceTimeoutSentinel.class);
 
   private final IndexedBlockingQueue<FragmentInstanceTask> queue;
+  private final FragmentInstanceTaskCallback timeoutCallback;
+  // the check interval in milliseconds if the queue head remains the same.
+  private static final int CHECK_INTERVAL = 100;
 
   public FragmentInstanceTimeoutSentinel(
-      String workerId, ThreadGroup tg, IndexedBlockingQueue<FragmentInstanceTask> queue) {
+      String workerId,
+      ThreadGroup tg,
+      IndexedBlockingQueue<FragmentInstanceTask> queue,
+      FragmentInstanceTaskCallback timeoutCallback) {
     super(tg, workerId);
     this.queue = queue;
+    this.timeoutCallback = timeoutCallback;
   }
 
   @Override
   public void run() {
-    try {
-      while (true) {
+    while (true) {
+      try {
         FragmentInstanceTask next = queue.poll();
-        // do logic here
+        next.lock();
+        try {
+          // if this task is already in an end state, the resource releasing will be handled by
+          // other threads, so there is nothing left to do here.
+          if (next.isEndState()) {
+            continue;
+          }
+          // if this task is neither ended nor timed out yet, push it back to the queue.
+          if (next.getDDL() > System.currentTimeMillis()) {
+            queue.push(next);
+            Thread.sleep(CHECK_INTERVAL);
+            continue;
+          }
+          next.setStatus(FragmentInstanceTaskStatus.ABORTED);
+        } finally {
+          next.unlock();
+        }
+        try {
+          // Otherwise the task has timed out, so invoke the callback to abort it.
+          timeoutCallback.call(next);
+        } catch (Exception e) {
+          logger.error("Abort instance " + next.getId() + " failed", e);
+        }
+      } catch (InterruptedException e) {
+        logger.info("{} is interrupted.", this.getName());
+        break;
       }
-    } catch (InterruptedException e) {
-      logger.info("{} is interrupted.", this.getName());
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
index 3bda549..a27db32 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
@@ -39,9 +39,8 @@ public interface IFragmentInstanceManager {
    * downstream data has been consumed.
    *
    * @param instanceID the fragment instance to be notified.
-   * @param downstreamInstanceId the downstream instance id.
    */
-  void outputBlockAvailable(FragmentInstanceID instanceID, FragmentInstanceID downstreamInstanceId);
+  void outputBlockAvailable(FragmentInstanceID instanceID);
 
   /**
    * abort all the instances in this query
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
index bcabb5c..6ddc610 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
@@ -29,7 +29,9 @@ package org.apache.iotdb.mpp.execution.queue;
  * <p>3. Can push a non-null element to queue. When the queue is beyond the max size, an exception
  * will be thrown.
  *
- * <p>4. Can remove an element by a long type id.
+ * <p>4. Can remove an element by a type of {@link ID}.
+ *
+ * <p>5. Each element in the queue has a distinct ID.
  */
 public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
 
@@ -79,15 +81,17 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
     if (element == null) {
       throw new NullPointerException("pushed element is null");
     }
-    if (size + 1 > MAX_CAPACITY) {
+    int sizeDelta = contains(element) ? 0 : 1;
+    if (size + sizeDelta > MAX_CAPACITY) {
       throw new IllegalStateException("the queue is full");
     }
     pushToQueue(element);
+    size += sizeDelta;
     this.notifyAll();
   }
 
   /**
-   * Remove and return the element by a long id. It returns null if it doesn't exist.
+   * Remove and return the element by id. It returns null if it doesn't exist.
    *
    * @param id the id of the element to be removed.
    * @return the removed element.
@@ -103,6 +107,17 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
   }
 
   /**
+   * Get the element by id. It returns null if it doesn't exist.
+   *
+   * @param id the id of the element.
+   * @return the element with the given id, or null if it doesn't exist.
+   */
+  public synchronized E get(ID id) {
+    queryHolder.setId(id);
+    return get(queryHolder);
+  }
+
+  /**
    * Get the current queue size.
    *
    * @return the current queue size.
@@ -139,10 +154,32 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
   protected abstract void pushToQueue(E element);
 
   /**
-   * Remove and return the element by a long id. It returns null if it doesn't exist.
+   * Remove and return the element by its ID. It returns null if it doesn't exist.
+   *
+   * <p>This implementation needn't be thread-safe.
    *
    * @param element the element to be removed.
    * @return the removed element.
    */
   protected abstract E remove(E element);
+
+  /**
+   * Check whether an element with the same ID exists.
+   *
+   * <p>This implementation needn't be thread-safe.
+   *
+   * @param element the element to be checked.
+   * @return true if an element with the same ID exists, otherwise false.
+   */
+  protected abstract boolean contains(E element);
+
+  /**
+   * Return the element with the same id as the input, null if it doesn't exist.
+   *
+   * <p>This implementation needn't be thread-safe.
+   *
+   * @param element the element to be queried.
+   * @return the element with the same id in the queue. Null if it doesn't exist.
+   */
+  protected abstract E get(E element);
 }
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
index a5ccf14..a9cad83 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
@@ -28,10 +28,10 @@ import java.util.TreeMap;
  * <p>The time complexity of operations are:
  *
  * <ul>
- *   <li><b>{@link #size()}: </b> O(1).
  *   <li><b>{@link #remove(IDIndexedAccessible)} ()}: </b> O(logN).
  *   <li><b>{@link #push(IDIndexedAccessible)}: </b> O(logN).
  *   <li><b>{@link #poll()}: </b> O(logN).
+ *   <li><b>{@link #get(ID)}: </b> O(1).
  * </ul>
  */
 public class L1PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlockingQueue<E> {
@@ -73,4 +73,14 @@ public class L1PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlock
   protected E remove(E element) {
     return elements.remove(element);
   }
+
+  @Override
+  protected boolean contains(E element) {
+    return elements.containsKey(element);
+  }
+
+  @Override
+  protected E get(E element) {
+    return elements.get(element);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
index 3b8a9e6..c23e74a 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
@@ -30,10 +30,10 @@ import java.util.TreeMap;
  * <p>The time complexity of operations are:
  *
  * <ul>
- *   <li><b>{@link #size()}: </b> O(1).
  *   <li><b>{@link #remove(IDIndexedAccessible)} ()}: </b> O(logN).
  *   <li><b>{@link #push(IDIndexedAccessible)}: </b> O(logN).
  *   <li><b>{@link #poll()}: </b> O(logN).
+ *   <li><b>{@link #get(ID)}: </b> O(1).
  * </ul>
  */
 public class L2PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlockingQueue<E> {
@@ -75,6 +75,7 @@ public class L2PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlock
 
   @Override
   protected void pushToQueue(E element) {
+    workingElements.remove(element);
     idleElements.put(element, element);
   }
 
@@ -86,4 +87,18 @@ public class L2PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlock
     }
     return e;
   }
+
+  @Override
+  protected boolean contains(E element) {
+    return workingElements.containsKey(element) || idleElements.containsKey(element);
+  }
+
+  @Override
+  protected E get(E element) {
+    E e = workingElements.get(element);
+    if (e != null) {
+      return e;
+    }
+    return idleElements.get(element);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
index 7bd7205..1836c74 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
@@ -70,6 +70,19 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
     return status;
   }
 
+  public boolean isEndState() {
+    return status == FragmentInstanceTaskStatus.ABORTED
+        || status == FragmentInstanceTaskStatus.FINISHED;
+  }
+
+  public void inputReady(FragmentInstanceID inputId) {
+    throw new UnsupportedOperationException("unsupported");
+  }
+
+  public void outputReady() {
+    throw new UnsupportedOperationException("unsupported");
+  }
+
   public void setStatus(FragmentInstanceTaskStatus status) {
     this.status = status;
   }
diff --git a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueueTest.java b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueueTest.java
new file mode 100644
index 0000000..3809d4a
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueueTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class L1PriorityQueueTest {
+
+  @Test
+  public void testPollBlocked() throws InterruptedException {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L1PriorityQueue<>(
+            10,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              return Integer.compare(o1.getValue(), o2.getValue());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    List<QueueElement> res = new ArrayList<>();
+    Thread t1 =
+        new Thread(
+            () -> {
+              try {
+                QueueElement e = queue.poll();
+                res.add(e);
+              } catch (InterruptedException e) {
+                e.printStackTrace();
+                Assert.fail();
+              }
+            });
+    t1.start();
+    Thread.sleep(10);
+    Assert.assertEquals(Thread.State.WAITING, t1.getState());
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(1), 1);
+    queue.push(e2);
+    Thread.sleep(10);
+    Assert.assertEquals(Thread.State.TERMINATED, t1.getState());
+    Assert.assertEquals(1, res.size());
+    Assert.assertEquals(e2.getId().toString(), res.get(0).getId().toString());
+  }
+
+  @Test
+  public void testPushExceedCapacity() {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L1PriorityQueue<>(
+            1,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              return Integer.compare(o1.getValue(), o2.getValue());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(1), 1);
+    queue.push(e2);
+    QueueElement e2e = new QueueElement(new QueueElement.QueueElementID(1), 10);
+    queue.push(e2e);
+    QueueElement e3 = new QueueElement(new QueueElement.QueueElementID(2), 2);
+    try {
+      queue.push(e3);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      // ignore;
+    }
+  }
+
+  @Test
+  public void testPushAndPoll() throws InterruptedException {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L1PriorityQueue<>(
+            10,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              return Integer.compare(o1.getValue(), o2.getValue());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    QueueElement e1 = new QueueElement(new QueueElement.QueueElementID(1), 10);
+    queue.push(e1);
+    QueueElement e1e = new QueueElement(new QueueElement.QueueElementID(1), 20);
+    queue.push(e1e);
+    // only one element per id can be held in the queue
+    Assert.assertEquals(1, queue.size());
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(2), 5);
+    queue.push(e2);
+    Assert.assertEquals(2, queue.size());
+    Assert.assertEquals(e2.getId().toString(), queue.poll().getId().toString());
+    Assert.assertEquals(1, queue.size());
+    Assert.assertEquals(e1e.getId().toString(), queue.poll().getId().toString());
+    Assert.assertEquals(0, queue.size());
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueueTest.java b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueueTest.java
new file mode 100644
index 0000000..5d34bb7
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueueTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class L2PriorityQueueTest {
+  @Test
+  public void testPollBlocked() throws InterruptedException {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L2PriorityQueue<>(
+            10,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              return Integer.compare(o1.getValue(), o2.getValue());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    List<QueueElement> res = new ArrayList<>();
+    Thread t1 =
+        new Thread(
+            () -> {
+              try {
+                QueueElement e = queue.poll();
+                res.add(e);
+              } catch (InterruptedException e) {
+                e.printStackTrace();
+                Assert.fail();
+              }
+            });
+    t1.start();
+    Thread.sleep(10);
+    Assert.assertEquals(Thread.State.WAITING, t1.getState());
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(1), 1);
+    queue.push(e2);
+    Thread.sleep(10);
+    Assert.assertEquals(Thread.State.TERMINATED, t1.getState());
+    Assert.assertEquals(1, res.size());
+    Assert.assertEquals(e2.getId().toString(), res.get(0).getId().toString());
+  }
+
+  @Test
+  public void testPushExceedCapacity() {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L2PriorityQueue<>(
+            1,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              return Integer.compare(o1.getValue(), o2.getValue());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(1), 1);
+    queue.push(e2);
+    QueueElement e2e = new QueueElement(new QueueElement.QueueElementID(1), 10);
+    queue.push(e2e);
+    QueueElement e3 = new QueueElement(new QueueElement.QueueElementID(2), 2);
+    try {
+      queue.push(e3);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      // ignore;
+    }
+  }
+
+  @Test
+  public void testPushAndPoll() throws InterruptedException {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L2PriorityQueue<>(
+            10,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              int res = Integer.compare(o1.getValue(), o2.getValue());
+              if (res != 0) {
+                return res;
+              }
+              return String.CASE_INSENSITIVE_ORDER.compare(
+                  o1.getId().toString(), o2.getId().toString());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    QueueElement e1 = new QueueElement(new QueueElement.QueueElementID(1), 10);
+    queue.push(e1);
+    QueueElement e1e = new QueueElement(new QueueElement.QueueElementID(1), 20);
+    queue.push(e1e);
+    // only one element per id can be held in the queue
+    Assert.assertEquals(1, queue.size());
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(2), 5);
+    queue.push(e2);
+    Assert.assertEquals(2, queue.size());
+    Assert.assertEquals(e2.getId().toString(), queue.poll().getId().toString());
+    Assert.assertEquals(1, queue.size());
+    // L1: 5 -> 20 L2: 10
+    QueueElement e3 = new QueueElement(new QueueElement.QueueElementID(3), 10);
+    queue.push(e3);
+    Assert.assertEquals(e1e.getId().toString(), queue.poll().getId().toString());
+    Assert.assertEquals(1, queue.size());
+    Assert.assertEquals(e3.getId().toString(), queue.poll().getId().toString());
+    Assert.assertEquals(0, queue.size());
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/QueueElement.java b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/QueueElement.java
new file mode 100644
index 0000000..4aed3a5
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/QueueElement.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+public class QueueElement implements IDIndexedAccessible {
+  private QueueElementID id;
+  private final int value;
+
+  public QueueElement(QueueElementID id, int value) {
+    this.id = id;
+    this.value = value;
+  }
+
+  public int getValue() {
+    return this.value;
+  }
+
+  @Override
+  public ID getId() {
+    return id;
+  }
+
+  @Override
+  public void setId(ID id) {
+    this.id = (QueueElementID) id;
+  }
+
+  @Override
+  public int hashCode() {
+    return id.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return o instanceof QueueElement && ((QueueElement) o).getId().equals(this.id);
+  }
+
+  public static class QueueElementID implements ID {
+    private final int id;
+
+    public QueueElementID(int id) {
+      this.id = id;
+    }
+
+    public int getId() {
+      return this.id;
+    }
+
+    @Override
+    public int hashCode() {
+      return Integer.hashCode(id);
+    }
+
+    @Override
+    public String toString() {
+      return String.valueOf(id);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      return o instanceof QueueElementID && ((QueueElementID) o).getId() == this.id;
+    }
+  }
+}