You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/03/17 02:42:49 UTC

[iotdb] 01/06: Basic implementation of FragmentInstanceManager

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 678f7126e8da5722cdd37f350cbfd08861e6ba37
Author: ericpai <er...@hotmail.com>
AuthorDate: Mon Mar 14 18:08:07 2022 +0800

    Basic implementation of FragmentInstanceManager
---
 .../org/apache/iotdb/db/service/ServiceType.java   |   3 +-
 .../iotdb/mpp/execution/ExecutionContext.java      |  24 ++++
 .../mpp/execution/FragmentInstanceManager.java     | 101 ++++++++++++++
 .../execution/FragmentInstanceTaskExecutor.java    |  51 +++++++
 .../execution/FragmentInstanceTimeoutSentinel.java |  52 ++++++++
 .../mpp/execution/IFragmentInstanceManager.java    |  50 +++++++
 .../org/apache/iotdb/mpp/execution/queue/ID.java   |  22 +++
 .../mpp/execution/queue/IDIndexedAccessible.java   |  27 ++++
 .../mpp/execution/queue/IndexedBlockingQueue.java  | 148 +++++++++++++++++++++
 .../iotdb/mpp/execution/queue/L1PriorityQueue.java |  76 +++++++++++
 .../iotdb/mpp/execution/queue/L2PriorityQueue.java |  89 +++++++++++++
 .../mpp/execution/task/FragmentInstanceID.java     |  67 ++++++++++
 .../mpp/execution/task/FragmentInstanceTask.java   | 138 +++++++++++++++++++
 .../execution/task/FragmentInstanceTaskStatus.java |  37 ++++++
 14 files changed, 884 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 95edca6..9e7e19e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -66,7 +66,8 @@ public enum ServiceType {
       "Cluster Data Heartbeat RPC Service", "ClusterDataHeartbeatRPCService"),
   CLUSTER_META_ENGINE("Cluster Meta Engine", "ClusterMetaEngine"),
   CLUSTER_DATA_ENGINE("Cluster Data Engine", "ClusterDataEngine"),
-  REST_SERVICE("REST Service", "REST Service");
+  REST_SERVICE("REST Service", "REST Service"),
+  FRAGMENT_INSTANCE_MANAGER_SERVICE("Fragment instance manager", "FragmentInstanceManager");
 
   private final String name;
   private final String jmxName;
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/ExecutionContext.java b/server/src/main/java/org/apache/iotdb/mpp/execution/ExecutionContext.java
new file mode 100644
index 0000000..7428b73
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/ExecutionContext.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+
+/** The execution context of a {@link FragmentInstanceTask}. Currently an empty placeholder. */
+public class ExecutionContext {}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
new file mode 100644
index 0000000..4c7c157
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
+import org.apache.iotdb.mpp.execution.queue.L1PriorityQueue;
+import org.apache.iotdb.mpp.execution.queue.L2PriorityQueue;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The manager of fragment instance scheduling; starts and stops the worker/sentinel threads. */
+public class FragmentInstanceManager implements IFragmentInstanceManager, IService {
+
+  public static IFragmentInstanceManager getInstance() {
+    return InstanceHolder.instance;
+  }
+
+  private final IndexedBlockingQueue<FragmentInstanceTask> readyQueue;
+  private final IndexedBlockingQueue<FragmentInstanceTask> timeoutQueue;
+  private final Map<String, List<FragmentInstanceID>> queryMap; // only initialised so far
+
+  private static final int MAX_CAPACITY = 1000; // TODO: load from config files
+  private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files
+  private final ThreadGroup workerGroups = new ThreadGroup("ScheduleThreads");
+
+  public FragmentInstanceManager() {
+    this.readyQueue =
+        new L2PriorityQueue<>(
+            MAX_CAPACITY,
+            new FragmentInstanceTask.SchedulePriorityComparator(),
+            new FragmentInstanceTask()); // dummy task handed to the queue as its queryHolder
+    this.timeoutQueue = // NOTE(review): same comparator as readyQueue; confirm this is intended
+        new L1PriorityQueue<>(
+            MAX_CAPACITY,
+            new FragmentInstanceTask.SchedulePriorityComparator(),
+            new FragmentInstanceTask()); // dummy task handed to the queue as its queryHolder
+    this.queryMap = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void start() throws StartupException {
+    for (int i = 0; i < WORKER_THREAD_NUM; i++) { // every worker polls the shared readyQueue
+      new FragmentInstanceTaskExecutor("Worker-Thread-" + i, workerGroups, readyQueue).start();
+    }
+    new FragmentInstanceTimeoutSentinel("Sentinel-Thread", workerGroups, timeoutQueue).start();
+  }
+
+  @Override
+  public void stop() {
+    workerGroups.interrupt(); // interrupts every thread that start() put into this group
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.FRAGMENT_INSTANCE_MANAGER_SERVICE;
+  }
+
+  @Override
+  public void submitFragmentInstance() {} // TODO: empty body, not implemented yet
+
+  @Override
+  public void inputBlockAvailable( // TODO: empty body, not implemented yet
+      FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId) {}
+
+  @Override
+  public void outputBlockAvailable( // TODO: empty body, not implemented yet
+      FragmentInstanceID instanceID, FragmentInstanceID downstreamInstanceId) {}
+
+  @Override
+  public void abortQuery(String queryId) {} // TODO: empty body, not implemented yet
+
+  private static class InstanceHolder { // holds the single shared manager instance
+
+    private InstanceHolder() {}
+
+    private static final IFragmentInstanceManager instance = new FragmentInstanceManager();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
new file mode 100644
index 0000000..5c704db
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The worker thread that takes {@link FragmentInstanceTask}s from a queue, one at a time. */
+public class FragmentInstanceTaskExecutor extends Thread {
+
+  private static final Logger logger = LoggerFactory.getLogger(FragmentInstanceTaskExecutor.class);
+
+  private final IndexedBlockingQueue<FragmentInstanceTask> queue;
+
+  public FragmentInstanceTaskExecutor(
+      String workerId, ThreadGroup tg, IndexedBlockingQueue<FragmentInstanceTask> queue) {
+    super(tg, workerId); // workerId becomes the thread name
+    this.queue = queue;
+  }
+
+  @Override
+  public void run() {
+    try {
+      while (true) {
+        FragmentInstanceTask next = queue.poll();
+        // TODO: execute the polled task; for now it is simply dropped
+      }
+    } catch (InterruptedException e) {
+      logger.info("{} is interrupted.", this.getName()); // leaving the loop ends the thread
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
new file mode 100644
index 0000000..7b352ba
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The thread for watching the timeout of {@link FragmentInstanceTask}s. */
+public class FragmentInstanceTimeoutSentinel extends Thread {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(FragmentInstanceTimeoutSentinel.class);
+
+  private final IndexedBlockingQueue<FragmentInstanceTask> queue;
+
+  public FragmentInstanceTimeoutSentinel(
+      String workerId, ThreadGroup tg, IndexedBlockingQueue<FragmentInstanceTask> queue) {
+    super(tg, workerId); // workerId becomes the thread name
+    this.queue = queue;
+  }
+
+  @Override
+  public void run() {
+    try {
+      while (true) {
+        FragmentInstanceTask next = queue.poll();
+        // TODO: timeout handling goes here; the polled task is currently dropped
+      }
+    } catch (InterruptedException e) {
+      logger.info("{} is interrupted.", this.getName()); // leaving the loop ends the thread
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
new file mode 100644
index 0000000..e0eecfa
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
+
+/** The interface of fragment instance scheduling. */
+public interface IFragmentInstanceManager {
+
+  void submitFragmentInstance(); // NOTE(review): no parameters yet, so no instance can be passed
+
+  /**
+   * The notification entry point for {@link DataBlockManager} when upstream data arrives.
+   *
+   * @param instanceID the fragment instance to be notified.
+   * @param upstreamInstanceId the upstream instance id.
+   */
+  void inputBlockAvailable(FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId);
+
+  /**
+   * The notification entry point for {@link DataBlockManager} when downstream data is consumed.
+   *
+   * @param instanceID the fragment instance to be notified.
+   * @param downstreamInstanceId the downstream instance id.
+   */
+  void outputBlockAvailable(FragmentInstanceID instanceID, FragmentInstanceID downstreamInstanceId);
+
+  /**
+   * Abort all the instances in this query.
+   *
+   * @param queryId the id of the query to be aborted.
+   */
+  void abortQuery(String queryId);
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/ID.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/ID.java
new file mode 100644
index 0000000..cc7d58f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/ID.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+/** A marker interface for id types; it declares no methods. */
+public interface ID {}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IDIndexedAccessible.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IDIndexedAccessible.java
new file mode 100644
index 0000000..5ae4c96
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IDIndexedAccessible.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+/** An element that exposes an {@link ID} which can be both read and replaced. */
+public interface IDIndexedAccessible {
+
+  ID getId();
+
+  void setId(ID id);
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
new file mode 100644
index 0000000..bcabb5c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+/**
+ * The base class of a special kind of blocking queue, which has these characteristics:
+ *
+ * <p>1. Thread-safe.
+ *
+ * <p>2. Can poll from queue head. When the queue is empty, the poll() will be blocked until an
+ * element is inserted.
+ *
+ * <p>3. Can push a non-null element to queue. When the queue is beyond the max size, an exception
+ * will be thrown.
+ *
+ * <p>4. Can remove an element by its {@link ID}.
+ */
+public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
+
+  private final int maxCapacity;
+  private final E queryHolder;
+  private int size;
+
+  /**
+   * Init the queue with a max capacity. The queryHolder is just a simple reused object in query to
+   * avoid small objects allocation. It should be not used in any other places out of the queue as
+   * the id may be mutated.
+   *
+   * @param maxCapacity the max capacity of the queue.
+   * @param queryHolder the query holder instance.
+   * @throws IllegalArgumentException if maxCapacity <= 0.
+   */
+  public IndexedBlockingQueue(int maxCapacity, E queryHolder) {
+    if (maxCapacity <= 0) {
+      throw new IllegalArgumentException("maxCapacity must be positive: " + maxCapacity);
+    }
+    this.maxCapacity = maxCapacity;
+    this.queryHolder = queryHolder;
+  }
+
+  /**
+   * Get and remove the first element of the queue. If the queue is empty, this call will be blocked
+   * until an element has been pushed.
+   *
+   * @return the queue head element.
+   * @throws InterruptedException if the calling thread is interrupted while waiting.
+   */
+  public synchronized E poll() throws InterruptedException {
+    while (isEmpty()) {
+      this.wait();
+    }
+    E output = pollFirst();
+    size--;
+    return output;
+  }
+
+  /**
+   * Push an element to the queue. The new element position is determined by the implementation. If
+   * the queue size has been reached the maxCapacity, an {@link IllegalStateException} will be
+   * thrown. If the element is null, an {@link NullPointerException} will be thrown.
+   *
+   * @param element the element to be pushed.
+   * @throws NullPointerException the pushed element is null.
+   * @throws IllegalStateException the queue size has been reached the maxCapacity.
+   */
+  public synchronized void push(E element) {
+    if (element == null) {
+      throw new NullPointerException("pushed element is null");
+    }
+    if (size + 1 > maxCapacity) {
+      throw new IllegalStateException("the queue is full");
+    }
+    pushToQueue(element);
+    // Count the new element; poll() and remove(ID) decrement the same counter.
+    size++;
+    this.notifyAll();
+  }
+
+  /**
+   * Remove and return the element by its id. It returns null if it doesn't exist.
+   *
+   * @param id the id of the element to be removed.
+   * @return the removed element.
+   */
+  public synchronized E remove(ID id) {
+    queryHolder.setId(id); // the reused holder only carries the id used for the lookup
+    E output = remove(queryHolder);
+    if (output == null) {
+      return null;
+    }
+    size--;
+    return output;
+  }
+
+  /**
+   * Get the current queue size.
+   *
+   * @return the current queue size.
+   */
+  public final synchronized int size() {
+    return size;
+  }
+
+  /**
+   * Whether the queue is empty. This implementation needn't be thread-safe.
+   *
+   * @return true if the queue is empty, otherwise false.
+   */
+  protected abstract boolean isEmpty();
+
+  /**
+   * Get and remove the first element. This implementation needn't be thread-safe.
+   *
+   * @return The first element.
+   */
+  protected abstract E pollFirst();
+
+  /**
+   * Push the element into the queue. This implementation needn't be thread-safe.
+   *
+   * @param element the element to be pushed.
+   */
+  protected abstract void pushToQueue(E element);
+
+  /**
+   * Remove and return the element matching the given one. It returns null if it doesn't exist.
+   *
+   * @param element the element to be removed.
+   * @return the removed element.
+   */
+  protected abstract E remove(E element);
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
new file mode 100644
index 0000000..a5ccf14
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+import java.util.Comparator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * An efficient subclass of {@link IndexedBlockingQueue} with 1-level priority groups.
+ *
+ * <p>The time complexities of the operations are:
+ *
+ * <ul>
+ *   <li><b>{@link #size()}: </b> O(1).
+ *   <li><b>{@link #remove(IDIndexedAccessible)}: </b> O(logN).
+ *   <li><b>{@link #push(IDIndexedAccessible)}: </b> O(logN).
+ *   <li><b>{@link #poll()}: </b> O(logN).
+ * </ul>
+ */
+public class L1PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlockingQueue<E> {
+
+  // Here we use a map not a set to act as a queue because we need to get the element reference
+  // after it was removed.
+  private final SortedMap<E, E> elements;
+
+  /**
+   * Init the queue with max capacity and specified comparator.
+   *
+   * @see IndexedBlockingQueue
+   * @param maxCapacity the max capacity of the queue.
+   * @param comparator the comparator for comparing the elements.
+   * @param queryHolder the query holder instance.
+   * @throws IllegalArgumentException if maxCapacity <= 0.
+   */
+  public L1PriorityQueue(int maxCapacity, Comparator<E> comparator, E queryHolder) {
+    super(maxCapacity, queryHolder);
+    this.elements = new TreeMap<>(comparator);
+  }
+
+  @Override
+  protected boolean isEmpty() {
+    return elements.isEmpty();
+  }
+
+  @Override
+  protected E pollFirst() {
+    return elements.remove(elements.firstKey()); // firstKey() is the lowest key per the comparator
+  }
+
+  @Override
+  protected void pushToQueue(E element) {
+    elements.put(element, element); // the element is both key and value
+  }
+
+  @Override
+  protected E remove(E element) {
+    return elements.remove(element); // null when no key compares equal to the given element
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
new file mode 100644
index 0000000..3b8a9e6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+import java.util.Comparator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * An efficient subclass of {@link IndexedBlockingQueue} with 2-level priority groups. The
+ * advantages compared to {@link L1PriorityQueue} are that each element in this queue will not be
+ * starved to death by its low sequence order.
+ *
+ * <p>The time complexities of the operations are:
+ *
+ * <ul>
+ *   <li><b>{@link #size()}: </b> O(1).
+ *   <li><b>{@link #remove(IDIndexedAccessible)}: </b> O(logN).
+ *   <li><b>{@link #push(IDIndexedAccessible)}: </b> O(logN).
+ *   <li><b>{@link #poll()}: </b> O(logN).
+ * </ul>
+ */
+public class L2PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlockingQueue<E> {
+
+  // Here we use a map not a set to act as a queue because we need to get the element reference
+  // after it was removed.
+  private SortedMap<E, E> workingElements;
+  private SortedMap<E, E> idleElements;
+
+  /**
+   * Init the queue with max capacity and specified comparator.
+   *
+   * @see IndexedBlockingQueue
+   * @param maxCapacity the max capacity of the queue.
+   * @param comparator the comparator for comparing the elements.
+   * @param queryHolder the query holder instance.
+   * @throws IllegalArgumentException if maxCapacity <= 0.
+   */
+  public L2PriorityQueue(int maxCapacity, Comparator<E> comparator, E queryHolder) {
+    super(maxCapacity, queryHolder);
+    this.workingElements = new TreeMap<>(comparator);
+    this.idleElements = new TreeMap<>(comparator);
+  }
+
+  @Override
+  protected boolean isEmpty() {
+    return workingElements.isEmpty() && idleElements.isEmpty();
+  }
+
+  @Override
+  protected E pollFirst() {
+    if (workingElements.isEmpty()) { // nothing left to serve: the idle map becomes the working map
+      SortedMap<E, E> tmp = workingElements;
+      workingElements = idleElements;
+      idleElements = tmp;
+    }
+    return workingElements.remove(workingElements.firstKey());
+  }
+
+  @Override
+  protected void pushToQueue(E element) {
+    idleElements.put(element, element); // new elements always enter the idle map
+  }
+
+  @Override
+  protected E remove(E element) {
+    E e = workingElements.remove(element);
+    if (e == null) { // not in the working map, so try the idle map
+      e = idleElements.remove(element);
+    }
+    return e;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceID.java b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceID.java
new file mode 100644
index 0000000..f52b5c3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceID.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.task;
+
+import org.apache.iotdb.mpp.execution.queue.ID;
+
+import org.jetbrains.annotations.NotNull;
+
+/** The id of a fragment instance, made of a query id, a fragment id and an instance id. */
+public class FragmentInstanceID implements ID, Comparable<FragmentInstanceTask> {
+
+  private final String instanceId;
+  private final String fragmentId;
+  private final String queryId;
+
+  public FragmentInstanceID(String queryId, String fragmentId, String instanceId) {
+    this.queryId = queryId;
+    this.fragmentId = fragmentId;
+    this.instanceId = instanceId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return o instanceof FragmentInstanceID
+        && queryId.equals(((FragmentInstanceID) o).getQueryId())
+        && fragmentId.equals(((FragmentInstanceID) o).getFragmentId())
+        && instanceId.equals(((FragmentInstanceID) o).getInstanceId());
+  }
+
+  // Hashes the same three fields equals() compares, so equal ids always share a hash code.
+  @Override
+  public int hashCode() {
+    return java.util.Objects.hash(queryId, fragmentId, instanceId);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s.%s.%s", getInstanceId(), getFragmentId(), getQueryId());
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public String getFragmentId() {
+    return fragmentId;
+  }
+
+  public String getQueryId() {
+    return queryId;
+  }
+
+  // Orders by the case-insensitive toString() of both sides.
+  // NOTE(review): the type argument is FragmentInstanceTask rather than FragmentInstanceID, so an
+  // id is compared with a task's toString(); left unchanged to keep the public signature. Confirm.
+  @Override
+  public int compareTo(@NotNull FragmentInstanceTask o) {
+    return String.CASE_INSENSITIVE_ORDER.compare(this.toString(), o.toString());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
new file mode 100644
index 0000000..7bd7205
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.task;
+
+import org.apache.iotdb.mpp.execution.ExecutionContext;
+import org.apache.iotdb.mpp.execution.FragmentInstanceTaskExecutor;
+import org.apache.iotdb.mpp.execution.queue.ID;
+import org.apache.iotdb.mpp.execution.queue.IDIndexedAccessible;
+
+import java.util.Comparator;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * the scheduling element of {@link FragmentInstanceTaskExecutor}. It wraps a single
+ * FragmentInstance.
+ */
+public class FragmentInstanceTask implements IDIndexedAccessible {
+
+  private FragmentInstanceID id;
+  private FragmentInstanceTaskStatus status;
+  private final ExecutionContext executionContext;
+
+  // the higher this field is, the higher probability it will be scheduled.
+  private long schedulePriority;
+  private final long ddl;
+  private final Lock lock;
+
+  /** Initialize a dummy instance for queryHolder */
+  public FragmentInstanceTask() {
+    this(null, 0L, null);
+  }
+
+  public FragmentInstanceTask(
+      FragmentInstanceID id, long timeoutMs, FragmentInstanceTaskStatus status) {
+    this.id = id;
+    this.setStatus(status);
+    this.executionContext = new ExecutionContext();
+    this.schedulePriority = 0L;
+    this.ddl = System.currentTimeMillis() + timeoutMs;
+    this.lock = new ReentrantLock();
+  }
+
+  public FragmentInstanceID getId() {
+    return id;
+  }
+
+  @Override
+  public void setId(ID id) {
+    this.id = (FragmentInstanceID) id;
+  }
+
+  public FragmentInstanceTaskStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(FragmentInstanceTaskStatus status) {
+    this.status = status;
+  }
+
+  public ExecutionContext getExecutionContext() {
+    return executionContext;
+  }
+
+  /** Update the schedule priority according to the execution context. */
+  public void updateSchedulePriority() {
+    // TODO: need to implement here
+    this.schedulePriority = System.currentTimeMillis() - ddl;
+  }
+
+  public void lock() {
+    lock.lock();
+  }
+
+  public void unlock() {
+    lock.unlock();
+  }
+
+  public double getSchedulePriority() {
+    return schedulePriority;
+  }
+
+  public long getDDL() {
+    return ddl;
+  }
+
+  /** a comparator of ddl, the less the ddl is, the low order it has. */
+  public static class TimeoutComparator implements Comparator<FragmentInstanceTask> {
+
+    @Override
+    public int compare(FragmentInstanceTask o1, FragmentInstanceTask o2) {
+      if (o1.getId().equals(o2.getId())) {
+        return 0;
+      }
+      if (o1.getDDL() < o2.getDDL()) {
+        return -1;
+      }
+      if (o1.getDDL() > o2.getDDL()) {
+        return 1;
+      }
+      return o1.getId().compareTo(o2);
+    }
+  }
+
+  /** a comparator of ddl, the higher the schedulePriority is, the low order it has. */
+  public static class SchedulePriorityComparator implements Comparator<FragmentInstanceTask> {
+
+    @Override
+    public int compare(FragmentInstanceTask o1, FragmentInstanceTask o2) {
+      if (o1.getId().equals(o2.getId())) {
+        return 0;
+      }
+      if (o1.getSchedulePriority() > o2.getSchedulePriority()) {
+        return -1;
+      }
+      if (o1.getSchedulePriority() < o2.getSchedulePriority()) {
+        return 1;
+      }
+      return o1.getId().compareTo(o2);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTaskStatus.java b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTaskStatus.java
new file mode 100644
index 0000000..5ee9fb3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTaskStatus.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.task;
+
+/** The status enum of {@link FragmentInstanceTask}. */
+public enum FragmentInstanceTaskStatus {
+  /** Ready to be executed. */
+  READY,
+
+  /** Being executed. */
+  RUNNING,
+
+  /** Waiting for upstream input, or for output to be consumed by downstream FragmentInstances. */
+  BLOCKED,
+
+  /** Interrupted because of a timeout or the coordinator's cancellation. */
+  ABORTED,
+
+  /** Finished after meeting the EOF of the upstream inputs. */
+  FINISHED,
+}