You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/25 09:31:05 UTC
[iotdb] 01/02: Rerun a new query execution thread while previous one is interrupted
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ReRunThread
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8c578d08936ba87b7c528f92333ab3976c19aa27
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Oct 25 17:26:44 2022 +0800
Rerun a new query execution thread while previous one is interrupted
---
.../execution/schedule/AbstractDriverThread.java | 51 ++++++++++++----------
.../db/mpp/execution/schedule/DriverScheduler.java | 38 +++++++++++++++-
.../mpp/execution/schedule/DriverTaskThread.java | 5 ++-
.../schedule/DriverTaskTimeoutSentinelThread.java | 5 ++-
.../db/mpp/execution/schedule/ThreadProducer.java | 32 ++++++++++++++
.../DriverTaskTimeoutSentinelThreadTest.java | 23 +++++++---
6 files changed, 120 insertions(+), 34 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
index b5608a7115..7742f39e33 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
@@ -34,6 +34,7 @@ public abstract class AbstractDriverThread extends Thread implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(AbstractDriverThread.class);
private final IndexedBlockingQueue<DriverTask> queue;
+ private final ThreadProducer producer;
protected final ITaskScheduler scheduler;
private volatile boolean closed;
@@ -41,43 +42,49 @@ public abstract class AbstractDriverThread extends Thread implements Closeable {
String workerId,
ThreadGroup tg,
IndexedBlockingQueue<DriverTask> queue,
- ITaskScheduler scheduler) {
+ ITaskScheduler scheduler,
+ ThreadProducer producer) {
super(tg, workerId);
this.queue = queue;
this.scheduler = scheduler;
this.closed = false;
+ this.producer = producer;
}
+ /**
+ * Polls driver tasks from the queue and executes them until this thread is closed or
+ * interrupted.
+ *
+ * <p>If executing a task throws, the task's abort cause is set and the task is handed to
+ * {@code scheduler.toAborted}. When the loop ends while {@code closed} is still false, a
+ * replacement thread is requested from the producer.
+ */
@Override
public void run() {
DriverTask next;
- while (!closed && !Thread.currentThread().isInterrupted()) {
- try {
- next = queue.poll();
- } catch (InterruptedException e) {
- logger.error("Executor " + this.getName() + "failed to poll driver task from queue");
- Thread.currentThread().interrupt();
- break;
- }
+ try {
+ while (!closed && !Thread.currentThread().isInterrupted()) {
+ try {
+ next = queue.poll();
+ } catch (InterruptedException e) {
+ logger.error("Executor " + this.getName() + " failed to poll driver task from queue");
+ // restore the interrupt flag so the loop condition and any caller can observe it
+ Thread.currentThread().interrupt();
+ break;
+ }
- if (next == null) {
- logger.error("DriverTask should never be null");
- continue;
- }
+ if (next == null) {
+ logger.error("DriverTask should never be null");
+ continue;
+ }
- try (SetThreadName fragmentInstanceName =
- new SetThreadName(next.getFragmentInstance().getInfo().getFullId())) {
- execute(next);
- } catch (Throwable t) {
- // try-with-resource syntax will call close once after try block is done, so we need to
- // reset the thread name here
try (SetThreadName fragmentInstanceName =
new SetThreadName(next.getFragmentInstance().getInfo().getFullId())) {
- logger.error("[ExecuteFailed]", t);
- next.setAbortCause(FragmentInstanceAbortedException.BY_INTERNAL_ERROR_SCHEDULED);
- scheduler.toAborted(next);
+ execute(next);
+ } catch (Throwable t) {
+ // try-with-resource syntax will call close once after try block is done, so we need to
+ // reset the thread name here
+ try (SetThreadName fragmentInstanceName =
+ new SetThreadName(next.getFragmentInstance().getInfo().getFullId())) {
+ logger.error("[ExecuteFailed]", t);
+ next.setAbortCause(FragmentInstanceAbortedException.BY_INTERNAL_ERROR_SCHEDULED);
+ scheduler.toAborted(next);
+ }
}
}
+ } finally {
+ // Replace this thread only when it exits unexpectedly (e.g. it was interrupted). When
+ // closed is set the exit is intentional, and spawning a replacement would keep the
+ // thread pool alive after shutdown was requested.
+ if (!closed) {
+ producer.produce(getName(), getThreadGroup(), queue, producer);
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 4416cb84c9..5344cbde56 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -92,14 +92,48 @@ public class DriverScheduler implements IDriverScheduler, IService {
+ /**
+ * Starts WORKER_THREAD_NUM worker threads that read from readyQueue and one sentinel thread
+ * that reads from timeoutQueue, adding each to the threads list.
+ *
+ * <p>Every thread receives a ThreadProducer that builds a thread of the same kind, stores it
+ * in the same position of the threads list, and starts it.
+ *
+ * @throws StartupException declared by the overridden method; nothing in this body throws it
+ *     explicitly
+ */
@Override
public void start() throws StartupException {
for (int i = 0; i < WORKER_THREAD_NUM; i++) {
+ // effectively-final copy of i so the anonymous class below can capture it
+ int index = i;
+ String threadName = "Query-Worker-Thread-" + i;
+ ThreadProducer producer =
+ new ThreadProducer() {
+ // Inside produce, threadName and workerGroups refer to the method parameters, which
+ // shadow the outer names. The queue and producer parameters are not used: the new
+ // thread is always built on readyQueue and reuses this producer instance.
+ @Override
+ public void produce(
+ String threadName,
+ ThreadGroup workerGroups,
+ IndexedBlockingQueue<DriverTask> queue,
+ ThreadProducer producer) {
+ DriverTaskThread newThread =
+ new DriverTaskThread(threadName, workerGroups, readyQueue, scheduler, this);
+ // overwrite the slot that was filled for worker number `index`
+ // NOTE(review): this runs on whichever thread calls produce; whether the threads list
+ // is safe for concurrent modification is not visible here - confirm.
+ threads.set(index, newThread);
+ newThread.start();
+ }
+ };
AbstractDriverThread t =
- new DriverTaskThread("Query-Worker-Thread-" + i, workerGroups, readyQueue, scheduler);
+ new DriverTaskThread(threadName, workerGroups, readyQueue, scheduler, producer);
threads.add(t);
t.start();
}
+
+ String threadName = "Query-Sentinel-Thread";
+ ThreadProducer producer =
+ new ThreadProducer() {
+ // Same pattern as the worker producer above, but for the sentinel: the queue and
+ // producer parameters are ignored in favour of timeoutQueue and this instance.
+ @Override
+ public void produce(
+ String threadName,
+ ThreadGroup workerGroups,
+ IndexedBlockingQueue<DriverTask> queue,
+ ThreadProducer producer) {
+ DriverTaskTimeoutSentinelThread newThread =
+ new DriverTaskTimeoutSentinelThread(
+ threadName, workerGroups, timeoutQueue, scheduler, this);
+ // the sentinel is added after the WORKER_THREAD_NUM workers, so this is its index
+ threads.set(WORKER_THREAD_NUM, newThread);
+ newThread.start();
+ }
+ };
+
AbstractDriverThread t =
new DriverTaskTimeoutSentinelThread(
- "Query-Sentinel-Thread", workerGroups, timeoutQueue, scheduler);
+ threadName, workerGroups, timeoutQueue, scheduler, producer);
threads.add(t);
t.start();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
index 52b890797e..28324a0502 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
@@ -48,8 +48,9 @@ public class DriverTaskThread extends AbstractDriverThread {
String workerId,
ThreadGroup tg,
IndexedBlockingQueue<DriverTask> queue,
- ITaskScheduler scheduler) {
- super(workerId, tg, queue, scheduler);
+ ITaskScheduler scheduler,
+ ThreadProducer producer) {
+ super(workerId, tg, queue, scheduler, producer);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java
index 7bf30053a5..a9533685a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java
@@ -34,8 +34,9 @@ public class DriverTaskTimeoutSentinelThread extends AbstractDriverThread {
String workerId,
ThreadGroup tg,
IndexedBlockingQueue<DriverTask> queue,
- ITaskScheduler scheduler) {
- super(workerId, tg, queue, scheduler);
+ ITaskScheduler scheduler,
+ ThreadProducer producer) {
+ super(workerId, tg, queue, scheduler, producer);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ThreadProducer.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ThreadProducer.java
new file mode 100644
index 0000000000..8db6d0c8cb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ThreadProducer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.schedule;
+
+import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue;
+import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
+
+/**
+ * Callback that creates a driver thread.
+ *
+ * <p>An {@code AbstractDriverThread} is constructed with a producer and may invoke it when its
+ * run loop ends, so that the thread can be replaced.
+ */
+@FunctionalInterface
+public interface ThreadProducer {
+
+ /**
+ * Creates a new thread from the given settings. Whether the thread is also started or
+ * registered anywhere is decided by the implementation.
+ *
+ * @param threadName name for the new thread
+ * @param workerGroups thread group for the new thread
+ * @param queue queue of driver tasks; presumably the one the new thread should poll -
+ *     implementations may substitute their own queue
+ * @param producer producer to pass on to the new thread; presumably lets it be replaced in
+ *     turn - implementations may pass themselves instead
+ */
+ void produce(
+ String threadName,
+ ThreadGroup workerGroups,
+ IndexedBlockingQueue<DriverTask> queue,
+ ThreadProducer producer);
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
index 0374a8188a..d61768e849 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
@@ -40,6 +40,11 @@ import java.util.concurrent.TimeUnit;
public class DriverTaskTimeoutSentinelThreadTest {
+ // No-op ThreadProducer handed to the DriverTaskThread instances built in these tests; it
+ // never creates a replacement thread.
+ private static final ThreadProducer producer =
+ (threadName, workerGroups, queue, producer) -> {
+ // intentionally empty: no thread is created
+ };
+
@Test
public void testHandleInvalidStateTask() throws ExecutionException, InterruptedException {
ITaskScheduler mockScheduler = Mockito.mock(ITaskScheduler.class);
@@ -62,7 +67,8 @@ public class DriverTaskTimeoutSentinelThreadTest {
Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
AbstractDriverThread executor =
- new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+ new DriverTaskThread(
+ "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler, producer);
// FINISHED status test
DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.FINISHED);
@@ -123,7 +129,8 @@ public class DriverTaskTimeoutSentinelThreadTest {
.thenReturn(Futures.immediateCancelledFuture());
AbstractDriverThread executor =
- new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+ new DriverTaskThread(
+ "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler, producer);
DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
@@ -162,7 +169,8 @@ public class DriverTaskTimeoutSentinelThreadTest {
.thenAnswer(ans -> Futures.immediateVoidFuture());
Mockito.when(mockDriver.isFinished()).thenReturn(true);
AbstractDriverThread executor =
- new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+ new DriverTaskThread(
+ "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler, producer);
DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
@@ -210,7 +218,8 @@ public class DriverTaskTimeoutSentinelThreadTest {
Mockito.when(mockDriver.processFor(Mockito.any())).thenAnswer(ans -> mockFuture);
Mockito.when(mockDriver.isFinished()).thenReturn(false);
AbstractDriverThread executor =
- new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+ new DriverTaskThread(
+ "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler, producer);
DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
@@ -259,7 +268,8 @@ public class DriverTaskTimeoutSentinelThreadTest {
Mockito.when(mockDriver.processFor(Mockito.any())).thenAnswer(ans -> mockFuture);
Mockito.when(mockDriver.isFinished()).thenReturn(false);
AbstractDriverThread executor =
- new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+ new DriverTaskThread(
+ "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler, producer);
DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
@@ -292,7 +302,8 @@ public class DriverTaskTimeoutSentinelThreadTest {
FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
AbstractDriverThread executor =
- new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+ new DriverTaskThread(
+ "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler, producer);
Mockito.when(mockDriver.processFor(Mockito.any()))
.thenAnswer(
ans -> {