You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/22 01:23:42 UTC
[iotdb] branch master updated: Add FragmentInstanceStateMachine for FragmentInstance State change (#5615)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f9226c380d Add FragmentInstanceStateMachine for FragmentInstance State change (#5615)
f9226c380d is described below
commit f9226c380dc262f9966717a53ae80a72885824d3
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Fri Apr 22 09:23:37 2022 +0800
Add FragmentInstanceStateMachine for FragmentInstance State change (#5615)
---
pom.xml | 5 +
server/pom.xml | 4 +
.../iotdb/db/mpp/buffer/DataBlockManager.java | 12 +-
.../apache/iotdb/db/mpp/buffer/ISinkHandle.java | 7 +-
.../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 29 +-
.../apache/iotdb/db/mpp/buffer/StubSinkHandle.java | 4 +-
.../apache/iotdb/db/mpp/execution/DataDriver.java | 189 ++-------
.../org/apache/iotdb/db/mpp/execution/Driver.java | 426 ++++++++++++++++++++-
.../iotdb/db/mpp/execution/DriverContext.java | 13 +-
.../db/mpp/execution/FragmentInstanceContext.java | 95 +++--
.../mpp/execution/FragmentInstanceExecution.java | 66 +++-
...t.java => FragmentInstanceFailureListener.java} | 29 +-
.../db/mpp/execution/FragmentInstanceManager.java | 57 ++-
.../db/mpp/execution/FragmentInstanceState.java | 2 +-
.../execution/FragmentInstanceStateMachine.java | 182 +++++++++
.../execution/{DriverContext.java => IDriver.java} | 30 +-
.../iotdb/db/mpp/execution/SchemaDriver.java | 140 +------
.../org/apache/iotdb/db/mpp/operator/Operator.java | 4 +-
.../db/mpp/operator/process/LimitOperator.java | 4 +-
.../db/mpp/operator/process/TransformOperator.java | 2 +-
.../mpp/operator/schema/SchemaFetchOperator.java | 3 +-
.../db/mpp/operator/source/ExchangeOperator.java | 2 +-
.../schedule/FragmentInstanceAbortedException.java | 4 +-
.../db/mpp/schedule/FragmentInstanceScheduler.java | 4 +-
.../mpp/schedule/FragmentInstanceTaskExecutor.java | 4 +-
.../mpp/schedule/IFragmentInstanceScheduler.java | 6 +-
.../db/mpp/schedule/task/FragmentInstanceTask.java | 16 +-
.../apache/iotdb/db/mpp/buffer/SinkHandleTest.java | 68 +---
.../iotdb/db/mpp/execution/DataDriverTest.java | 30 +-
.../iotdb/db/mpp/operator/LimitOperatorTest.java | 19 +-
.../operator/SeriesAggregateScanOperatorTest.java | 15 +-
.../db/mpp/operator/SeriesScanOperatorTest.java | 19 +-
.../db/mpp/operator/TimeJoinOperatorTest.java | 19 +-
.../operator/schema/SchemaScanOperatorTest.java | 32 +-
.../db/mpp/schedule/DefaultTaskSchedulerTest.java | 16 +-
.../schedule/FragmentInstanceSchedulerTest.java | 12 +-
.../FragmentInstanceTimeoutSentinelTest.java | 12 +-
37 files changed, 995 insertions(+), 586 deletions(-)
diff --git a/pom.xml b/pom.xml
index 912e982c23..5abd10d608 100644
--- a/pom.xml
+++ b/pom.xml
@@ -498,6 +498,11 @@
<artifactId>slice</artifactId>
<version>0.41</version>
</dependency>
+ <dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>stats</artifactId>
+ <version>200</version>
+ </dependency>
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
diff --git a/server/pom.xml b/server/pom.xml
index cf18fa5054..791dd03a5a 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -89,6 +89,10 @@
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>stats</artifactId>
+ </dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>airline</artifactId>
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index e09acf28cb..7a38ef6d09 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -61,6 +61,8 @@ public class DataBlockManager implements IDataBlockManager {
void onClosed(SinkHandle sinkHandle);
void onAborted(SinkHandle sinkHandle);
+
+ void onFailure(Throwable t);
}
/** Handle thrift communications. */
@@ -166,6 +168,7 @@ public class DataBlockManager implements IDataBlockManager {
/** Listen to the state changes of a source handle. */
class SourceHandleListenerImpl implements SourceHandleListener {
+
@Override
public void onFinished(SourceHandle sourceHandle) {
logger.info("Release resources of finished source handle {}", sourceHandle);
@@ -208,12 +211,12 @@ public class DataBlockManager implements IDataBlockManager {
logger.info("Resources of finished sink handle {} has already been released", sinkHandle);
}
sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
- context.finish();
+ context.finished();
}
@Override
public void onClosed(SinkHandle sinkHandle) {
- context.flushing();
+ context.transitionToFlushing();
}
@Override
@@ -224,6 +227,11 @@ public class DataBlockManager implements IDataBlockManager {
}
sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
}
+
+ @Override
+ public void onFailure(Throwable t) {
+ context.failed(t);
+ }
}
private final LocalMemoryManager localMemoryManager;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
index b27e86186b..99f92e3f45 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
@@ -25,7 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.List;
-public interface ISinkHandle extends AutoCloseable {
+public interface ISinkHandle {
/** Get the total amount of memory used by buffered tsblocks. */
long getBufferRetainedSizeInBytes();
@@ -41,7 +41,7 @@ public interface ISinkHandle extends AutoCloseable {
* the invocation will be ignored. This can happen with limit queries. A {@link RuntimeException}
* will be thrown if any exception happened during the data transmission.
*/
- void send(List<TsBlock> tsBlocks) throws IOException;
+ void send(List<TsBlock> tsBlocks);
/**
* Send a {@link TsBlock} to a specific partition. If no-more-tsblocks has been set, the send
@@ -70,8 +70,7 @@ public interface ISinkHandle extends AutoCloseable {
* downstream instances. A {@link RuntimeException} will be thrown if any exception happened
* during the data transmission.
*/
- @Override
- void close() throws IOException;
+ void close();
/**
* Abort the sink handle. Discard all tsblocks which may still be in the memory buffer and cancel
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index f66d8b49f3..c540103126 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -74,7 +74,6 @@ public class SinkHandle implements ISinkHandle {
private long bufferRetainedSizeInBytes;
private boolean closed;
private boolean noMoreTsBlocks;
- private Throwable throwable;
public SinkHandle(
TEndPoint remoteEndpoint,
@@ -114,11 +113,8 @@ public class SinkHandle implements ISinkHandle {
}
@Override
- public void send(List<TsBlock> tsBlocks) throws IOException {
+ public void send(List<TsBlock> tsBlocks) {
Validate.notNull(tsBlocks, "tsBlocks is null");
- if (throwable != null) {
- throw new IOException(throwable);
- }
if (closed) {
throw new IllegalStateException("Sink handle is closed.");
}
@@ -194,24 +190,21 @@ public class SinkHandle implements ISinkHandle {
}
@Override
- public void close() throws IOException {
+ public void close() {
logger.info("Sink handle {} is being closed.", this);
- if (throwable != null) {
- throw new IOException(throwable);
- }
if (closed) {
return;
}
+ try {
+ sendEndOfDataBlockEvent();
+ } catch (TException e) {
+ throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
+ }
synchronized (this) {
closed = true;
noMoreTsBlocks = true;
}
sinkHandleListener.onClosed(this);
- try {
- sendEndOfDataBlockEvent();
- } catch (TException e) {
- throw new IOException(e);
- }
logger.info("Sink handle {} is closed.", this);
}
@@ -247,7 +240,7 @@ public class SinkHandle implements ISinkHandle {
@Override
public boolean isFinished() {
- return throwable == null && noMoreTsBlocks && sequenceIdToTsBlock.isEmpty();
+ return noMoreTsBlocks && sequenceIdToTsBlock.isEmpty();
}
@Override
@@ -362,7 +355,7 @@ public class SinkHandle implements ISinkHandle {
try {
client.onNewDataBlockEvent(newDataBlockEvent);
break;
- } catch (TException e) {
+ } catch (Throwable e) {
logger.error(
"Failed to send new data block event to plan node {} of {} due to {}, attempt times: {}",
remotePlanNodeId,
@@ -371,9 +364,7 @@ public class SinkHandle implements ISinkHandle {
attempt,
e);
if (attempt == MAX_ATTEMPT_TIMES) {
- synchronized (this) {
- throwable = e;
- }
+ sinkHandleListener.onFailure(e);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
index 9f3b9240c2..49a13eddd7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@ -59,7 +59,7 @@ public class StubSinkHandle implements ISinkHandle {
}
@Override
- public void send(List<TsBlock> tsBlocks) throws IOException {
+ public void send(List<TsBlock> tsBlocks) {
this.tsBlocks.addAll(tsBlocks);
}
@@ -87,7 +87,7 @@ public class StubSinkHandle implements ISinkHandle {
return;
}
closed = true;
- instanceContext.flushing();
+ instanceContext.transitionToFlushing();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index 5ae66b14d7..e6e8e8083a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.execution;
-import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -26,160 +25,70 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.operator.Operator;
import org.apache.iotdb.db.mpp.operator.source.DataSourceOperator;
import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-import io.airlift.units.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
-import java.io.IOException;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
-import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
-import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
-
/**
* One dataDriver is responsible for one FragmentInstance which is for data query, which may
* contains several series.
*/
@NotThreadSafe
-public class DataDriver implements Driver {
-
- private static final Logger logger = LoggerFactory.getLogger(DataDriver.class);
-
- private final Operator root;
- private final ISinkHandle sinkHandle;
- private final DataDriverContext driverContext;
+public class DataDriver extends Driver {
private boolean init;
- private boolean closed;
/** closed tsfile used in this fragment instance */
private Set<TsFileResource> closedFilePaths;
/** unClosed tsfile used in this fragment instance */
private Set<TsFileResource> unClosedFilePaths;
- private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
-
public DataDriver(Operator root, ISinkHandle sinkHandle, DataDriverContext driverContext) {
- this.root = root;
- this.sinkHandle = sinkHandle;
- this.driverContext = driverContext;
+ super(root, sinkHandle, driverContext);
this.closedFilePaths = new HashSet<>();
this.unClosedFilePaths = new HashSet<>();
- // initially the driverBlockedFuture is not blocked (it is completed)
- SettableFuture<Void> future = SettableFuture.create();
- future.set(null);
- driverBlockedFuture.set(future);
- }
-
- @Override
- public boolean isFinished() {
- try {
- boolean isFinished =
- closed || (driverBlockedFuture.get().isDone() && root != null && root.isFinished());
- if (isFinished) {
- close();
- }
- return isFinished;
- } catch (Throwable t) {
- logger.error(
- "Failed to query whether the data driver {} is finished", driverContext.getId(), t);
- driverContext.failed(t);
- close();
- return true;
- }
}
@Override
- public ListenableFuture<Void> processFor(Duration duration) {
-
- SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
- // initialization may be time-consuming, so we keep it in the processFor method
- // in normal case, it won't cause deadlock and should finish soon, otherwise it will be a
- // critical bug
+ protected boolean init(SettableFuture<Void> blockedFuture) {
if (!init) {
try {
initialize();
} catch (Throwable t) {
- logger.error(
+ LOGGER.error(
"Failed to do the initialization for fragment instance {} ", driverContext.getId(), t);
driverContext.failed(t);
- close();
blockedFuture.setException(t);
- return blockedFuture;
+ return false;
}
}
-
- // if the driver is blocked we don't need to continue
- if (!blockedFuture.isDone()) {
- return blockedFuture;
- }
-
- long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
-
- long start = System.nanoTime();
- try {
- do {
- ListenableFuture<Void> future = processInternal();
- if (!future.isDone()) {
- return updateDriverBlockedFuture(future);
- }
- } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
- } catch (Throwable t) {
- logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
- driverContext.failed(t);
- close();
- blockedFuture.setException(t);
- return blockedFuture;
- }
- return NOT_BLOCKED;
- }
-
- @Override
- public FragmentInstanceId getInfo() {
- return driverContext.getId();
+ return true;
}
+ /**
+ * All file paths used by this fragment instance must be cleared and thus the usage reference must
+ * be decreased.
+ */
@Override
- public void close() {
- if (closed) {
- return;
+ protected void releaseResource() {
+ for (TsFileResource tsFile : closedFilePaths) {
+ FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
}
- closed = true;
- try {
- if (root != null) {
- root.close();
- }
- if (sinkHandle != null) {
- sinkHandle.close();
- }
- } catch (Throwable t) {
- logger.error("Failed to closed driver {}", driverContext.getId(), t);
- driverContext.failed(t);
- } finally {
- removeUsedFilesForQuery();
+ closedFilePaths = null;
+ for (TsFileResource tsFile : unClosedFilePaths) {
+ FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
}
- }
-
- @Override
- public void failed(Throwable t) {
- driverContext.failed(t);
+ unClosedFilePaths = null;
}
/**
@@ -187,7 +96,8 @@ public class DataDriver implements Driver {
* we should change all the blocked lock operation into tryLock
*/
private void initialize() throws QueryProcessException {
- List<DataSourceOperator> sourceOperators = driverContext.getSourceOperators();
+ List<DataSourceOperator> sourceOperators =
+ ((DataDriverContext) driverContext).getSourceOperators();
if (sourceOperators != null && !sourceOperators.isEmpty()) {
QueryDataSource dataSource = initQueryDataSourceCache();
sourceOperators.forEach(
@@ -210,13 +120,12 @@ public class DataDriver implements Driver {
* QueryDataSource needed for this query
*/
public QueryDataSource initQueryDataSourceCache() throws QueryProcessException {
- DataRegion dataRegion = driverContext.getDataRegion();
+ DataDriverContext context = (DataDriverContext) driverContext;
+ DataRegion dataRegion = context.getDataRegion();
dataRegion.readLock();
try {
List<PartialPath> pathList =
- driverContext.getPaths().stream()
- .map(IDTable::translateQueryPath)
- .collect(Collectors.toList());
+ context.getPaths().stream().map(IDTable::translateQueryPath).collect(Collectors.toList());
// when all the selected series are under the same device, the QueryDataSource will be
// filtered according to timeIndex
Set<String> selectedDeviceIdSet =
@@ -227,7 +136,7 @@ public class DataDriver implements Driver {
pathList,
selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null,
driverContext.getFragmentInstanceContext(),
- driverContext.getTimeFilter());
+ context.getTimeFilter());
// used files should be added before mergeLock is unlocked, or they may be deleted by
// running merge
@@ -268,67 +177,17 @@ public class DataDriver implements Driver {
}
}
- /**
- * All file paths used by this fragment instance must be cleared and thus the usage reference must
- * be decreased.
- */
- private void removeUsedFilesForQuery() {
- for (TsFileResource tsFile : closedFilePaths) {
- FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
- }
- closedFilePaths = null;
- for (TsFileResource tsFile : unClosedFilePaths) {
- FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
- }
- unClosedFilePaths = null;
- }
-
/**
* Increase the usage reference of filePath of job id. Before the invoking of this method, <code>
* this.setqueryIdForCurrentRequestThread</code> has been invoked, so <code>
* sealedFilePathsMap.get(queryId)</code> or <code>unsealedFilePathsMap.get(queryId)</code> must
* not return null.
*/
- void addFilePathToMap(TsFileResource tsFile, boolean isClosed) {
+ private void addFilePathToMap(TsFileResource tsFile, boolean isClosed) {
Set<TsFileResource> pathSet = isClosed ? closedFilePaths : unClosedFilePaths;
if (!pathSet.contains(tsFile)) {
pathSet.add(tsFile);
FileReaderManager.getInstance().increaseFileReaderReference(tsFile, isClosed);
}
}
-
- private ListenableFuture<Void> processInternal() throws IOException, IoTDBException {
- ListenableFuture<Void> blocked = root.isBlocked();
- if (!blocked.isDone()) {
- return blocked;
- }
- blocked = sinkHandle.isFull();
- if (!blocked.isDone()) {
- return blocked;
- }
- if (root.hasNext()) {
- TsBlock tsBlock = root.next();
- if (tsBlock != null && !tsBlock.isEmpty()) {
- sinkHandle.send(Collections.singletonList(tsBlock));
- }
- }
- return NOT_BLOCKED;
- }
-
- private ListenableFuture<Void> updateDriverBlockedFuture(
- ListenableFuture<Void> sourceBlockedFuture) {
- // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
- // or any of the operators gets a memory revocation request
- SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
- driverBlockedFuture.set(newDriverBlockedFuture);
- sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
-
- // TODO Although we don't have memory management for operator now, we should consider it for
- // future
- // it's possible that memory revoking is requested for some operator
- // before we update driverBlockedFuture above and we don't want to miss that
- // notification, so we check to see whether that's the case before returning.
-
- return newDriverBlockedFuture;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
index f211ce593c..40da49ff15 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
@@ -18,25 +18,94 @@
*/
package org.apache.iotdb.db.mpp.execution;
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.Closeable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Throwables.throwIfUnchecked;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static java.lang.Boolean.TRUE;
+import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
/**
* Driver encapsulates some methods which are necessary for execution scheduler to run a fragment
* instance
*/
-public interface Driver extends Closeable {
+public abstract class Driver implements IDriver {
+
+ protected static final Logger LOGGER = LoggerFactory.getLogger(Driver.class);
+
+ protected final Operator root;
+ protected final ISinkHandle sinkHandle;
+ protected final DriverContext driverContext;
+ protected final AtomicReference<SettableFuture<Void>> driverBlockedFuture =
+ new AtomicReference<>();
+ protected final AtomicReference<State> state = new AtomicReference<>(State.ALIVE);
+
+ protected final DriverLock exclusiveLock = new DriverLock();
+
+ protected enum State {
+ ALIVE,
+ NEED_DESTRUCTION,
+ DESTROYED
+ }
+
+ public Driver(Operator root, ISinkHandle sinkHandle, DriverContext driverContext) {
+ checkNotNull(root, "root Operator should not be null");
+ checkNotNull(sinkHandle, "SinkHandle should not be null");
+ this.root = root;
+ this.sinkHandle = sinkHandle;
+ this.driverContext = driverContext;
+
+ // initially the driverBlockedFuture is not blocked (it is completed)
+ SettableFuture<Void> future = SettableFuture.create();
+ future.set(null);
+ driverBlockedFuture.set(future);
+ }
/**
* Used to judge whether this fragment instance should be scheduled for execution anymore
*
* @return true if the FragmentInstance is done or terminated due to failure, otherwise false.
*/
- boolean isFinished();
+ @Override
+ public boolean isFinished() {
+ checkLockNotHeld("Cannot check finished status while holding the driver lock");
+
+ // if we can get the lock, attempt a clean shutdown; otherwise someone else will shut down
+ Optional<Boolean> result = tryWithLockUnInterruptibly(this::isFinishedInternal);
+ return result.orElseGet(() -> state.get() != State.ALIVE || driverContext.isDone());
+ }
+
+ /**
+ * Do the initialization.
+ *
+ * @return true if the initialization succeeds, false otherwise
+ */
+ protected abstract boolean init(SettableFuture<Void> blockedFuture);
+
+ /** Release the resources used by this driver. */
+ protected abstract void releaseResource();
/**
* run the fragment instance for {@param duration} time slice, the time of this run is likely not
@@ -48,23 +117,366 @@ public interface Driver extends Closeable {
* next processing otherwise, meaning that this fragment instance is blocked and not ready for
* next processing.
*/
- ListenableFuture<Void> processFor(Duration duration);
+ @Override
+ public ListenableFuture<Void> processFor(Duration duration) {
+
+ SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
+ // initialization may be time-consuming, so we keep it in the processFor method
+ // in normal case, it won't cause deadlock and should finish soon, otherwise it will be a
+ // critical bug
+ if (!init(blockedFuture)) {
+ return blockedFuture;
+ }
+
+ // if the driver is blocked we don't need to continue
+ if (!blockedFuture.isDone()) {
+ return blockedFuture;
+ }
+
+ long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
+
+ Optional<ListenableFuture<Void>> result =
+ tryWithLock(
+ 100,
+ TimeUnit.MILLISECONDS,
+ true,
+ () -> {
+ long start = System.nanoTime();
+ do {
+ ListenableFuture<Void> future = processInternal();
+ if (!future.isDone()) {
+ return updateDriverBlockedFuture(future);
+ }
+ } while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
+ return NOT_BLOCKED;
+ });
+
+ return result.orElse(NOT_BLOCKED);
+ }
/**
* the id information about this Fragment Instance.
*
* @return a {@link FragmentInstanceId} instance.
*/
- FragmentInstanceId getInfo();
+ @Override
+ public FragmentInstanceId getInfo() {
+ return driverContext.getId();
+ }
/** clear resource used by this fragment instance */
@Override
- void close();
+ public void close() {
+ // mark the service for destruction
+ if (!state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION)) {
+ return;
+ }
+
+ exclusiveLock.interruptCurrentOwner();
+
+ // if we can get the lock, attempt a clean shutdown; otherwise someone else will shut down
+ tryWithLockUnInterruptibly(() -> TRUE);
+ }
/**
* fail current driver
*
* @param t reason cause this failure
*/
- void failed(Throwable t);
+ @Override
+ public void failed(Throwable t) {
+ driverContext.failed(t);
+ }
+
+ @Override
+ public ISinkHandle getSinkHandle() {
+ return sinkHandle;
+ }
+
+ @GuardedBy("exclusiveLock")
+ private boolean isFinishedInternal() {
+ checkLockHeld("Lock must be held to call isFinishedInternal");
+
+ boolean finished = state.get() != State.ALIVE || driverContext.isDone() || root.isFinished();
+ if (finished) {
+ state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION);
+ }
+ return finished;
+ }
+
+ private ListenableFuture<Void> processInternal() {
+ try {
+ ListenableFuture<Void> blocked = root.isBlocked();
+ if (!blocked.isDone()) {
+ return blocked;
+ }
+ blocked = sinkHandle.isFull();
+ if (!blocked.isDone()) {
+ return blocked;
+ }
+ if (root.hasNext()) {
+ TsBlock tsBlock = root.next();
+ if (tsBlock != null && !tsBlock.isEmpty()) {
+ sinkHandle.send(Collections.singletonList(tsBlock));
+ }
+ }
+ return NOT_BLOCKED;
+ } catch (Throwable t) {
+ LOGGER.error("Failed to execute fragment instance {}", driverContext.getId(), t);
+ List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
+ if (interrupterStack == null) {
+ driverContext.failed(t);
+ throw t;
+ }
+
+ // Driver thread was interrupted which should only happen if the task is already finished.
+ // If this becomes the actual cause of a failed query there is a bug in the task state
+ // machine.
+ Exception exception = new Exception("Interrupted By");
+ exception.setStackTrace(interrupterStack.toArray(new StackTraceElement[0]));
+ RuntimeException newException = new RuntimeException("Driver was interrupted", exception);
+ newException.addSuppressed(t);
+ driverContext.failed(newException);
+ throw newException;
+ }
+ }
+
+ private ListenableFuture<Void> updateDriverBlockedFuture(
+ ListenableFuture<Void> sourceBlockedFuture) {
+ // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
+ // or any of the operators gets a memory revocation request
+ SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
+ driverBlockedFuture.set(newDriverBlockedFuture);
+ sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
+
+ // TODO Although we don't have memory management for operator now, we should consider it for
+ // future
+ // it's possible that memory revoking is requested for some operator
+ // before we update driverBlockedFuture above and we don't want to miss that
+ // notification, so we check to see whether that's the case before returning.
+
+ return newDriverBlockedFuture;
+ }
+
+ private synchronized void checkLockNotHeld(String message) {
+ checkState(!exclusiveLock.isHeldByCurrentThread(), message);
+ }
+
+ @GuardedBy("exclusiveLock")
+ private synchronized void checkLockHeld(String message) {
+ checkState(exclusiveLock.isHeldByCurrentThread(), message);
+ }
+
+ /**
+ * Try to acquire the {@code exclusiveLock} immediately and run a {@code task}. The task will not
+ * be interrupted if the {@code Driver} is closed.
+ *
+ * <p>Note: task cannot return null
+ */
+ private <T> Optional<T> tryWithLockUnInterruptibly(Supplier<T> task) {
+ return tryWithLock(0, TimeUnit.MILLISECONDS, false, task);
+ }
+
+ /**
+ * Try to acquire the {@code exclusiveLock} with {@code timeout} and run a {@code task}. If the
+ * {@code interruptOnClose} flag is set to {@code true} the {@code task} will be interrupted if
+ * the {@code Driver} is closed.
+ *
+ * <p>Note: task cannot return null
+ */
+ private <T> Optional<T> tryWithLock(
+ long timeout, TimeUnit unit, boolean interruptOnClose, Supplier<T> task) {
+ checkLockNotHeld("Lock cannot be reacquired");
+
+ boolean acquired = false;
+ try {
+ acquired = exclusiveLock.tryLock(timeout, unit, interruptOnClose);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ if (!acquired) {
+ return Optional.empty();
+ }
+
+ Optional<T> result;
+ try {
+ result = Optional.of(task.get());
+ } finally {
+ try {
+ destroyIfNecessary();
+ } finally {
+ exclusiveLock.unlock();
+ }
+ }
+
+ // We need to recheck whether the state is NEED_DESTRUCTION and, if so, destroy the driver.
+ // Suppose a concurrent Thread-A calls close() and successfully CASes the state from ALIVE to
+ // NEED_DESTRUCTION just after the current Thread-B has run destroyIfNecessary() but before it
+ // has called exclusiveLock.unlock().
+ // Thread-A then calls this method and tries to acquire the lock to do the destruction, but it
+ // fails because the lock is still held by Thread-B, so Thread-A exits.
+ // Without this recheck, Thread-B would exit too and nobody would destroy the driver.
+ if (state.get() == State.NEED_DESTRUCTION && exclusiveLock.tryLock(interruptOnClose)) {
+ try {
+ destroyIfNecessary();
+ } finally {
+ exclusiveLock.unlock();
+ }
+ }
+
+ return result;
+ }
+
+ @GuardedBy("exclusiveLock")
+ private void destroyIfNecessary() {
+ checkLockHeld("Lock must be held to call destroyIfNecessary");
+
+ if (!state.compareAndSet(State.NEED_DESTRUCTION, State.DESTROYED)) {
+ return;
+ }
+
+ // if we get an error while closing a driver, record it and we will throw it at the end
+ Throwable inFlightException = null;
+ try {
+ inFlightException = closeAndDestroyOperators();
+ driverContext.finished();
+ } catch (Throwable t) {
+ // this shouldn't happen but be safe
+ inFlightException =
+ addSuppressedException(
+ inFlightException, t, "Error destroying driver for task %s", driverContext.getId());
+ } finally {
+ releaseResource();
+ }
+
+ if (inFlightException != null) {
+ // this will always be an Error or Runtime
+ throwIfUnchecked(inFlightException);
+ throw new RuntimeException(inFlightException);
+ }
+ }
+
+ private Throwable closeAndDestroyOperators() {
+ // record the current interrupted status (and clear the flag); we'll reset it later
+ boolean wasInterrupted = Thread.interrupted();
+
+ Throwable inFlightException = null;
+
+ try {
+ root.close();
+ sinkHandle.close();
+ } catch (InterruptedException t) {
+ // don't record the stack
+ wasInterrupted = true;
+ } catch (Throwable t) {
+ // TODO currently, we cannot tell exactly which operator failed to close
+ inFlightException =
+ addSuppressedException(
+ inFlightException,
+ t,
+ "Error closing operator {} for fragment instance {}",
+ root.getOperatorContext().getOperatorId(),
+ driverContext.getId());
+ } finally {
+ // reset the interrupted flag
+ if (wasInterrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ return inFlightException;
+ }
+
+ private static Throwable addSuppressedException(
+ Throwable inFlightException, Throwable newException, String message, Object... args) {
+ if (newException instanceof Error) {
+ if (inFlightException == null) {
+ inFlightException = newException;
+ } else {
+ // Self-suppression not permitted
+ if (inFlightException != newException) {
+ inFlightException.addSuppressed(newException);
+ }
+ }
+ } else {
+ // log normal exceptions instead of rethrowing them
+ LOGGER.error(message, args, newException);
+ }
+ return inFlightException;
+ }
+
+ private static class DriverLock {
+ private final ReentrantLock lock = new ReentrantLock();
+
+ @GuardedBy("this")
+ private Thread currentOwner;
+
+ @GuardedBy("this")
+ private boolean currentOwnerInterruptionAllowed;
+
+ @GuardedBy("this")
+ private List<StackTraceElement> interrupterStack;
+
+ public boolean isHeldByCurrentThread() {
+ return lock.isHeldByCurrentThread();
+ }
+
+ public boolean tryLock(boolean currentThreadInterruptionAllowed) {
+ checkState(!lock.isHeldByCurrentThread(), "Lock is not reentrant");
+ boolean acquired = lock.tryLock();
+ if (acquired) {
+ setOwner(currentThreadInterruptionAllowed);
+ }
+ return acquired;
+ }
+
+ public boolean tryLock(long timeout, TimeUnit unit, boolean currentThreadInterruptionAllowed)
+ throws InterruptedException {
+ checkState(!lock.isHeldByCurrentThread(), "Lock is not reentrant");
+ boolean acquired = lock.tryLock(timeout, unit);
+ if (acquired) {
+ setOwner(currentThreadInterruptionAllowed);
+ }
+ return acquired;
+ }
+
+ private synchronized void setOwner(boolean interruptionAllowed) {
+ checkState(lock.isHeldByCurrentThread(), "Current thread does not hold lock");
+ currentOwner = Thread.currentThread();
+ currentOwnerInterruptionAllowed = interruptionAllowed;
+ // NOTE: We do not use interrupted stack information to know that another
+ // thread has attempted to interrupt the driver, and interrupt this new lock
+ // owner. The interrupted stack information is for debugging purposes only.
+ // In the case of interruption, the caller should (and does) have a separate
+ // state to prevent further processing in the Driver.
+ }
+
+ public synchronized void unlock() {
+ checkState(lock.isHeldByCurrentThread(), "Current thread does not hold lock");
+ currentOwner = null;
+ currentOwnerInterruptionAllowed = false;
+ lock.unlock();
+ }
+
+ public synchronized List<StackTraceElement> getInterrupterStack() {
+ return interrupterStack;
+ }
+
+ public synchronized void interruptCurrentOwner() {
+ if (!currentOwnerInterruptionAllowed) {
+ return;
+ }
+ // there is a benign race condition here where the lock holder
+ // can change between attempting to get lock and grabbing
+ // the synchronized lock here, but in either case we want to
+ // interrupt the lock holder thread
+ if (interrupterStack == null) {
+ interrupterStack = ImmutableList.copyOf(Thread.currentThread().getStackTrace());
+ }
+
+ if (currentOwner != null) {
+ currentOwner.interrupt();
+ }
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
index 8c20a2c334..8985508b0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
@@ -20,10 +20,14 @@ package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import java.util.concurrent.atomic.AtomicBoolean;
+
public class DriverContext {
private final FragmentInstanceContext fragmentInstanceContext;
+ private final AtomicBoolean finished = new AtomicBoolean();
+
public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
this.fragmentInstanceContext = fragmentInstanceContext;
}
@@ -38,13 +42,14 @@ public class DriverContext {
public void failed(Throwable cause) {
fragmentInstanceContext.failed(cause);
+ finished.set(true);
}
- public void finish() {
- fragmentInstanceContext.finish();
+ public void finished() {
+ finished.compareAndSet(false, true);
}
- public void flushing() {
- fragmentInstanceContext.flushing();
+ public boolean isDone() {
+ return finished.get();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
index b2e51be2bc..c5a3232c18 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkArgument;
@@ -41,14 +42,19 @@ public class FragmentInstanceContext extends QueryContext {
// TODO if we split one fragment instance into multiple pipelines to run, we need to replace it
// with CopyOnWriteArrayList or some other thread safe data structure
private final List<OperatorContext> operatorContexts = new ArrayList<>();
- private final long createNanos = System.nanoTime();
private DriverContext driverContext;
- // TODO we may use StateMachine<FragmentInstanceState> to replace it
- private final AtomicReference<FragmentInstanceState> state;
+ private final FragmentInstanceStateMachine stateMachine;
+
+ private final long createNanos = System.nanoTime();
+
+ private final AtomicLong startNanos = new AtomicLong();
+ private final AtomicLong endNanos = new AtomicLong();
- private long endTime = -1;
+ private final AtomicReference<Long> executionStartTime = new AtomicReference<>();
+ private final AtomicReference<Long> lastExecutionStartTime = new AtomicReference<>();
+ private final AtomicReference<Long> executionEndTime = new AtomicReference<>();
// private final GcMonitor gcMonitor;
// private final AtomicLong startNanos = new AtomicLong();
@@ -58,10 +64,51 @@ public class FragmentInstanceContext extends QueryContext {
// private final AtomicLong endFullGcCount = new AtomicLong(-1);
// private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
- public FragmentInstanceContext(
- FragmentInstanceId id, AtomicReference<FragmentInstanceState> state) {
+ public static FragmentInstanceContext createFragmentInstanceContext(
+ FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
+ FragmentInstanceContext instanceContext = new FragmentInstanceContext(id, stateMachine);
+ instanceContext.initialize();
+ return instanceContext;
+ }
+
+ private FragmentInstanceContext(
+ FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
this.id = id;
- this.state = state;
+ this.stateMachine = stateMachine;
+ }
+
+ public void start() {
+ long now = System.currentTimeMillis();
+ executionStartTime.compareAndSet(null, now);
+ startNanos.compareAndSet(0, System.nanoTime());
+
+ // always update last execution start time
+ lastExecutionStartTime.set(now);
+ }
+
+ // the state change listener is added here in a separate initialize() method
+ // instead of the constructor to prevent leaking the "this" reference to
+ // another thread, which will cause unsafe publication of this instance.
+ private void initialize() {
+ stateMachine.addStateChangeListener(this::updateStatsIfDone);
+ }
+
+ private void updateStatsIfDone(FragmentInstanceState newState) {
+ if (newState.isDone()) {
+ long now = System.currentTimeMillis();
+
+ // before setting the end times, make sure a start has been recorded
+ executionStartTime.compareAndSet(null, now);
+ startNanos.compareAndSet(0, System.nanoTime());
+
+ // Only update last start time if nothing was started
+ lastExecutionStartTime.compareAndSet(null, now);
+
+ // use compare and set from initial value to avoid overwriting if there
+ // were a duplicate notification, which shouldn't happen
+ executionEndTime.compareAndSet(null, now);
+ endNanos.compareAndSet(0, System.nanoTime());
+ }
}
public OperatorContext addOperatorContext(
@@ -98,40 +145,18 @@ public class FragmentInstanceContext extends QueryContext {
}
public void failed(Throwable cause) {
- LOGGER.warn("Fragment Instance {} failed.", id, cause);
- state.set(FragmentInstanceState.FAILED);
- }
-
- public void cancel() {
- state.set(FragmentInstanceState.CANCELED);
- this.endTime = System.currentTimeMillis();
+ stateMachine.failed(cause);
}
- public void abort() {
- state.set(FragmentInstanceState.ABORTED);
- this.endTime = System.currentTimeMillis();
+ public void finished() {
+ stateMachine.finished();
}
- public void finish() {
- if (state.get().isDone()) {
- return;
- }
- state.set(FragmentInstanceState.FINISHED);
- this.endTime = System.currentTimeMillis();
- }
-
- public void flushing() {
- if (state.get().isDone()) {
- return;
- }
- state.set(FragmentInstanceState.FLUSHING);
+ public void transitionToFlushing() {
+ stateMachine.transitionToFlushing();
}
public long getEndTime() {
- return endTime;
- }
-
- public void setEndTime(long endTime) {
- this.endTime = endTime;
+ return executionEndTime.get();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
index 21e0cc3d50..15c85c4d99 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
@@ -18,14 +18,15 @@
*/
package org.apache.iotdb.db.mpp.execution;
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
import com.google.common.collect.ImmutableList;
-
-import java.util.concurrent.atomic.AtomicReference;
+import io.airlift.stats.CounterStat;
import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FAILED;
public class FragmentInstanceExecution {
@@ -34,25 +35,39 @@ public class FragmentInstanceExecution {
private final FragmentInstanceId instanceId;
private final FragmentInstanceContext context;
- private final Driver driver;
+ private final IDriver driver;
+
+ private final ISinkHandle sinkHandle;
- // TODO we may use StateMachine<FragmentInstanceState> to replace it
- private final AtomicReference<FragmentInstanceState> state;
+ private final FragmentInstanceStateMachine stateMachine;
private long lastHeartbeat;
- public FragmentInstanceExecution(
+ public static FragmentInstanceExecution createFragmentInstanceExecution(
IFragmentInstanceScheduler scheduler,
FragmentInstanceId instanceId,
FragmentInstanceContext context,
- Driver driver,
- AtomicReference<FragmentInstanceState> state) {
+ IDriver driver,
+ FragmentInstanceStateMachine stateMachine,
+ CounterStat failedInstances) {
+ FragmentInstanceExecution execution =
+ new FragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine);
+ execution.initialize(failedInstances);
+ return execution;
+ }
+
+ private FragmentInstanceExecution(
+ IFragmentInstanceScheduler scheduler,
+ FragmentInstanceId instanceId,
+ FragmentInstanceContext context,
+ IDriver driver,
+ FragmentInstanceStateMachine stateMachine) {
this.scheduler = scheduler;
this.instanceId = instanceId;
this.context = context;
this.driver = driver;
- this.state = state;
- state.set(FragmentInstanceState.RUNNING);
+ this.sinkHandle = driver.getSinkHandle();
+ this.stateMachine = stateMachine;
scheduler.submitFragmentInstances(instanceId.getQueryId(), ImmutableList.of(driver));
}
@@ -65,24 +80,43 @@ public class FragmentInstanceExecution {
}
public FragmentInstanceState getInstanceState() {
- return state.get();
+ return stateMachine.getState();
}
public FragmentInstanceInfo getInstanceInfo() {
- return new FragmentInstanceInfo(state.get(), context.getEndTime());
+ return new FragmentInstanceInfo(stateMachine.getState(), context.getEndTime());
}
public void failed(Throwable cause) {
requireNonNull(cause, "cause is null");
- context.failed(cause);
+ stateMachine.failed(cause);
}
public void cancel() {
- context.cancel();
+ stateMachine.cancel();
}
public void abort() {
- scheduler.abortFragmentInstance(instanceId);
- context.abort();
+ stateMachine.abort();
+ }
+
+ // this is a separate method to ensure that the `this` reference is not leaked during construction
+ private void initialize(CounterStat failedInstances) {
+ requireNonNull(failedInstances, "failedInstances is null");
+ stateMachine.addStateChangeListener(
+ newState -> {
+ if (!newState.isDone()) {
+ return;
+ }
+
+ // Update failed tasks counter
+ if (newState == FAILED) {
+ failedInstances.update(1);
+ }
+
+ driver.close();
+ sinkHandle.abort();
+ scheduler.abortFragmentInstance(instanceId);
+ });
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceFailureListener.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceFailureListener.java
index 8c20a2c334..3f99848c29 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceFailureListener.java
@@ -20,31 +20,6 @@ package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-public class DriverContext {
-
- private final FragmentInstanceContext fragmentInstanceContext;
-
- public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
- this.fragmentInstanceContext = fragmentInstanceContext;
- }
-
- public FragmentInstanceId getId() {
- return fragmentInstanceContext.getId();
- }
-
- public FragmentInstanceContext getFragmentInstanceContext() {
- return fragmentInstanceContext;
- }
-
- public void failed(Throwable cause) {
- fragmentInstanceContext.failed(cause);
- }
-
- public void finish() {
- fragmentInstanceContext.finish();
- }
-
- public void flushing() {
- fragmentInstanceContext.flushing();
- }
+public interface FragmentInstanceFailureListener {
+ void onTaskFailed(FragmentInstanceId taskId, Throwable failure);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index 537f330f04..438e1bea0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -27,17 +27,20 @@ import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
import org.apache.iotdb.db.mpp.sql.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import io.airlift.stats.CounterStat;
import io.airlift.units.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceExecution.createFragmentInstanceExecution;
public class FragmentInstanceManager {
@@ -49,9 +52,13 @@ public class FragmentInstanceManager {
private final IFragmentInstanceScheduler scheduler = FragmentInstanceScheduler.getInstance();
private final ScheduledExecutorService instanceManagementExecutor;
+ private final ExecutorService instanceNotificationExecutor;
private final Duration infoCacheTime;
+ // record failed instances count
+ private final CounterStat failedInstances = new CounterStat();
+
public static FragmentInstanceManager getInstance() {
return FragmentInstanceManager.InstanceHolder.INSTANCE;
}
@@ -61,13 +68,15 @@ public class FragmentInstanceManager {
this.instanceExecution = new ConcurrentHashMap<>();
this.instanceManagementExecutor =
IoTDBThreadPoolFactory.newScheduledThreadPool(1, "instance-management");
+ this.instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
this.infoCacheTime = new Duration(15, TimeUnit.MINUTES);
instanceManagementExecutor.scheduleWithFixedDelay(
() -> {
try {
- removeOldTasks();
+ removeOldInstances();
} catch (Throwable e) {
logger.warn("Error removing old tasks", e);
}
@@ -85,13 +94,14 @@ public class FragmentInstanceManager {
instanceExecution.computeIfAbsent(
instanceId,
id -> {
- AtomicReference<FragmentInstanceState> state = new AtomicReference<>();
- state.set(FragmentInstanceState.PLANNED);
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext context =
instanceContext.computeIfAbsent(
instanceId,
- fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
+ fragmentInstanceId ->
+ createFragmentInstanceContext(fragmentInstanceId, stateMachine));
try {
DataDriver driver =
@@ -100,9 +110,10 @@ public class FragmentInstanceManager {
context,
instance.getTimeFilter(),
dataRegion);
- return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+ return createFragmentInstanceExecution(
+ scheduler, instanceId, context, driver, stateMachine, failedInstances);
} catch (Throwable t) {
- context.failed(t);
+ stateMachine.failed(t);
return null;
}
});
@@ -118,26 +129,29 @@ public class FragmentInstanceManager {
instanceExecution.computeIfAbsent(
instanceId,
id -> {
- AtomicReference<FragmentInstanceState> state = new AtomicReference<>();
- state.set(FragmentInstanceState.PLANNED);
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext context =
instanceContext.computeIfAbsent(
instanceId,
- fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
+ fragmentInstanceId ->
+ createFragmentInstanceContext(fragmentInstanceId, stateMachine));
try {
SchemaDriver driver =
planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
- return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+ return createFragmentInstanceExecution(
+ scheduler, instanceId, context, driver, stateMachine, failedInstances);
} catch (Throwable t) {
- context.failed(t);
+ stateMachine.failed(t);
return null;
}
});
return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
}
+ /** Aborts a FragmentInstance. */
public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId fragmentInstanceId) {
FragmentInstanceExecution execution = instanceExecution.remove(fragmentInstanceId);
if (execution != null) {
@@ -148,6 +162,19 @@ public class FragmentInstanceManager {
return null;
}
+ /** Cancels a FragmentInstance. */
+ public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) {
+ requireNonNull(instanceId, "taskId is null");
+
+ FragmentInstanceExecution execution = instanceExecution.remove(instanceId);
+ if (execution != null) {
+ instanceContext.remove(instanceId);
+ execution.cancel();
+ return execution.getInstanceInfo();
+ }
+ return null;
+ }
+
/**
* Gets the info for the specified fragment instance.
*
@@ -163,12 +190,16 @@ public class FragmentInstanceManager {
return execution.getInstanceInfo();
}
+ public CounterStat getFailedInstances() {
+ return failedInstances;
+ }
+
private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId instanceId) {
return new FragmentInstanceInfo(
FragmentInstanceState.FAILED, instanceContext.get(instanceId).getEndTime());
}
- private void removeOldTasks() {
+ private void removeOldInstances() {
long oldestAllowedInstance = System.currentTimeMillis() - infoCacheTime.toMillis();
instanceContext
.entrySet()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
index d55af83e98..d1a182bcbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
@@ -40,7 +40,7 @@ public enum FragmentInstanceState {
/** Instance has finished executing and all output has been consumed. */
FINISHED(true, false),
/** Instance was canceled by a user. */
- CANCELED(true, true),
+ CANCELLED(true, true),
/** Instance was aborted due to a failure in the query. The failure was not in this instance. */
ABORTED(true, true),
/** Instance execution failed. */
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java
new file mode 100644
index 0000000000..febc9e9e09
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.StateMachine.StateChangeListener;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.ABORTED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.CANCELLED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FAILED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FINISHED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FLUSHING;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.RUNNING;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.TERMINAL_INSTANCE_STATES;
+
+@ThreadSafe
+public class FragmentInstanceStateMachine {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceStateMachine.class);
+
+ private final long createdTime = System.currentTimeMillis();
+
+ private final FragmentInstanceId instanceId;
+ private final Executor executor;
+ private final StateMachine<FragmentInstanceState> instanceState;
+ private final LinkedBlockingQueue<Throwable> failureCauses = new LinkedBlockingQueue<>();
+
+ @GuardedBy("this")
+ private final Map<FragmentInstanceId, Throwable> sourceInstanceFailures = new HashMap<>();
+
+ @GuardedBy("this")
+ private final List<FragmentInstanceFailureListener> sourceInstanceFailureListeners =
+ new ArrayList<>();
+
+ public FragmentInstanceStateMachine(FragmentInstanceId fragmentInstanceId, Executor executor) {
+ this.instanceId = requireNonNull(fragmentInstanceId, "fragmentInstanceId is null");
+ this.executor = requireNonNull(executor, "executor is null");
+ instanceState =
+ new StateMachine<>(
+ "FragmentInstance " + fragmentInstanceId, executor, RUNNING, TERMINAL_INSTANCE_STATES);
+ instanceState.addStateChangeListener(
+ newState -> LOGGER.debug("Fragment Instance {} is {}", fragmentInstanceId, newState));
+ }
+
+ public long getCreatedTime() {
+ return createdTime;
+ }
+
+ public FragmentInstanceId getFragmentInstanceId() {
+ return instanceId;
+ }
+
+ public FragmentInstanceState getState() {
+ return instanceState.get();
+ }
+
+ public ListenableFuture<FragmentInstanceState> getStateChange(
+ FragmentInstanceState currentState) {
+ requireNonNull(currentState, "currentState is null");
+ checkArgument(!currentState.isDone(), "Current state is already done");
+
+ ListenableFuture<FragmentInstanceState> future = instanceState.getStateChange(currentState);
+ FragmentInstanceState state = instanceState.get();
+ if (state.isDone()) {
+ return immediateFuture(state);
+ }
+ return future;
+ }
+
+ public LinkedBlockingQueue<Throwable> getFailureCauses() {
+ return failureCauses;
+ }
+
+ public void transitionToFlushing() {
+ instanceState.setIf(FLUSHING, currentState -> currentState == RUNNING);
+ }
+
+ public void finished() {
+ transitionToDoneState(FINISHED);
+ }
+
+ public void cancel() {
+ transitionToDoneState(CANCELLED);
+ }
+
+ public void abort() {
+ transitionToDoneState(ABORTED);
+ }
+
+ public void failed(Throwable cause) {
+ failureCauses.add(cause);
+ transitionToDoneState(FAILED);
+ }
+
+ private void transitionToDoneState(FragmentInstanceState doneState) {
+ requireNonNull(doneState, "doneState is null");
+ checkArgument(doneState.isDone(), "doneState %s is not a done state", doneState);
+
+ instanceState.setIf(doneState, currentState -> !currentState.isDone());
+ }
+
+ /**
+ * Listener is always notified asynchronously using a dedicated notification thread pool, so care
+ * should be taken to avoid leaking {@code this} when adding a listener in a constructor.
+ * Additionally, it is possible notifications are observed out of order due to the asynchronous
+ * execution.
+ */
+ public void addStateChangeListener(
+ StateChangeListener<FragmentInstanceState> stateChangeListener) {
+ instanceState.addStateChangeListener(stateChangeListener);
+ }
+
+ public void addSourceTaskFailureListener(FragmentInstanceFailureListener listener) {
+ Map<FragmentInstanceId, Throwable> failures;
+ synchronized (this) {
+ sourceInstanceFailureListeners.add(listener);
+ failures = ImmutableMap.copyOf(sourceInstanceFailures);
+ }
+ executor.execute(
+ () -> {
+ failures.forEach(listener::onTaskFailed);
+ });
+ }
+
+ public void sourceTaskFailed(FragmentInstanceId instanceId, Throwable failure) {
+ List<FragmentInstanceFailureListener> listeners;
+ synchronized (this) {
+ sourceInstanceFailures.putIfAbsent(instanceId, failure);
+ listeners = ImmutableList.copyOf(sourceInstanceFailureListeners);
+ }
+ executor.execute(
+ () -> {
+ for (FragmentInstanceFailureListener listener : listeners) {
+ listener.onTaskFailed(instanceId, failure);
+ }
+ });
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("FragmentInstanceId", instanceId)
+ .add("FragmentInstanceState", instanceState)
+ .add("failureCauses", failureCauses)
+ .toString();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IDriver.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/IDriver.java
index 8c20a2c334..54c31901a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IDriver.java
@@ -18,33 +18,23 @@
*/
package org.apache.iotdb.db.mpp.execution;
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-public class DriverContext {
+import com.google.common.util.concurrent.ListenableFuture;
+import io.airlift.units.Duration;
- private final FragmentInstanceContext fragmentInstanceContext;
+public interface IDriver {
- public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
- this.fragmentInstanceContext = fragmentInstanceContext;
- }
+ boolean isFinished();
- public FragmentInstanceId getId() {
- return fragmentInstanceContext.getId();
- }
+ ListenableFuture<Void> processFor(Duration duration);
- public FragmentInstanceContext getFragmentInstanceContext() {
- return fragmentInstanceContext;
- }
+ FragmentInstanceId getInfo();
- public void failed(Throwable cause) {
- fragmentInstanceContext.failed(cause);
- }
+ void close();
- public void finish() {
- fragmentInstanceContext.finish();
- }
+ void failed(Throwable t);
- public void flushing() {
- fragmentInstanceContext.flushing();
- }
+ ISinkHandle getSinkHandle();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
index 844fe417ba..258065cd80 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
@@ -19,155 +19,27 @@
package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.operator.Operator;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-import io.airlift.units.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
-import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
-
/** One SchemaDriver is used to execute one FragmentInstance which is for metadata query. */
@NotThreadSafe
-public class SchemaDriver implements Driver {
-
- private static final Logger logger = LoggerFactory.getLogger(SchemaDriver.class);
-
- private final Operator root;
- private final ISinkHandle sinkHandle;
- private final SchemaDriverContext driverContext;
-
- private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
-
- private boolean closed = false;
+public class SchemaDriver extends Driver {
public SchemaDriver(Operator root, ISinkHandle sinkHandle, SchemaDriverContext driverContext) {
- this.root = root;
- this.sinkHandle = sinkHandle;
- this.driverContext = driverContext;
- // initially the driverBlockedFuture is not blocked (it is completed)
- SettableFuture<Void> future = SettableFuture.create();
- future.set(null);
- driverBlockedFuture.set(future);
- }
-
- @Override
- public boolean isFinished() {
- try {
- boolean isFinished = driverBlockedFuture.get().isDone() && root != null && root.isFinished();
- if (isFinished) {
- close();
- }
- return isFinished;
- } catch (Throwable t) {
- logger.error(
- "Failed to query whether the schema driver {} is finished", driverContext.getId(), t);
- driverContext.failed(t);
- return true;
- }
- }
-
- @Override
- public ListenableFuture<Void> processFor(Duration duration) {
- // if the driver is blocked we don't need to continue
- SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
- if (!blockedFuture.isDone()) {
- return blockedFuture;
- }
-
- long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
-
- long start = System.nanoTime();
- try {
- do {
- ListenableFuture<Void> future = processInternal();
- if (!future.isDone()) {
- return updateDriverBlockedFuture(future);
- }
- } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
- } catch (Throwable t) {
- logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
- driverContext.failed(t);
- close();
- blockedFuture.setException(t);
- return blockedFuture;
- }
- return NOT_BLOCKED;
- }
-
- private ListenableFuture<Void> processInternal() throws IOException {
- ListenableFuture<Void> blocked = root.isBlocked();
- if (!blocked.isDone()) {
- return blocked;
- }
- blocked = sinkHandle.isFull();
- if (!blocked.isDone()) {
- return blocked;
- }
- if (root.hasNext()) {
- TsBlock tsBlock = root.next();
- if (tsBlock != null && !tsBlock.isEmpty()) {
- sinkHandle.send(Collections.singletonList(tsBlock));
- }
- }
- return NOT_BLOCKED;
- }
-
- private ListenableFuture<Void> updateDriverBlockedFuture(
- ListenableFuture<Void> sourceBlockedFuture) {
- // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
- // or any of the operators gets a memory revocation request
- SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
- driverBlockedFuture.set(newDriverBlockedFuture);
- sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
-
- // TODO Although we don't have memory management for operator now, we should consider it for
- // future
- // it's possible that memory revoking is requested for some operator
- // before we update driverBlockedFuture above and we don't want to miss that
- // notification, so we check to see whether that's the case before returning.
-
- return newDriverBlockedFuture;
- }
-
- @Override
- public FragmentInstanceId getInfo() {
- return driverContext.getId();
+ super(root, sinkHandle, driverContext);
}
@Override
- public void close() {
- if (closed) {
- return;
- }
- closed = true;
- try {
- if (root != null) {
- root.close();
- }
- if (sinkHandle != null) {
- sinkHandle.close();
- }
- } catch (Throwable t) {
- logger.error("Failed to closed driver {}", driverContext.getId(), t);
- driverContext.failed(t);
- }
+ protected boolean init(SettableFuture<Void> blockedFuture) {
+ return true;
}
@Override
- public void failed(Throwable t) {
- driverContext.failed(t);
+ protected void releaseResource() {
+ // do nothing
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
index c8cccc0520..398b2f03c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
@@ -22,8 +22,6 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
-import java.io.IOException;
-
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
public interface Operator extends AutoCloseable {
@@ -53,5 +51,5 @@ public interface Operator extends AutoCloseable {
/**
* Is this operator completely finished processing and no more output TsBlock will be produced.
*/
- boolean isFinished() throws IOException;
+ boolean isFinished();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
index a226637146..24711217c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
-import java.io.IOException;
-
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
@@ -76,7 +74,7 @@ public class LimitOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() throws IOException {
+ public boolean isFinished() {
return remainingLimit == 0 || child.isFinished();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
index 48d0c71185..cd3355d395 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
@@ -217,7 +217,7 @@ public class TransformOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() throws IOException {
+ public boolean isFinished() {
return !hasNext();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
index 003da76f2d..34fb44373e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
@@ -35,7 +35,6 @@ import org.apache.iotdb.tsfile.utils.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -92,7 +91,7 @@ public class SchemaFetchOperator implements SourceOperator {
}
@Override
- public boolean isFinished() throws IOException {
+ public boolean isFinished() {
return isFinished;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
index 722f63543d..397b7dc461 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
@@ -65,7 +65,7 @@ public class ExchangeOperator implements SourceOperator {
}
@Override
- public boolean isFinished() throws IOException {
+ public boolean isFinished() {
return sourceHandle.isFinished();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java
index 20017340f6..7a25e4209a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.db.mpp.schedule;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.IDriver;
-/** A common exception to pass to {@link Driver#failed(Throwable)} */
+/** A common exception to pass to {@link IDriver#failed(Throwable)} */
public class FragmentInstanceAbortedException extends Exception {
public static final String BY_TIMEOUT = "timeout";
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index 4422e81ba8..880041924a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.mpp.buffer.DataBlockService;
import org.apache.iotdb.db.mpp.buffer.IDataBlockManager;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.IDriver;
import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.mpp.schedule.queue.L1PriorityQueue;
import org.apache.iotdb.db.mpp.schedule.queue.L2PriorityQueue;
@@ -121,7 +121,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
}
@Override
- public void submitFragmentInstances(QueryId queryId, List<Driver> instances) {
+ public void submitFragmentInstances(QueryId queryId, List<IDriver> instances) {
List<FragmentInstanceTask> tasks =
instances.stream()
.map(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
index fd19b67ee0..25b4e29a0a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.schedule;
-import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.IDriver;
import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
import org.apache.iotdb.db.utils.stats.CpuTimer;
@@ -52,7 +52,7 @@ public class FragmentInstanceTaskExecutor extends AbstractExecutor {
if (!scheduler.readyToRunning(task)) {
return;
}
- Driver instance = task.getFragmentInstance();
+ IDriver instance = task.getFragmentInstance();
CpuTimer timer = new CpuTimer();
ListenableFuture<Void> future = instance.processFor(EXECUTION_TIME_SLICE);
CpuTimer.CpuDuration duration = timer.elapsedTime();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
index 2c597c15e7..b8d5970ee9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.schedule;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.IDriver;
import java.util.List;
@@ -28,13 +28,13 @@ import java.util.List;
public interface IFragmentInstanceScheduler {
/**
- * Submit one or more {@link org.apache.iotdb.db.mpp.execution.Driver} in one query for later
+ * Submit one or more {@link org.apache.iotdb.db.mpp.execution.IDriver} in one query for later
* scheduling.
*
* @param queryId the queryId these instances belong to.
* @param instances the submitted instances.
*/
- void submitFragmentInstances(QueryId queryId, List<Driver> instances);
+ void submitFragmentInstances(QueryId queryId, List<IDriver> instances);
/**
* Abort all the instances in this query.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
index abebdaf30d..fc322fee28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
@@ -18,10 +18,11 @@
*/
package org.apache.iotdb.db.mpp.schedule.task;
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.IDriver;
import org.apache.iotdb.db.mpp.schedule.ExecutionContext;
import org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor;
import org.apache.iotdb.db.mpp.schedule.queue.ID;
@@ -43,7 +44,7 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
private FragmentInstanceTaskID id;
private FragmentInstanceTaskStatus status;
- private final Driver fragmentInstance;
+ private final IDriver fragmentInstance;
// the higher this field is, the higher probability it will be scheduled.
private volatile double schedulePriority;
@@ -60,7 +61,7 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
this(new StubFragmentInstance(), 0L, null);
}
- public FragmentInstanceTask(Driver instance, long timeoutMs, FragmentInstanceTaskStatus status) {
+ public FragmentInstanceTask(IDriver instance, long timeoutMs, FragmentInstanceTaskStatus status) {
this.fragmentInstance = instance;
this.id = new FragmentInstanceTaskID(instance.getInfo());
this.setStatus(status);
@@ -87,7 +88,7 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
|| status == FragmentInstanceTaskStatus.FINISHED;
}
- public Driver getFragmentInstance() {
+ public IDriver getFragmentInstance() {
return fragmentInstance;
}
@@ -185,7 +186,7 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
}
}
- private static class StubFragmentInstance implements Driver {
+ private static class StubFragmentInstance implements IDriver {
private static final QueryId stubQueryId = new QueryId("stub_query");
private static final FragmentInstanceId stubInstance =
@@ -211,5 +212,10 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
@Override
public void failed(Throwable t) {}
+
+ @Override
+ public ISinkHandle getSinkHandle() {
+ return null;
+ }
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
index e8a7767100..126faaeef5 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
@@ -36,7 +36,6 @@ import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -96,12 +95,7 @@ public class SinkHandleTest {
Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
// Send tsblocks.
- try {
- sinkHandle.send(mockTsBlocks);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ sinkHandle.send(mockTsBlocks);
sinkHandle.setNoMoreTsBlocks();
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
@@ -150,12 +144,7 @@ public class SinkHandleTest {
Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFinish(sinkHandle);
// Close the SinkHandle.
- try {
- sinkHandle.close();
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ sinkHandle.close();
Assert.assertTrue(sinkHandle.isClosed());
Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onClosed(sinkHandle);
try {
@@ -229,12 +218,7 @@ public class SinkHandleTest {
Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
// Send tsblocks.
- try {
- sinkHandle.send(mockTsBlocks);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ sinkHandle.send(mockTsBlocks);
Assert.assertFalse(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isClosed());
@@ -281,12 +265,7 @@ public class SinkHandleTest {
.free(queryId, numOfMockTsBlock * mockTsBlockSize);
// Send tsblocks.
- try {
- sinkHandle.send(mockTsBlocks);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ sinkHandle.send(mockTsBlocks);
Assert.assertFalse(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isClosed());
@@ -314,12 +293,7 @@ public class SinkHandleTest {
// Close the SinkHandle.
sinkHandle.setNoMoreTsBlocks();
Assert.assertFalse(sinkHandle.isFinished());
- try {
- sinkHandle.close();
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ sinkHandle.close();
Assert.assertTrue(sinkHandle.isClosed());
Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onClosed(sinkHandle);
try {
@@ -381,11 +355,12 @@ public class SinkHandleTest {
List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(numOfMockTsBlock, mockTsBlockSize);
// Construct a mock client.
Client mockClient = Mockito.mock(Client.class);
+ TException exception = new TException("Mock exception");
try {
- Mockito.doThrow(new TException("Mock exception"))
+ Mockito.doThrow(exception)
.when(mockClient)
.onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
- Mockito.doThrow(new TException("Mock exception"))
+ Mockito.doThrow(exception)
.when(mockClient)
.onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
} catch (TException e) {
@@ -412,12 +387,7 @@ public class SinkHandleTest {
Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
// Send tsblocks.
- try {
- sinkHandle.send(mockTsBlocks);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ sinkHandle.send(mockTsBlocks);
sinkHandle.setNoMoreTsBlocks();
Assert.assertFalse(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
@@ -443,19 +413,14 @@ public class SinkHandleTest {
Assert.fail();
}
- try {
- sinkHandle.send(Collections.singletonList(Mockito.mock(TsBlock.class)));
- Assert.fail("Expect an IOException.");
- } catch (IOException e) {
- Assert.assertEquals("org.apache.thrift.TException: Mock exception", e.getMessage());
- }
+ Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFailure(exception);
// Close the SinkHandle.
try {
sinkHandle.close();
- Assert.fail("Expect an IOException.");
- } catch (IOException e) {
- Assert.assertEquals("org.apache.thrift.TException: Mock exception", e.getMessage());
+ Assert.fail("Expect an RuntimeException.");
+ } catch (RuntimeException e) {
+ Assert.assertEquals("Send EndOfDataBlockEvent failed", e.getMessage());
}
Assert.assertFalse(sinkHandle.isClosed());
Mockito.verify(mockSinkHandleListener, Mockito.times(0)).onClosed(sinkHandle);
@@ -522,12 +487,7 @@ public class SinkHandleTest {
Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
// Send tsblocks.
- try {
- sinkHandle.send(mockTsBlocks);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ sinkHandle.send(mockTsBlocks);
Future<?> blocked = sinkHandle.isFull();
Assert.assertFalse(blocked.isDone());
Assert.assertFalse(blocked.isCancelled());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index 2703d0aa37..88b5e57a54 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -55,8 +56,9 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ExecutorService;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor.EXECUTION_TIME_SLICE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -85,6 +87,8 @@ public class DataDriverTest {
@Test
public void batchTest() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
MeasurementPath measurementPath1 =
new MeasurementPath(DATA_DRIVER_TEST_SG + ".device0.sensor0", TSDataType.INT32);
@@ -92,11 +96,12 @@ public class DataDriverTest {
allSensors.add("sensor0");
allSensors.add("sensor1");
QueryId queryId = new QueryId("stub_query");
- AtomicReference<FragmentInstanceState> state =
- new AtomicReference<>(FragmentInstanceState.RUNNING);
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
- new FragmentInstanceContext(
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+ createFragmentInstanceContext(instanceId, stateMachine);
PlanNodeId planNodeId1 = new PlanNodeId("1");
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, SeriesScanOperator.class.getSimpleName());
@@ -159,19 +164,20 @@ public class DataDriverTest {
ImmutableList.of(seriesScanOperator1, seriesScanOperator2));
StubSinkHandle sinkHandle = new StubSinkHandle(fragmentInstanceContext);
-
- try (Driver dataDriver = new DataDriver(limitOperator, sinkHandle, driverContext)) {
+ IDriver dataDriver = null;
+ try {
+ dataDriver = new DataDriver(limitOperator, sinkHandle, driverContext);
assertEquals(fragmentInstanceContext.getId(), dataDriver.getInfo());
assertFalse(dataDriver.isFinished());
while (!dataDriver.isFinished()) {
- assertEquals(FragmentInstanceState.RUNNING, state.get());
+ assertEquals(FragmentInstanceState.RUNNING, stateMachine.getState());
ListenableFuture<Void> blocked = dataDriver.processFor(EXECUTION_TIME_SLICE);
assertTrue(blocked.isDone());
}
- assertEquals(FragmentInstanceState.FLUSHING, state.get());
+ assertEquals(FragmentInstanceState.FLUSHING, stateMachine.getState());
List<TsBlock> result = sinkHandle.getTsBlocks();
assertEquals(13, result.size());
@@ -204,10 +210,16 @@ public class DataDriverTest {
}
}
}
+ } finally {
+ if (dataDriver != null) {
+ dataDriver.close();
+ }
}
} catch (IllegalPathException | QueryProcessException e) {
e.printStackTrace();
fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
index 6ad74bd110..a2516c0ab0 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.operator;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -27,7 +28,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
-import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
@@ -50,8 +51,9 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ExecutorService;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -78,6 +80,8 @@ public class LimitOperatorTest {
@Test
public void batchTest() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
MeasurementPath measurementPath1 =
new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
@@ -85,11 +89,12 @@ public class LimitOperatorTest {
allSensors.add("sensor0");
allSensors.add("sensor1");
QueryId queryId = new QueryId("stub_query");
- AtomicReference<FragmentInstanceState> state =
- new AtomicReference<>(FragmentInstanceState.RUNNING);
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
- new FragmentInstanceContext(
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+ createFragmentInstanceContext(instanceId, stateMachine);
PlanNodeId planNodeId1 = new PlanNodeId("1");
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, SeriesScanOperator.class.getSimpleName());
@@ -169,6 +174,8 @@ public class LimitOperatorTest {
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
index d1c71f2eb4..d2045e0d16 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.operator;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -28,6 +29,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -52,8 +54,10 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
import static org.junit.Assert.assertEquals;
public class SeriesAggregateScanOperatorTest {
@@ -64,16 +68,20 @@ public class SeriesAggregateScanOperatorTest {
private final List<TsFileResource> seqResources = new ArrayList<>();
private final List<TsFileResource> unSeqResources = new ArrayList<>();
+ private ExecutorService instanceNotificationExecutor;
@Before
public void setUp() throws MetadataException, IOException, WriteProcessException {
SeriesReaderTestUtil.setUp(
measurementSchemas, deviceIds, seqResources, unSeqResources, SERIES_SCAN_OPERATOR_TEST_SG);
+ this.instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
}
@After
public void tearDown() throws IOException {
SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+ instanceNotificationExecutor.shutdown();
}
@Test
@@ -349,9 +357,12 @@ public class SeriesAggregateScanOperatorTest {
QueryId queryId = new QueryId("stub_query");
AtomicReference<FragmentInstanceState> state =
new AtomicReference<>(FragmentInstanceState.RUNNING);
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
- new FragmentInstanceContext(
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+ createFragmentInstanceContext(instanceId, stateMachine);
PlanNodeId planNodeId = new PlanNodeId("1");
fragmentInstanceContext.addOperatorContext(
1, planNodeId, SeriesScanOperator.class.getSimpleName());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
index 78e6821bcd..a9b4b452c5 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.operator;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -27,7 +28,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
-import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
@@ -46,8 +47,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ExecutorService;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -73,16 +75,19 @@ public class SeriesScanOperatorTest {
@Test
public void batchTest() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
MeasurementPath measurementPath =
new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
Set<String> allSensors = Sets.newHashSet("sensor0");
QueryId queryId = new QueryId("stub_query");
- AtomicReference<FragmentInstanceState> state =
- new AtomicReference<>(FragmentInstanceState.RUNNING);
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
- new FragmentInstanceContext(
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+ createFragmentInstanceContext(instanceId, stateMachine);
PlanNodeId planNodeId = new PlanNodeId("1");
fragmentInstanceContext.addOperatorContext(
1, planNodeId, SeriesScanOperator.class.getSimpleName());
@@ -123,6 +128,8 @@ public class SeriesScanOperatorTest {
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
index 5534418b84..4ee1ed95d3 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.operator;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -27,7 +28,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
-import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -49,8 +50,9 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ExecutorService;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
import static org.junit.Assert.*;
public class TimeJoinOperatorTest {
@@ -74,6 +76,8 @@ public class TimeJoinOperatorTest {
@Test
public void batchTest() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
MeasurementPath measurementPath1 =
new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
@@ -81,11 +85,12 @@ public class TimeJoinOperatorTest {
allSensors.add("sensor0");
allSensors.add("sensor1");
QueryId queryId = new QueryId("stub_query");
- AtomicReference<FragmentInstanceState> state =
- new AtomicReference<>(FragmentInstanceState.RUNNING);
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
- new FragmentInstanceContext(
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+ createFragmentInstanceContext(instanceId, stateMachine);
PlanNodeId planNodeId1 = new PlanNodeId("1");
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, SeriesScanOperator.class.getSimpleName());
@@ -155,6 +160,8 @@ public class TimeJoinOperatorTest {
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
index 609dc9befa..d4532ff167 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.operator.schema;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.LocalConfigNode;
@@ -28,7 +29,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
-import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.SchemaDriverContext;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -52,7 +53,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ExecutorService;
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_ATTRIBUTES;
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_DEVICES;
@@ -64,6 +65,7 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_ALIA
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION;
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_ENCODING;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -90,13 +92,16 @@ public class SchemaScanOperatorTest {
@Test
public void testDeviceMetaScanOperator() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
QueryId queryId = new QueryId("stub_query");
- AtomicReference<FragmentInstanceState> state =
- new AtomicReference<>(FragmentInstanceState.RUNNING);
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
- new FragmentInstanceContext(
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+ createFragmentInstanceContext(instanceId, stateMachine);
PlanNodeId planNodeId = queryId.genPlanNodeId();
OperatorContext operatorContext =
fragmentInstanceContext.addOperatorContext(
@@ -149,18 +154,23 @@ public class SchemaScanOperatorTest {
} catch (MetadataException e) {
e.printStackTrace();
fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
}
}
@Test
public void testTimeSeriesMetaScanOperator() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
QueryId queryId = new QueryId("stub_query");
- AtomicReference<FragmentInstanceState> state =
- new AtomicReference<>(FragmentInstanceState.RUNNING);
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
- new FragmentInstanceContext(
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+ createFragmentInstanceContext(instanceId, stateMachine);
PlanNodeId planNodeId = queryId.genPlanNodeId();
OperatorContext operatorContext =
fragmentInstanceContext.addOperatorContext(
@@ -237,6 +247,8 @@ public class SchemaScanOperatorTest {
} catch (MetadataException e) {
e.printStackTrace();
fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
index 82ca5dfd2c..a93ab988ea 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.mpp.buffer.IDataBlockManager;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.IDriver;
import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
import org.apache.iotdb.db.utils.stats.CpuTimer;
@@ -52,7 +52,7 @@ public class DefaultTaskSchedulerTest {
IDataBlockManager mockDataBlockManager = Mockito.mock(IDataBlockManager.class);
manager.setBlockManager(mockDataBlockManager);
ITaskScheduler defaultScheduler = manager.getScheduler();
- Driver mockDriver = Mockito.mock(Driver.class);
+ IDriver mockDriver = Mockito.mock(IDriver.class);
QueryId queryId = new QueryId("test");
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
@@ -103,7 +103,7 @@ public class DefaultTaskSchedulerTest {
IDataBlockManager mockDataBlockManager = Mockito.mock(IDataBlockManager.class);
manager.setBlockManager(mockDataBlockManager);
ITaskScheduler defaultScheduler = manager.getScheduler();
- Driver mockDriver = Mockito.mock(Driver.class);
+ IDriver mockDriver = Mockito.mock(IDriver.class);
QueryId queryId = new QueryId("test");
FragmentInstanceId instanceId =
@@ -151,7 +151,7 @@ public class DefaultTaskSchedulerTest {
IDataBlockManager mockDataBlockManager = Mockito.mock(IDataBlockManager.class);
manager.setBlockManager(mockDataBlockManager);
ITaskScheduler defaultScheduler = manager.getScheduler();
- Driver mockDriver = Mockito.mock(Driver.class);
+ IDriver mockDriver = Mockito.mock(IDriver.class);
QueryId queryId = new QueryId("test");
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
@@ -204,7 +204,7 @@ public class DefaultTaskSchedulerTest {
IDataBlockManager mockDataBlockManager = Mockito.mock(IDataBlockManager.class);
manager.setBlockManager(mockDataBlockManager);
ITaskScheduler defaultScheduler = manager.getScheduler();
- Driver mockDriver = Mockito.mock(Driver.class);
+ IDriver mockDriver = Mockito.mock(IDriver.class);
QueryId queryId = new QueryId("test");
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
@@ -257,7 +257,7 @@ public class DefaultTaskSchedulerTest {
IDataBlockManager mockDataBlockManager = Mockito.mock(IDataBlockManager.class);
manager.setBlockManager(mockDataBlockManager);
ITaskScheduler defaultScheduler = manager.getScheduler();
- Driver mockDriver = Mockito.mock(Driver.class);
+ IDriver mockDriver = Mockito.mock(IDriver.class);
QueryId queryId = new QueryId("test");
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
@@ -313,9 +313,9 @@ public class DefaultTaskSchedulerTest {
QueryId queryId = new QueryId("test");
FragmentInstanceId instanceId1 =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
- Driver mockDriver1 = Mockito.mock(Driver.class);
+ IDriver mockDriver1 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
- Driver mockDriver2 = Mockito.mock(Driver.class);
+ IDriver mockDriver2 = Mockito.mock(IDriver.class);
FragmentInstanceId instanceId2 =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-1");
Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
index 56435496ab..f83d1213c6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.mpp.buffer.IDataBlockManager;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.IDriver;
import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskID;
import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
@@ -56,12 +56,12 @@ public class FragmentInstanceSchedulerTest {
QueryId queryId = new QueryId("test");
PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
FragmentInstanceId instanceId1 = new FragmentInstanceId(fragmentId, "inst-0");
- Driver mockDriver1 = Mockito.mock(Driver.class);
+ IDriver mockDriver1 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
FragmentInstanceId instanceId2 = new FragmentInstanceId(fragmentId, "inst-1");
- Driver mockDriver2 = Mockito.mock(Driver.class);
+ IDriver mockDriver2 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
- List<Driver> instances = Arrays.asList(mockDriver1, mockDriver2);
+ List<IDriver> instances = Arrays.asList(mockDriver1, mockDriver2);
manager.submitFragmentInstances(queryId, instances);
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Assert.assertEquals(1, manager.getQueryMap().size());
@@ -81,7 +81,7 @@ public class FragmentInstanceSchedulerTest {
Assert.assertEquals(FragmentInstanceTaskStatus.READY, task2.getStatus());
// Submit another task of the same query
- Driver mockDriver3 = Mockito.mock(Driver.class);
+ IDriver mockDriver3 = Mockito.mock(IDriver.class);
FragmentInstanceId instanceId3 = new FragmentInstanceId(fragmentId, "inst-2");
Mockito.when(mockDriver3.getInfo()).thenReturn(instanceId3);
manager.submitFragmentInstances(queryId, Collections.singletonList(mockDriver3));
@@ -101,7 +101,7 @@ public class FragmentInstanceSchedulerTest {
QueryId queryId2 = new QueryId("test2");
PlanFragmentId fragmentId2 = new PlanFragmentId(queryId2, 0);
FragmentInstanceId instanceId4 = new FragmentInstanceId(fragmentId2, "inst-0");
- Driver mockDriver4 = Mockito.mock(Driver.class);
+ IDriver mockDriver4 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver4.getInfo()).thenReturn(instanceId4);
manager.submitFragmentInstances(queryId2, Collections.singletonList(mockDriver4));
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
index 862f4ca207..28365962b8 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.mpp.schedule;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.IDriver;
import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.mpp.schedule.queue.L1PriorityQueue;
import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
@@ -57,7 +57,7 @@ public class FragmentInstanceTimeoutSentinelTest {
IndexedBlockingQueue<FragmentInstanceTask> taskQueue =
new L1PriorityQueue<>(
100, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
- Driver mockDriver = Mockito.mock(Driver.class);
+ IDriver mockDriver = Mockito.mock(IDriver.class);
Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
AbstractExecutor executor =
@@ -116,7 +116,7 @@ public class FragmentInstanceTimeoutSentinelTest {
100, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
// Mock the instance with a cancelled future
- Driver mockDriver = Mockito.mock(Driver.class);
+ IDriver mockDriver = Mockito.mock(IDriver.class);
QueryId queryId = new QueryId("test");
PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
@@ -158,7 +158,7 @@ public class FragmentInstanceTimeoutSentinelTest {
100, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
// Mock the instance with a cancelled future
- Driver mockDriver = Mockito.mock(Driver.class);
+ IDriver mockDriver = Mockito.mock(IDriver.class);
QueryId queryId = new QueryId("test");
PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
@@ -209,7 +209,7 @@ public class FragmentInstanceTimeoutSentinelTest {
})
.when(mockFuture)
.addListener(Mockito.any(), Mockito.any());
- Driver mockDriver = Mockito.mock(Driver.class);
+ IDriver mockDriver = Mockito.mock(IDriver.class);
QueryId queryId = new QueryId("test");
PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
@@ -260,7 +260,7 @@ public class FragmentInstanceTimeoutSentinelTest {
})
.when(mockFuture)
.addListener(Mockito.any(), Mockito.any());
- Driver mockDriver = Mockito.mock(Driver.class);
+ IDriver mockDriver = Mockito.mock(IDriver.class);
QueryId queryId = new QueryId("test");
PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");