You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/03/17 02:42:49 UTC
[iotdb] 01/06: Basic implementation of FragmentInstanceManager
This is an automated email from the ASF dual-hosted git repository.
ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 678f7126e8da5722cdd37f350cbfd08861e6ba37
Author: ericpai <er...@hotmail.com>
AuthorDate: Mon Mar 14 18:08:07 2022 +0800
Basic implementation of FragmentInstanceManager
---
.../org/apache/iotdb/db/service/ServiceType.java | 3 +-
.../iotdb/mpp/execution/ExecutionContext.java | 24 ++++
.../mpp/execution/FragmentInstanceManager.java | 101 ++++++++++++++
.../execution/FragmentInstanceTaskExecutor.java | 51 +++++++
.../execution/FragmentInstanceTimeoutSentinel.java | 52 ++++++++
.../mpp/execution/IFragmentInstanceManager.java | 50 +++++++
.../org/apache/iotdb/mpp/execution/queue/ID.java | 22 +++
.../mpp/execution/queue/IDIndexedAccessible.java | 27 ++++
.../mpp/execution/queue/IndexedBlockingQueue.java | 148 +++++++++++++++++++++
.../iotdb/mpp/execution/queue/L1PriorityQueue.java | 76 +++++++++++
.../iotdb/mpp/execution/queue/L2PriorityQueue.java | 89 +++++++++++++
.../mpp/execution/task/FragmentInstanceID.java | 67 ++++++++++
.../mpp/execution/task/FragmentInstanceTask.java | 138 +++++++++++++++++++
.../execution/task/FragmentInstanceTaskStatus.java | 37 ++++++
14 files changed, 884 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 95edca6..9e7e19e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -66,7 +66,8 @@ public enum ServiceType {
"Cluster Data Heartbeat RPC Service", "ClusterDataHeartbeatRPCService"),
CLUSTER_META_ENGINE("Cluster Meta Engine", "ClusterMetaEngine"),
CLUSTER_DATA_ENGINE("Cluster Data Engine", "ClusterDataEngine"),
- REST_SERVICE("REST Service", "REST Service");
+ REST_SERVICE("REST Service", "REST Service"),
+ FRAGMENT_INSTANCE_MANAGER_SERVICE("Fragment instance manager", "FragmentInstanceManager");
private final String name;
private final String jmxName;
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/ExecutionContext.java b/server/src/main/java/org/apache/iotdb/mpp/execution/ExecutionContext.java
new file mode 100644
index 0000000..7428b73
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/ExecutionContext.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+
+/**
+ * The execution context of a {@link FragmentInstanceTask}.
+ *
+ * <p>Currently an empty placeholder: it declares no state and no behavior.
+ */
+public class ExecutionContext {}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
new file mode 100644
index 0000000..4c7c157
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
+import org.apache.iotdb.mpp.execution.queue.L1PriorityQueue;
+import org.apache.iotdb.mpp.execution.queue.L2PriorityQueue;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The manager of fragment instance scheduling.
+ *
+ * <p>It owns two queues of {@link FragmentInstanceTask}: a ready queue consumed by the worker
+ * threads and a timeout queue consumed by a single sentinel thread. All threads are created in
+ * one {@link ThreadGroup} so that {@link #stop()} can interrupt them together.
+ */
+public class FragmentInstanceManager implements IFragmentInstanceManager, IService {
+
+  private static final int MAX_CAPACITY = 1000; // TODO: load from config files
+  private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files
+
+  private final IndexedBlockingQueue<FragmentInstanceTask> readyQueue;
+  private final IndexedBlockingQueue<FragmentInstanceTask> timeoutQueue;
+  private final Map<String, List<FragmentInstanceID>> queryMap;
+  private final ThreadGroup workerGroups = new ThreadGroup("ScheduleThreads");
+
+  /**
+   * Get the shared manager instance.
+   *
+   * @return the instance held by {@link InstanceHolder}.
+   */
+  public static IFragmentInstanceManager getInstance() {
+    return InstanceHolder.instance;
+  }
+
+  /** Create the manager with an empty ready queue, timeout queue and query map. */
+  public FragmentInstanceManager() {
+    this.readyQueue =
+        new L2PriorityQueue<>(
+            MAX_CAPACITY,
+            new FragmentInstanceTask.SchedulePriorityComparator(),
+            new FragmentInstanceTask());
+    // The timeout queue must be ordered by deadline, not by schedule priority, so that its head
+    // is always the task with the earliest deadline.
+    this.timeoutQueue =
+        new L1PriorityQueue<>(
+            MAX_CAPACITY, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
+    this.queryMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Start {@code WORKER_THREAD_NUM} worker threads polling the ready queue and one sentinel
+   * thread polling the timeout queue.
+   */
+  @Override
+  public void start() throws StartupException {
+    for (int i = 0; i < WORKER_THREAD_NUM; i++) {
+      new FragmentInstanceTaskExecutor("Worker-Thread-" + i, workerGroups, readyQueue).start();
+    }
+    new FragmentInstanceTimeoutSentinel("Sentinel-Thread", workerGroups, timeoutQueue).start();
+  }
+
+  /** Interrupt every thread in the scheduling thread group. */
+  @Override
+  public void stop() {
+    workerGroups.interrupt();
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.FRAGMENT_INSTANCE_MANAGER_SERVICE;
+  }
+
+  /** Not implemented yet. */
+  @Override
+  public void submitFragmentInstance() {}
+
+  /** Not implemented yet. */
+  @Override
+  public void inputBlockAvailable(
+      FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId) {}
+
+  /** Not implemented yet. */
+  @Override
+  public void outputBlockAvailable(
+      FragmentInstanceID instanceID, FragmentInstanceID downstreamInstanceId) {}
+
+  /** Not implemented yet. */
+  @Override
+  public void abortQuery(String queryId) {}
+
+  /** Holder of the shared instance, initialized when this class is first loaded. */
+  private static class InstanceHolder {
+
+    private static final IFragmentInstanceManager instance = new FragmentInstanceManager();
+
+    private InstanceHolder() {}
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
new file mode 100644
index 0000000..5c704db
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The worker thread that repeatedly polls {@link FragmentInstanceTask}s from a queue. */
+public class FragmentInstanceTaskExecutor extends Thread {
+
+  private static final Logger logger = LoggerFactory.getLogger(FragmentInstanceTaskExecutor.class);
+
+  private final IndexedBlockingQueue<FragmentInstanceTask> queue;
+
+  /**
+   * Create a worker thread.
+   *
+   * @param workerId the thread name.
+   * @param tg the thread group this worker belongs to.
+   * @param queue the queue to poll tasks from.
+   */
+  public FragmentInstanceTaskExecutor(
+      String workerId, ThreadGroup tg, IndexedBlockingQueue<FragmentInstanceTask> queue) {
+    super(tg, workerId);
+    this.queue = queue;
+  }
+
+  /** Poll tasks until this thread is interrupted. */
+  @Override
+  public void run() {
+    try {
+      // Checking the flag on every round also ends the loop when the interrupt arrives while the
+      // queue is non-empty, in which case poll() never blocks and never throws.
+      while (!isInterrupted()) {
+        FragmentInstanceTask next = queue.poll();
+        // do logic here
+      }
+    } catch (InterruptedException e) {
+      // Restore the interrupt status cleared by the exception so it stays observable.
+      Thread.currentThread().interrupt();
+    }
+    logger.info("{} is interrupted.", this.getName());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
new file mode 100644
index 0000000..7b352ba
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The thread for watching the timeout of {@link FragmentInstanceTask}s held in a queue. */
+public class FragmentInstanceTimeoutSentinel extends Thread {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(FragmentInstanceTimeoutSentinel.class);
+
+  private final IndexedBlockingQueue<FragmentInstanceTask> queue;
+
+  /**
+   * Create a sentinel thread.
+   *
+   * @param workerId the thread name.
+   * @param tg the thread group this sentinel belongs to.
+   * @param queue the queue to poll tasks from.
+   */
+  public FragmentInstanceTimeoutSentinel(
+      String workerId, ThreadGroup tg, IndexedBlockingQueue<FragmentInstanceTask> queue) {
+    super(tg, workerId);
+    this.queue = queue;
+  }
+
+  /** Poll tasks until this thread is interrupted. */
+  @Override
+  public void run() {
+    try {
+      // Checking the flag on every round also ends the loop when the interrupt arrives while the
+      // queue is non-empty, in which case poll() never blocks and never throws.
+      while (!isInterrupted()) {
+        FragmentInstanceTask next = queue.poll();
+        // do logic here
+      }
+    } catch (InterruptedException e) {
+      // Restore the interrupt status cleared by the exception so it stays observable.
+      Thread.currentThread().interrupt();
+    }
+    logger.info("{} is interrupted.", this.getName());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
new file mode 100644
index 0000000..e0eecfa
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
+
+/** The interface of fragment instance scheduling. */
+public interface IFragmentInstanceManager {
+
+ /**
+ * Submit a fragment instance for scheduling.
+ *
+ * <p>NOTE(review): the method takes no arguments yet, so it is presumably a placeholder whose
+ * signature will be extended - confirm.
+ */
+ void submitFragmentInstance();
+
+ /**
+ * The notifying interface for {@link DataBlockManager} when upstream data comes.
+ *
+ * @param instanceID the fragment instance to be notified.
+ * @param upstreamInstanceId the upstream instance id.
+ */
+ void inputBlockAvailable(FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId);
+
+ /**
+ * The notifying interface for {@link DataBlockManager} when downstream data has been consumed.
+ *
+ * @param instanceID the fragment instance to be notified.
+ * @param downstreamInstanceId the downstream instance id.
+ */
+ void outputBlockAvailable(FragmentInstanceID instanceID, FragmentInstanceID downstreamInstanceId);
+
+ /**
+ * Abort all the instances in this query.
+ *
+ * @param queryId the id of the query to be aborted.
+ */
+ void abortQuery(String queryId);
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/ID.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/ID.java
new file mode 100644
index 0000000..cc7d58f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/ID.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+/**
+ * A marker interface for id types. It declares no methods; it only gives the ids handled by the
+ * queue classes of this package a common type.
+ */
+public interface ID {}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IDIndexedAccessible.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IDIndexedAccessible.java
new file mode 100644
index 0000000..5ae4c96
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IDIndexedAccessible.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+/** A simple interface for objects that expose a readable and writable {@link ID}. */
+public interface IDIndexedAccessible {
+
+ /**
+ * Get the id of this object.
+ *
+ * @return the current id.
+ */
+ ID getId();
+
+ /**
+ * Replace the id of this object.
+ *
+ * @param id the new id.
+ */
+ void setId(ID id);
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
new file mode 100644
index 0000000..bcabb5c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+/**
+ * The base class of a special kind of blocking queue, which has these characteristics:
+ *
+ * <p>1. Thread-safe: every public operation is synchronized on the queue itself.
+ *
+ * <p>2. Can poll from the queue head. When the queue is empty, poll() blocks until an element is
+ * inserted.
+ *
+ * <p>3. Can push a non-null element to the queue. When the queue already holds the max number of
+ * elements, an exception is thrown.
+ *
+ * <p>4. Can remove an element by its {@link ID}.
+ */
+public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
+
+  private final int maxCapacity;
+  private final E queryHolder;
+  private int size;
+
+  /**
+   * Init the queue with a max capacity. The queryHolder is just a simple reused object in query to
+   * avoid small objects allocation. It should not be used in any other place outside the queue,
+   * as its id is mutated by {@link #remove(ID)}.
+   *
+   * @param maxCapacity the max capacity of the queue.
+   * @param queryHolder the query holder instance.
+   * @throws IllegalArgumentException if maxCapacity <= 0.
+   */
+  public IndexedBlockingQueue(int maxCapacity, E queryHolder) {
+    if (maxCapacity <= 0) {
+      throw new IllegalArgumentException("maxCapacity must be positive, but got " + maxCapacity);
+    }
+    this.maxCapacity = maxCapacity;
+    this.queryHolder = queryHolder;
+  }
+
+  /**
+   * Get and remove the first element of the queue. If the queue is empty, this call will be blocked
+   * until an element has been pushed.
+   *
+   * @return the queue head element.
+   * @throws InterruptedException if the calling thread is interrupted while waiting.
+   */
+  public synchronized E poll() throws InterruptedException {
+    // Loop instead of a single check: notifyAll() wakes every waiter, but only one of them gets
+    // the pushed element.
+    while (isEmpty()) {
+      this.wait();
+    }
+    E output = pollFirst();
+    size--;
+    return output;
+  }
+
+  /**
+   * Push an element to the queue. The new element position is determined by the implementation. If
+   * the queue size has reached the maxCapacity, an {@link IllegalStateException} will be thrown. If
+   * the element is null, a {@link NullPointerException} will be thrown.
+   *
+   * @param element the element to be pushed.
+   * @throws NullPointerException the pushed element is null.
+   * @throws IllegalStateException the queue size has reached the maxCapacity.
+   */
+  public synchronized void push(E element) {
+    if (element == null) {
+      throw new NullPointerException("pushed element is null");
+    }
+    if (size + 1 > maxCapacity) {
+      throw new IllegalStateException("the queue is full");
+    }
+    pushToQueue(element);
+    // Keep the counter in step with poll() and remove(), which both decrement it; without this
+    // the capacity check above could never fire and size() would go negative.
+    size++;
+    this.notifyAll();
+  }
+
+  /**
+   * Remove and return the element by its id. It returns null if it doesn't exist.
+   *
+   * @param id the id of the element to be removed.
+   * @return the removed element, or null if no element has this id.
+   */
+  public synchronized E remove(ID id) {
+    // Reuse the holder as the lookup probe instead of allocating a new element per call.
+    queryHolder.setId(id);
+    E output = remove(queryHolder);
+    if (output == null) {
+      return null;
+    }
+    size--;
+    return output;
+  }
+
+  /**
+   * Get the current queue size.
+   *
+   * @return the current queue size.
+   */
+  public final synchronized int size() {
+    return size;
+  }
+
+  /**
+   * Whether the queue is empty.
+   *
+   * <p>This implementation needn't be thread-safe.
+   *
+   * @return true if the queue is empty, otherwise false.
+   */
+  protected abstract boolean isEmpty();
+
+  /**
+   * Get and remove the first element.
+   *
+   * <p>This implementation needn't be thread-safe.
+   *
+   * @return The first element.
+   */
+  protected abstract E pollFirst();
+
+  /**
+   * Push the element into the queue.
+   *
+   * <p>This implementation needn't be thread-safe.
+   *
+   * @param element the element to be pushed.
+   */
+  protected abstract void pushToQueue(E element);
+
+  /**
+   * Remove and return the queued element that matches the given element. It returns null if it
+   * doesn't exist.
+   *
+   * <p>This implementation needn't be thread-safe.
+   *
+   * @param element the element to be removed; {@link #remove(ID)} passes the query holder here.
+   * @return the removed element.
+   */
+  protected abstract E remove(E element);
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
new file mode 100644
index 0000000..a5ccf14
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+import java.util.Comparator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * An efficient subclass of {@link IndexedBlockingQueue} with 1-level priority groups.
+ *
+ * <p>The time complexity of operations are:
+ *
+ * <ul>
+ * <li><b>{@link #size()}: </b> O(1).
+ * <li><b>{@link #remove(IDIndexedAccessible)} ()}: </b> O(logN).
+ * <li><b>{@link #push(IDIndexedAccessible)}: </b> O(logN).
+ * <li><b>{@link #poll()}: </b> O(logN).
+ * </ul>
+ */
+public class L1PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlockingQueue<E> {
+
+ // Here we use a map not a set to act as a queue because we need to get the element reference
+ // after it was removed.
+ private final SortedMap<E, E> elements;
+
+ /**
+ * Init the queue with max capacity and specified comparator.
+ *
+ * @see IndexedBlockingQueue
+ * @param maxCapacity the max capacity of the queue.
+ * @param comparator the comparator for comparing the elements.
+ * @param queryHolder the query holder instance.
+ * @throws IllegalArgumentException if maxCapacity <= 0.
+ */
+ public L1PriorityQueue(int maxCapacity, Comparator<E> comparator, E queryHolder) {
+ super(maxCapacity, queryHolder);
+ this.elements = new TreeMap<>(comparator);
+ }
+
+ @Override
+ protected boolean isEmpty() {
+ return elements.isEmpty();
+ }
+
+ @Override
+ protected E pollFirst() {
+ return elements.remove(elements.firstKey());
+ }
+
+ @Override
+ protected void pushToQueue(E element) {
+ elements.put(element, element);
+ }
+
+ @Override
+ protected E remove(E element) {
+ return elements.remove(element);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
new file mode 100644
index 0000000..3b8a9e6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+import java.util.Comparator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * A subclass of {@link IndexedBlockingQueue} with 2-level priority groups. The advantage compared
+ * to {@link L1PriorityQueue} is that an element in this queue will not be starved to death by its
+ * low sequence order: pushed elements wait in an idle group, which only becomes the polled
+ * (working) group once the current working group has been drained.
+ *
+ * <p>The time complexity of operations are:
+ *
+ * <ul>
+ *   <li><b>{@link #size()}: </b> O(1).
+ *   <li><b>{@link #remove(IDIndexedAccessible)}: </b> O(logN) when the sorted lookup hits,
+ *       otherwise O(N) for the scan by id.
+ *   <li><b>{@link #push(IDIndexedAccessible)}: </b> O(logN).
+ *   <li><b>{@link #poll()}: </b> O(logN).
+ * </ul>
+ */
+public class L2PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlockingQueue<E> {
+
+  // Here we use a map not a set to act as a queue because we need to get the element reference
+  // after it was removed.
+  private SortedMap<E, E> workingElements;
+  private SortedMap<E, E> idleElements;
+
+  /**
+   * Init the queue with max capacity and specified comparator.
+   *
+   * @see IndexedBlockingQueue
+   * @param maxCapacity the max capacity of the queue.
+   * @param comparator the comparator for comparing the elements.
+   * @param queryHolder the query holder instance.
+   */
+  public L2PriorityQueue(int maxCapacity, Comparator<E> comparator, E queryHolder) {
+    super(maxCapacity, queryHolder);
+    this.workingElements = new TreeMap<>(comparator);
+    this.idleElements = new TreeMap<>(comparator);
+  }
+
+  @Override
+  protected boolean isEmpty() {
+    return workingElements.isEmpty() && idleElements.isEmpty();
+  }
+
+  @Override
+  protected E pollFirst() {
+    if (workingElements.isEmpty()) {
+      // The working group is drained: swap the two groups so the waiting elements get served.
+      SortedMap<E, E> drained = workingElements;
+      workingElements = idleElements;
+      idleElements = drained;
+    }
+    return workingElements.remove(workingElements.firstKey());
+  }
+
+  @Override
+  protected void pushToQueue(E element) {
+    idleElements.put(element, element);
+  }
+
+  /**
+   * Remove the queued element matching the given one from the working group or, failing that,
+   * from the idle group.
+   *
+   * @param element the element (or an id-only probe) to be removed.
+   * @return the removed element, or null if no element matches.
+   */
+  @Override
+  protected E remove(E element) {
+    E removed = removeFrom(workingElements, element);
+    if (removed == null) {
+      removed = removeFrom(idleElements, element);
+    }
+    return removed;
+  }
+
+  /** Remove the match from one group: sorted lookup first, then a scan comparing ids. */
+  private E removeFrom(SortedMap<E, E> group, E element) {
+    E removed = group.remove(element);
+    if (removed != null) {
+      return removed;
+    }
+    // The sorted lookup navigates by the comparator. A probe that only carries the id can differ
+    // from the stored element in the other compared fields and then descends the wrong subtree,
+    // so a miss above does not prove absence.
+    ID id = element.getId();
+    if (id == null) {
+      return null;
+    }
+    java.util.Iterator<E> candidates = group.values().iterator();
+    while (candidates.hasNext()) {
+      E candidate = candidates.next();
+      if (id.equals(candidate.getId())) {
+        candidates.remove();
+        return candidate;
+      }
+    }
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceID.java b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceID.java
new file mode 100644
index 0000000..f52b5c3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceID.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.task;
+
+import org.apache.iotdb.mpp.execution.queue.ID;
+
+import org.jetbrains.annotations.NotNull;
+
+/** the class of id of the fragment instance */
+public class FragmentInstanceID implements ID, Comparable<FragmentInstanceTask> {
+
+ private final String instanceId;
+ private final String fragmentId;
+ private final String queryId;
+
+ public FragmentInstanceID(String queryId, String fragmentId, String instanceId) {
+ this.queryId = queryId;
+ this.fragmentId = fragmentId;
+ this.instanceId = instanceId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return o instanceof FragmentInstanceID
+ && queryId.equals(((FragmentInstanceID) o).getQueryId())
+ && fragmentId.equals(((FragmentInstanceID) o).getFragmentId())
+ && instanceId.equals(((FragmentInstanceID) o).getInstanceId());
+ }
+
+ public String toString() {
+ return String.format("%s.%s.%s", getInstanceId(), getFragmentId(), getQueryId());
+ }
+
+ public String getInstanceId() {
+ return instanceId;
+ }
+
+ public String getFragmentId() {
+ return fragmentId;
+ }
+
+ public String getQueryId() {
+ return queryId;
+ }
+
+ // This is the default comparator of FragmentInstanceID
+ @Override
+ public int compareTo(@NotNull FragmentInstanceTask o) {
+ return String.CASE_INSENSITIVE_ORDER.compare(this.toString(), o.toString());
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
new file mode 100644
index 0000000..7bd7205
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.task;
+
+import org.apache.iotdb.mpp.execution.ExecutionContext;
+import org.apache.iotdb.mpp.execution.FragmentInstanceTaskExecutor;
+import org.apache.iotdb.mpp.execution.queue.ID;
+import org.apache.iotdb.mpp.execution.queue.IDIndexedAccessible;
+
+import java.util.Comparator;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * the scheduling element of {@link FragmentInstanceTaskExecutor}. It wraps a single
+ * FragmentInstance.
+ */
+public class FragmentInstanceTask implements IDIndexedAccessible {
+
+ private FragmentInstanceID id;
+ private FragmentInstanceTaskStatus status;
+ private final ExecutionContext executionContext;
+
+ // the higher this field is, the higher probability it will be scheduled.
+ private long schedulePriority;
+ private final long ddl;
+ private final Lock lock;
+
+ /** Initialize a dummy instance for queryHolder */
+ public FragmentInstanceTask() {
+ this(null, 0L, null);
+ }
+
+ public FragmentInstanceTask(
+ FragmentInstanceID id, long timeoutMs, FragmentInstanceTaskStatus status) {
+ this.id = id;
+ this.setStatus(status);
+ this.executionContext = new ExecutionContext();
+ this.schedulePriority = 0L;
+ this.ddl = System.currentTimeMillis() + timeoutMs;
+ this.lock = new ReentrantLock();
+ }
+
+ public FragmentInstanceID getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(ID id) {
+ this.id = (FragmentInstanceID) id;
+ }
+
+ public FragmentInstanceTaskStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(FragmentInstanceTaskStatus status) {
+ this.status = status;
+ }
+
+ public ExecutionContext getExecutionContext() {
+ return executionContext;
+ }
+
+ /** Update the schedule priority according to the execution context. */
+ public void updateSchedulePriority() {
+ // TODO: need to implement here
+ this.schedulePriority = System.currentTimeMillis() - ddl;
+ }
+
+ public void lock() {
+ lock.lock();
+ }
+
+ public void unlock() {
+ lock.unlock();
+ }
+
+ public double getSchedulePriority() {
+ return schedulePriority;
+ }
+
+ public long getDDL() {
+ return ddl;
+ }
+
+ /** a comparator of ddl, the less the ddl is, the low order it has. */
+ public static class TimeoutComparator implements Comparator<FragmentInstanceTask> {
+
+ @Override
+ public int compare(FragmentInstanceTask o1, FragmentInstanceTask o2) {
+ if (o1.getId().equals(o2.getId())) {
+ return 0;
+ }
+ if (o1.getDDL() < o2.getDDL()) {
+ return -1;
+ }
+ if (o1.getDDL() > o2.getDDL()) {
+ return 1;
+ }
+ return o1.getId().compareTo(o2);
+ }
+ }
+
+ /** a comparator of ddl, the higher the schedulePriority is, the low order it has. */
+ public static class SchedulePriorityComparator implements Comparator<FragmentInstanceTask> {
+
+ @Override
+ public int compare(FragmentInstanceTask o1, FragmentInstanceTask o2) {
+ if (o1.getId().equals(o2.getId())) {
+ return 0;
+ }
+ if (o1.getSchedulePriority() > o2.getSchedulePriority()) {
+ return -1;
+ }
+ if (o1.getSchedulePriority() < o2.getSchedulePriority()) {
+ return 1;
+ }
+ return o1.getId().compareTo(o2);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTaskStatus.java b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTaskStatus.java
new file mode 100644
index 0000000..5ee9fb3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTaskStatus.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.task;
+
+/** The status enum of {@link FragmentInstanceTask}. */
+public enum FragmentInstanceTaskStatus {
+ /** Ready to be executed. */
+ READY,
+
+ /** Being executed. */
+ RUNNING,
+
+ /** Waiting for upstream input, or for output to be consumed by downstream FragmentInstances. */
+ BLOCKED,
+
+ /** Interrupted because of a timeout or the coordinator's cancellation. */
+ ABORTED,
+
+ /** Finished after meeting the EOF of the upstream inputs. */
+ FINISHED,
+}