You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") in the original archive page and is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/20 09:55:13 UTC
[iotdb] 01/02: Add FragmentInstanceStateMachine
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fb52130d2bc1d0ea95b36055049f1c98c728dbb2
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Apr 20 17:24:28 2022 +0800
Add FragmentInstanceStateMachine
---
pom.xml | 5 +
server/pom.xml | 4 +
.../apache/iotdb/db/mpp/buffer/ISinkHandle.java | 21 +-
.../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 16 +-
.../apache/iotdb/db/mpp/buffer/StubSinkHandle.java | 13 +-
.../apache/iotdb/db/mpp/execution/DataDriver.java | 186 ++-------
.../org/apache/iotdb/db/mpp/execution/Driver.java | 415 ++++++++++++++++++++-
.../iotdb/db/mpp/execution/DriverContext.java | 15 +-
.../db/mpp/execution/FragmentInstanceContext.java | 87 +++--
.../mpp/execution/FragmentInstanceExecution.java | 85 ++++-
...t.java => FragmentInstanceFailureListener.java} | 29 +-
.../db/mpp/execution/FragmentInstanceManager.java | 56 ++-
.../db/mpp/execution/FragmentInstanceState.java | 2 +-
.../execution/FragmentInstanceStateMachine.java | 171 +++++++++
.../iotdb/db/mpp/execution/SchemaDriver.java | 142 +------
.../org/apache/iotdb/db/mpp/operator/Operator.java | 2 +-
.../db/mpp/operator/process/TransformOperator.java | 2 +-
.../mpp/operator/schema/SchemaFetchOperator.java | 2 +-
.../db/mpp/operator/source/ExchangeOperator.java | 2 +-
.../db/mpp/schedule/task/FragmentInstanceTask.java | 25 +-
.../iotdb/db/mpp/execution/DataDriverTest.java | 25 +-
.../iotdb/db/mpp/operator/LimitOperatorTest.java | 13 +-
.../db/mpp/operator/SeriesScanOperatorTest.java | 13 +-
.../db/mpp/operator/TimeJoinOperatorTest.java | 13 +-
.../operator/schema/SchemaScanOperatorTest.java | 23 +-
25 files changed, 916 insertions(+), 451 deletions(-)
diff --git a/pom.xml b/pom.xml
index 90a8dbccb4..a54d70cdb4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -495,6 +495,11 @@
<artifactId>slice</artifactId>
<version>0.41</version>
</dependency>
+ <dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>stats</artifactId>
+ <version>214</version>
+ </dependency>
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
diff --git a/server/pom.xml b/server/pom.xml
index e6c9c29adf..fdc67576bc 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -99,6 +99,10 @@
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>stats</artifactId>
+ </dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>airline</artifactId>
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
index 6300c5beef..1d037678ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
@@ -24,8 +24,9 @@ import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
-public interface ISinkHandle extends AutoCloseable {
+public interface ISinkHandle {
/** Get the total amount of memory used by buffered tsblocks. */
long getBufferRetainedSizeInBytes();
@@ -41,7 +42,7 @@ public interface ISinkHandle extends AutoCloseable {
* the send tsblock call is ignored. This can happen with limit queries. A {@link
* RuntimeException} will be thrown if any exception happened * during the data transmission.
*/
- void send(List<TsBlock> tsBlocks) throws IOException;
+ void send(List<TsBlock> tsBlocks);
/**
* Send a {@link TsBlock} to a specific partition. If no-more-tsblocks has been set, the send
@@ -57,22 +58,32 @@ public interface ISinkHandle extends AutoCloseable {
void setNoMoreTsBlocks();
/** If the handle is closed. */
- public boolean isClosed();
+ boolean isClosed();
/**
* If no more tsblocks will be sent and all the tsblocks have been fetched by downstream fragment
* instances.
*/
- public boolean isFinished();
+ boolean isFinished();
/**
* Close the handle. The output buffer will not be cleared until all tsblocks are fetched by
* downstream instances. A {@link RuntimeException} will be thrown if any exception happened
* during the data transmission.
*/
- @Override
void close() throws IOException;
/** Abort the sink handle, discarding all tsblocks which may still be in memory buffer. */
void abort();
+
+ /**
+ * @return whether this SinkHandle is failed
+ */
+ boolean isFailed();
+
+
+ /**
+ * Returns non empty failure cause if the sink handle is failed
+ */
+ Optional<Throwable> getFailureCause();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index af4ecf089d..b2fd70459d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -41,6 +41,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.ExecutorService;
@@ -113,11 +114,8 @@ public class SinkHandle implements ISinkHandle {
}
@Override
- public void send(List<TsBlock> tsBlocks) throws IOException {
+ public void send(List<TsBlock> tsBlocks) {
Validate.notNull(tsBlocks, "tsBlocks is null");
- if (throwable != null) {
- throw new IOException(throwable);
- }
if (closed) {
throw new IllegalStateException("Sink handle is closed.");
}
@@ -229,6 +227,16 @@ public class SinkHandle implements ISinkHandle {
logger.info("Sink handle {} is aborted", this);
}
+ @Override
+ public boolean isFailed() {
+ return throwable != null;
+ }
+
+ @Override
+ public Optional<Throwable> getFailureCause() {
+ return Optional.ofNullable(throwable);
+ }
+
@Override
public synchronized void setNoMoreTsBlocks() {
noMoreTsBlocks = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
index 9f3b9240c2..c4bc0e377a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@ -26,6 +26,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
@@ -59,7 +60,7 @@ public class StubSinkHandle implements ISinkHandle {
}
@Override
- public void send(List<TsBlock> tsBlocks) throws IOException {
+ public void send(List<TsBlock> tsBlocks) {
this.tsBlocks.addAll(tsBlocks);
}
@@ -95,6 +96,16 @@ public class StubSinkHandle implements ISinkHandle {
tsBlocks.clear();
}
+ @Override
+ public boolean isFailed() {
+ return false;
+ }
+
+ @Override
+ public Optional<Throwable> getFailureCause() {
+ return Optional.empty();
+ }
+
public List<TsBlock> getTsBlocks() {
return tsBlocks;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index 21aebe0214..2921c9d889 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution;
-import org.apache.iotdb.commons.exception.IoTDBException;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -26,164 +26,74 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.operator.Operator;
import org.apache.iotdb.db.mpp.operator.source.DataSourceOperator;
import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import io.airlift.units.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
-
-import java.io.IOException;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
-import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
-import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
@NotThreadSafe
-public class DataDriver implements Driver {
-
- private static final Logger logger = LoggerFactory.getLogger(DataDriver.class);
-
- private final Operator root;
- private final ISinkHandle sinkHandle;
- private final DataDriverContext driverContext;
+public class DataDriver extends Driver {
private boolean init;
- private boolean closed;
/** closed tsfile used in this fragment instance */
private Set<TsFileResource> closedFilePaths;
/** unClosed tsfile used in this fragment instance */
private Set<TsFileResource> unClosedFilePaths;
- private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
public DataDriver(Operator root, ISinkHandle sinkHandle, DataDriverContext driverContext) {
- this.root = root;
- this.sinkHandle = sinkHandle;
- this.driverContext = driverContext;
+ super(root, sinkHandle, driverContext);
this.closedFilePaths = new HashSet<>();
this.unClosedFilePaths = new HashSet<>();
- // initially the driverBlockedFuture is not blocked (it is completed)
- SettableFuture<Void> future = SettableFuture.create();
- future.set(null);
- driverBlockedFuture.set(future);
- }
-
- @Override
- public boolean isFinished() {
- try {
- boolean isFinished =
- closed || (driverBlockedFuture.get().isDone() && root != null && root.isFinished());
- if (isFinished) {
- close();
- }
- return isFinished;
- } catch (Throwable t) {
- logger.error(
- "Failed to query whether the data driver {} is finished", driverContext.getId(), t);
- driverContext.failed(t);
- close();
- return true;
- }
}
@Override
- public ListenableFuture<Void> processFor(Duration duration) {
-
- SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
- // initialization may be time-consuming, so we keep it in the processFor method
- // in normal case, it won't cause deadlock and should finish soon, otherwise it will be a
- // critical bug
+ protected boolean init(SettableFuture<Void> blockedFuture) {
if (!init) {
try {
initialize();
} catch (Throwable t) {
- logger.error(
+ LOGGER.error(
"Failed to do the initialization for fragment instance {} ", driverContext.getId(), t);
driverContext.failed(t);
- close();
blockedFuture.setException(t);
- return blockedFuture;
+ return false;
}
}
-
- // if the driver is blocked we don't need to continue
- if (!blockedFuture.isDone()) {
- return blockedFuture;
- }
-
- long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
-
- long start = System.nanoTime();
- try {
- do {
- ListenableFuture<Void> future = processInternal();
- if (!future.isDone()) {
- return updateDriverBlockedFuture(future);
- }
- } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
- } catch (Throwable t) {
- logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
- driverContext.failed(t);
- close();
- blockedFuture.setException(t);
- return blockedFuture;
- }
- return NOT_BLOCKED;
- }
-
- @Override
- public FragmentInstanceId getInfo() {
- return driverContext.getId();
+ return true;
}
+ /**
+ * All file paths used by this fragment instance must be cleared and thus the usage reference must
+ * be decreased.
+ */
@Override
- public void close() {
- if (closed) {
- return;
+ protected void releaseResource() {
+ for (TsFileResource tsFile : closedFilePaths) {
+ FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
}
- closed = true;
- try {
- if (root != null) {
- root.close();
- }
- if (sinkHandle != null) {
- sinkHandle.close();
- }
- } catch (Throwable t) {
- logger.error("Failed to closed driver {}", driverContext.getId(), t);
- driverContext.failed(t);
- } finally {
- removeUsedFilesForQuery();
+ closedFilePaths = null;
+ for (TsFileResource tsFile : unClosedFilePaths) {
+ FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
}
+ unClosedFilePaths = null;
}
- @Override
- public void failed(Throwable t) {
- driverContext.failed(t);
- }
/**
* init seq file list and unseq file list in QueryDataSource and set it into each SourceNode TODO
* we should change all the blocked lock operation into tryLock
*/
private void initialize() throws QueryProcessException {
- List<DataSourceOperator> sourceOperators = driverContext.getSourceOperators();
+ List<DataSourceOperator> sourceOperators = ((DataDriverContext)driverContext).getSourceOperators();
if (sourceOperators != null && !sourceOperators.isEmpty()) {
QueryDataSource dataSource = initQueryDataSourceCache();
sourceOperators.forEach(
@@ -206,11 +116,12 @@ public class DataDriver implements Driver {
* QueryDataSource needed for this query
*/
public QueryDataSource initQueryDataSourceCache() throws QueryProcessException {
- DataRegion dataRegion = driverContext.getDataRegion();
+ DataDriverContext context = (DataDriverContext) driverContext;
+ DataRegion dataRegion = context.getDataRegion();
dataRegion.readLock();
try {
List<PartialPath> pathList =
- driverContext.getPaths().stream()
+ context.getPaths().stream()
.map(IDTable::translateQueryPath)
.collect(Collectors.toList());
// when all the selected series are under the same device, the QueryDataSource will be
@@ -223,7 +134,7 @@ public class DataDriver implements Driver {
pathList,
selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null,
driverContext.getFragmentInstanceContext(),
- driverContext.getTimeFilter());
+ context.getTimeFilter());
// used files should be added before mergeLock is unlocked, or they may be deleted by
// running merge
@@ -264,28 +175,13 @@ public class DataDriver implements Driver {
}
}
- /**
- * All file paths used by this fragment instance must be cleared and thus the usage reference must
- * be decreased.
- */
- private void removeUsedFilesForQuery() {
- for (TsFileResource tsFile : closedFilePaths) {
- FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
- }
- closedFilePaths = null;
- for (TsFileResource tsFile : unClosedFilePaths) {
- FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
- }
- unClosedFilePaths = null;
- }
-
/**
* Increase the usage reference of filePath of job id. Before the invoking of this method, <code>
* this.setqueryIdForCurrentRequestThread</code> has been invoked, so <code>
* sealedFilePathsMap.get(queryId)</code> or <code>unsealedFilePathsMap.get(queryId)</code> must
* not return null.
*/
- void addFilePathToMap(TsFileResource tsFile, boolean isClosed) {
+ private void addFilePathToMap(TsFileResource tsFile, boolean isClosed) {
Set<TsFileResource> pathSet = isClosed ? closedFilePaths : unClosedFilePaths;
if (!pathSet.contains(tsFile)) {
pathSet.add(tsFile);
@@ -293,38 +189,4 @@ public class DataDriver implements Driver {
}
}
- private ListenableFuture<Void> processInternal() throws IOException, IoTDBException {
- ListenableFuture<Void> blocked = root.isBlocked();
- if (!blocked.isDone()) {
- return blocked;
- }
- blocked = sinkHandle.isFull();
- if (!blocked.isDone()) {
- return blocked;
- }
- if (root.hasNext()) {
- TsBlock tsBlock = root.next();
- if (tsBlock != null && !tsBlock.isEmpty()) {
- sinkHandle.send(Collections.singletonList(tsBlock));
- }
- }
- return NOT_BLOCKED;
- }
-
- private ListenableFuture<Void> updateDriverBlockedFuture(
- ListenableFuture<Void> sourceBlockedFuture) {
- // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
- // or any of the operators gets a memory revocation request
- SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
- driverBlockedFuture.set(newDriverBlockedFuture);
- sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
-
- // TODO Although we don't have memory management for operator now, we should consider it for
- // future
- // it's possible that memory revoking is requested for some operator
- // before we update driverBlockedFuture above and we don't want to miss that
- // notification, so we check to see whether that's the case before returning.
-
- return newDriverBlockedFuture;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
index f211ce593c..fd2fef8b3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
@@ -18,25 +18,88 @@
*/
package org.apache.iotdb.db.mpp.execution;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-
+import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.Closeable;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Throwables.throwIfUnchecked;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static java.lang.Boolean.TRUE;
+import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
/**
* Driver encapsulates some methods which are necessary for execution scheduler to run a fragment
* instance
*/
-public interface Driver extends Closeable {
+public abstract class Driver {
+
+ protected static final Logger LOGGER = LoggerFactory.getLogger(Driver.class);
+
+
+ protected final Operator root;
+ protected final ISinkHandle sinkHandle;
+ protected final DriverContext driverContext;
+ protected final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
+ protected final AtomicReference<State> state = new AtomicReference<>(State.ALIVE);
+
+ protected final DriverLock exclusiveLock = new DriverLock();
+
+ protected enum State {
+ ALIVE, NEED_DESTRUCTION, DESTROYED
+ }
+
+ public Driver(Operator root, ISinkHandle sinkHandle, DriverContext driverContext) {
+ this.root = root;
+ this.sinkHandle = sinkHandle;
+ this.driverContext = driverContext;
+
+ // initially the driverBlockedFuture is not blocked (it is completed)
+ SettableFuture<Void> future = SettableFuture.create();
+ future.set(null);
+ driverBlockedFuture.set(future);
+ }
/**
* Used to judge whether this fragment instance should be scheduled for execution anymore
*
* @return true if the FragmentInstance is done or terminated due to failure, otherwise false.
*/
- boolean isFinished();
+ public boolean isFinished() {
+ checkLockNotHeld("Cannot check finished status while holding the driver lock");
+
+ // if we can get the lock, attempt a clean shutdown; otherwise someone else will shutdown
+ Optional<Boolean> result = tryWithLockUnInterruptibly(this::isFinishedInternal);
+ return result.orElseGet(() -> state.get() != State.ALIVE || driverContext.isDone());
+ }
+
+ /**
+ * do initialization
+ *
+ * @return true if init succeed, false otherwise
+ */
+ protected abstract boolean init(SettableFuture<Void> blockedFuture);
+
+ /**
+ * release resource this driver used
+ */
+ protected abstract void releaseResource();
/**
* run the fragment instance for {@param duration} time slice, the time of this run is likely not
@@ -44,27 +107,351 @@ public interface Driver extends Closeable {
*
* @param duration how long should this fragment instance run
* @return the returned ListenableFuture<Void> is used to represent status of this processing if
- * isDone() return true, meaning that this fragment instance is not blocked and is ready for
- * next processing otherwise, meaning that this fragment instance is blocked and not ready for
- * next processing.
+ * isDone() return true, meaning that this fragment instance is not blocked and is ready for
+ * next processing otherwise, meaning that this fragment instance is blocked and not ready for
+ * next processing.
*/
- ListenableFuture<Void> processFor(Duration duration);
+ public ListenableFuture<Void> processFor(Duration duration) {
+
+ SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
+ // initialization may be time-consuming, so we keep it in the processFor method
+ // in normal case, it won't cause deadlock and should finish soon, otherwise it will be a
+ // critical bug
+ if (!init(blockedFuture)) {
+ return blockedFuture;
+ }
+
+ // if the driver is blocked we don't need to continue
+ if (!blockedFuture.isDone()) {
+ return blockedFuture;
+ }
+
+ long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
+
+ Optional<ListenableFuture<Void>> result = tryWithLock(100, TimeUnit.MILLISECONDS, true, () -> {
+ long start = System.nanoTime();
+ do {
+ ListenableFuture<Void> future = processInternal();
+ if (!future.isDone()) {
+ return updateDriverBlockedFuture(future);
+ }
+ }
+ while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
+ return NOT_BLOCKED;
+ });
+
+ return result.orElse(NOT_BLOCKED);
+ }
/**
* the id information about this Fragment Instance.
*
* @return a {@link FragmentInstanceId} instance.
*/
- FragmentInstanceId getInfo();
+ public FragmentInstanceId getInfo() {
+ return driverContext.getId();
+ }
+
+ /**
+ * clear resource used by this fragment instance
+ */
+ public void close() {
+ // mark the service for destruction
+ if (!state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION)) {
+ return;
+ }
- /** clear resource used by this fragment instance */
- @Override
- void close();
+ exclusiveLock.interruptCurrentOwner();
+
+ // if we can get the lock, attempt a clean shutdown; otherwise someone else will shutdown
+ tryWithLockUnInterruptibly(() -> TRUE);
+ }
/**
* fail current driver
*
* @param t reason cause this failure
*/
- void failed(Throwable t);
+ public void failed(Throwable t) {
+ driverContext.failed(t);
+ }
+
+ public ISinkHandle getSinkHandle() {
+ return sinkHandle;
+ }
+
+ @GuardedBy("exclusiveLock")
+ private boolean isFinishedInternal() {
+ checkLockHeld("Lock must be held to call isFinishedInternal");
+
+ boolean finished = state.get() != State.ALIVE || driverContext.isDone() || root == null || root.isFinished();
+ if (finished) {
+ state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION);
+ }
+ return finished;
+ }
+
+
+ private ListenableFuture<Void> processInternal() {
+ try {
+ ListenableFuture<Void> blocked = root.isBlocked();
+ if (!blocked.isDone()) {
+ return blocked;
+ }
+ blocked = sinkHandle.isFull();
+ if (!blocked.isDone()) {
+ return blocked;
+ }
+ if (root.hasNext()) {
+ TsBlock tsBlock = root.next();
+ if (tsBlock != null && !tsBlock.isEmpty()) {
+ sinkHandle.send(Collections.singletonList(tsBlock));
+ }
+ }
+ return NOT_BLOCKED;
+ } catch (Throwable t) {
+ LOGGER.error("Failed to execute fragment instance {}", driverContext.getId(), t);
+ List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
+ if (interrupterStack == null) {
+ driverContext.failed(t);
+ throw t;
+ }
+
+ // Driver thread was interrupted which should only happen if the task is already finished.
+ // If this becomes the actual cause of a failed query there is a bug in the task state machine.
+ Exception exception = new Exception("Interrupted By");
+ exception.setStackTrace(interrupterStack.toArray(new StackTraceElement[0]));
+ RuntimeException newException = new RuntimeException("Driver was interrupted", exception);
+ newException.addSuppressed(t);
+ driverContext.failed(newException);
+ throw newException;
+ }
+ }
+
+ private ListenableFuture<Void> updateDriverBlockedFuture(
+ ListenableFuture<Void> sourceBlockedFuture) {
+ // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
+ // or any of the operators gets a memory revocation request
+ SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
+ driverBlockedFuture.set(newDriverBlockedFuture);
+ sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
+
+ // TODO Although we don't have memory management for operator now, we should consider it for
+ // future
+ // it's possible that memory revoking is requested for some operator
+ // before we update driverBlockedFuture above and we don't want to miss that
+ // notification, so we check to see whether that's the case before returning.
+
+ return newDriverBlockedFuture;
+ }
+
+
+ private synchronized void checkLockNotHeld(String message) {
+ checkState(!exclusiveLock.isHeldByCurrentThread(), message);
+ }
+
+ @GuardedBy("exclusiveLock")
+ private synchronized void checkLockHeld(String message) {
+ checkState(exclusiveLock.isHeldByCurrentThread(), message);
+ }
+
+ /**
+ * Try to acquire the {@code exclusiveLock} immediately and run a {@code task}
+ * The task will not be interrupted if the {@code Driver} is closed.
+ * <p>
+ * Note: task cannot return null
+ */
+ private <T> Optional<T> tryWithLockUnInterruptibly(Supplier<T> task) {
+ return tryWithLock(0, TimeUnit.MILLISECONDS, false, task);
+ }
+
+ /**
+ * Try to acquire the {@code exclusiveLock} with {@code timeout} and run a {@code task}.
+ * If the {@code interruptOnClose} flag is set to {@code true} the {@code task} will be
+ * interrupted if the {@code Driver} is closed.
+ * <p>
+ * Note: task cannot return null
+ */
+ private <T> Optional<T> tryWithLock(long timeout, TimeUnit unit, boolean interruptOnClose, Supplier<T> task) {
+ checkLockNotHeld("Lock cannot be reacquired");
+
+ boolean acquired = false;
+ try {
+ acquired = exclusiveLock.tryLock(timeout, unit, interruptOnClose);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ if (!acquired) {
+ return Optional.empty();
+ }
+
+ Optional<T> result;
+ try {
+ result = Optional.of(task.get());
+ } finally {
+ try {
+ destroyIfNecessary();
+ } finally {
+ exclusiveLock.unlock();
+ }
+ }
+
+ return result;
+ }
+
+ @GuardedBy("exclusiveLock")
+ private void destroyIfNecessary() {
+ checkLockHeld("Lock must be held to call destroyIfNecessary");
+
+ if (!state.compareAndSet(State.NEED_DESTRUCTION, State.DESTROYED)) {
+ return;
+ }
+
+ // if we get an error while closing a driver, record it and we will throw it at the end
+ Throwable inFlightException = null;
+ try {
+ inFlightException = closeAndDestroyOperators();
+ driverContext.finished();
+ } catch (Throwable t) {
+ // this shouldn't happen but be safe
+ inFlightException = addSuppressedException(
+ inFlightException,
+ t,
+ "Error destroying driver for task %s",
+ driverContext.getId());
+ } finally {
+ releaseResource();
+ }
+
+ if (inFlightException != null) {
+ // this will always be an Error or Runtime
+ throwIfUnchecked(inFlightException);
+ throw new RuntimeException(inFlightException);
+ }
+ }
+
+ private Throwable closeAndDestroyOperators() {
+ // record the current interrupted status (and clear the flag); we'll reset it later
+ boolean wasInterrupted = Thread.interrupted();
+
+ Throwable inFlightException = null;
+
+ try {
+ if (root != null) {
+ root.close();
+ }
+ if (sinkHandle != null) {
+ sinkHandle.close();
+ }
+ } catch (InterruptedException t) {
+ // don't record the stack
+ wasInterrupted = true;
+ } catch (Throwable t) {
+ // TODO currently, we won't know exact operator which is failed in closing
+ inFlightException = addSuppressedException(
+ inFlightException,
+ t,
+ "Error closing operator {} for fragment instance {}",
+ root.getOperatorContext().getOperatorId(),
+ driverContext.getId());
+ } finally {
+ // reset the interrupted flag
+ if (wasInterrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ return inFlightException;
+ }
+
+ private static Throwable addSuppressedException(Throwable inFlightException, Throwable newException, String message, Object... args) {
+ if (newException instanceof Error) {
+ if (inFlightException == null) {
+ inFlightException = newException;
+ } else {
+ // Self-suppression not permitted
+ if (inFlightException != newException) {
+ inFlightException.addSuppressed(newException);
+ }
+ }
+ } else {
+ // log normal exceptions instead of rethrowing them
+ LOGGER.error(message, args, newException);
+ }
+ return inFlightException;
+ }
+
+ private static class DriverLock {
+ private final ReentrantLock lock = new ReentrantLock();
+
+ @GuardedBy("this")
+ private Thread currentOwner;
+ @GuardedBy("this")
+ private boolean currentOwnerInterruptionAllowed;
+
+ @GuardedBy("this")
+ private List<StackTraceElement> interrupterStack;
+
+ public boolean isHeldByCurrentThread() {
+ return lock.isHeldByCurrentThread();
+ }
+
+ public boolean tryLock(boolean currentThreadInterruptionAllowed) {
+ checkState(!lock.isHeldByCurrentThread(), "Lock is not reentrant");
+ boolean acquired = lock.tryLock();
+ if (acquired) {
+ setOwner(currentThreadInterruptionAllowed);
+ }
+ return acquired;
+ }
+
+ public boolean tryLock(long timeout, TimeUnit unit, boolean currentThreadInterruptionAllowed)
+ throws InterruptedException {
+ checkState(!lock.isHeldByCurrentThread(), "Lock is not reentrant");
+ boolean acquired = lock.tryLock(timeout, unit);
+ if (acquired) {
+ setOwner(currentThreadInterruptionAllowed);
+ }
+ return acquired;
+ }
+
+ private synchronized void setOwner(boolean interruptionAllowed) {
+ checkState(lock.isHeldByCurrentThread(), "Current thread does not hold lock");
+ currentOwner = Thread.currentThread();
+ currentOwnerInterruptionAllowed = interruptionAllowed;
+ // NOTE: We do not use interrupted stack information to know that another
+ // thread has attempted to interrupt the driver, and interrupt this new lock
+ // owner. The interrupted stack information is for debugging purposes only.
+ // In the case of interruption, the caller should (and does) have a separate
+ // state to prevent further processing in the Driver.
+ }
+
+ public synchronized void unlock() {
+ checkState(lock.isHeldByCurrentThread(), "Current thread does not hold lock");
+ currentOwner = null;
+ currentOwnerInterruptionAllowed = false;
+ lock.unlock();
+ }
+
+ public synchronized List<StackTraceElement> getInterrupterStack() {
+ return interrupterStack;
+ }
+
+ public synchronized void interruptCurrentOwner() {
+ if (!currentOwnerInterruptionAllowed) {
+ return;
+ }
+ // there is a benign race condition here were the lock holder
+ // can be change between attempting to get lock and grabbing
+ // the synchronized lock here, but in either case we want to
+ // interrupt the lock holder thread
+ if (interrupterStack == null) {
+ interrupterStack = ImmutableList.copyOf(Thread.currentThread().getStackTrace());
+ }
+
+ if (currentOwner != null) {
+ currentOwner.interrupt();
+ }
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
index 8c20a2c334..d0bc3bb232 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
@@ -20,10 +20,15 @@ package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import java.util.concurrent.atomic.AtomicBoolean;
+
public class DriverContext {
private final FragmentInstanceContext fragmentInstanceContext;
+ // set once finished() or failed(Throwable) has been called; read by isDone()
+ private final AtomicBoolean finished = new AtomicBoolean();
+
+
public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
this.fragmentInstanceContext = fragmentInstanceContext;
}
@@ -38,13 +43,15 @@ public class DriverContext {
+ /** Propagates the failure to the fragment instance context and marks this driver as done. */
public void failed(Throwable cause) {
fragmentInstanceContext.failed(cause);
+ finished.set(true);
}
- public void finish() {
- fragmentInstanceContext.finish();
+ /** Marks this driver as done; calling it again has no further effect. */
+ public void finished() {
+ finished.compareAndSet(false, true);
}
- public void flushing() {
- fragmentInstanceContext.flushing();
+
+ /** Returns true once finished() or failed(Throwable) has been called. */
+ public boolean isDone() {
+ return finished.get();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
index b2e51be2bc..c3d33dbd74 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@ -28,9 +28,11 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
public class FragmentInstanceContext extends QueryContext {
@@ -41,14 +43,19 @@ public class FragmentInstanceContext extends QueryContext {
// TODO if we split one fragment instance into multiple pipelines to run, we need to replace it
// with CopyOnWriteArrayList or some other thread safe data structure
private final List<OperatorContext> operatorContexts = new ArrayList<>();
- private final long createNanos = System.nanoTime();
private DriverContext driverContext;
- // TODO we may use StateMachine<FragmentInstanceState> to replace it
- private final AtomicReference<FragmentInstanceState> state;
+ private final FragmentInstanceStateMachine stateMachine;
+
+ private final long createNanos = System.nanoTime();
+
+ // System.nanoTime() readings for the first start() and for the first observed terminal state;
+ // 0 means "not recorded yet"
+ private final AtomicLong startNanos = new AtomicLong();
+ private final AtomicLong endNanos = new AtomicLong();
- private long endTime = -1;
+ // System.currentTimeMillis() timestamps; null means "not recorded yet"
+ private final AtomicReference<Long> executionStartTime = new AtomicReference<>();
+ private final AtomicReference<Long> lastExecutionStartTime = new AtomicReference<>();
+ private final AtomicReference<Long> executionEndTime = new AtomicReference<>();
// private final GcMonitor gcMonitor;
// private final AtomicLong startNanos = new AtomicLong();
@@ -59,9 +66,43 @@ public class FragmentInstanceContext extends QueryContext {
// private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
+ /**
+ * Creates a context bound to the given state machine. This constructor does not register the
+ * listener that records execution start/end times when the instance reaches a terminal state;
+ * use {@link #createFragmentInstanceContext} to get a fully initialized context.
+ */
public FragmentInstanceContext(
- FragmentInstanceId id, AtomicReference<FragmentInstanceState> state) {
+ FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
this.id = id;
- this.state = state;
+ this.stateMachine = stateMachine;
+ }
+
+ /**
+ * Creates a context and registers its state-change listener, so that execution start/end times
+ * are recorded once the fragment instance reaches a terminal state.
+ *
+ * @param id id of the fragment instance
+ * @param stateMachine state machine tracking the fragment instance
+ * @return the initialized context
+ */
+ public static FragmentInstanceContext createFragmentInstanceContext(
+ FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
+ FragmentInstanceContext context = new FragmentInstanceContext(id, stateMachine);
+ // registering after construction avoids leaking "this" from the constructor
+ context.initialize();
+ return context;
+ }
+
+ /**
+ * Records that execution has started. The first call sets the execution start time; every call
+ * updates the last execution start time.
+ */
+ public void start() {
+ long now = System.currentTimeMillis();
+ executionStartTime.compareAndSet(null, now);
+ startNanos.compareAndSet(0, System.nanoTime());
+
+ // always update last execution start time
+ lastExecutionStartTime.set(now);
+ }
+
+ // the state change listener is added here in a separate initialize() method
+ // instead of the constructor to prevent leaking the "this" reference to
+ // another thread, which would cause unsafe publication of this instance.
+ private void initialize() {
+ stateMachine.addStateChangeListener(this::updateStatsIfDone);
+ }
+
+ /** Records end times, and any missing start times, once a terminal (done) state is observed. */
+ private void updateStatsIfDone(FragmentInstanceState newState) {
+ if (newState.isDone()) {
+ long now = System.currentTimeMillis();
+
+ // before setting the end times, make sure a start has been recorded
+ executionStartTime.compareAndSet(null, now);
+ startNanos.compareAndSet(0, System.nanoTime());
+
+ // only set the last start time if nothing was started
+ lastExecutionStartTime.compareAndSet(null, now);
+
+ // use compare and set from initial value to avoid overwriting if there
+ // were a duplicate notification, which shouldn't happen
+ executionEndTime.compareAndSet(null, now);
+ endNanos.compareAndSet(0, System.nanoTime());
+ }
}
public OperatorContext addOperatorContext(
@@ -98,40 +139,10 @@ public class FragmentInstanceContext extends QueryContext {
}
+ /**
+ * Reports a failure to the state machine. The state machine records the cause and moves to
+ * FAILED unless the instance is already done.
+ */
public void failed(Throwable cause) {
- LOGGER.warn("Fragment Instance {} failed.", id, cause);
- state.set(FragmentInstanceState.FAILED);
- }
-
- public void cancel() {
- state.set(FragmentInstanceState.CANCELED);
- this.endTime = System.currentTimeMillis();
- }
-
- public void abort() {
- state.set(FragmentInstanceState.ABORTED);
- this.endTime = System.currentTimeMillis();
- }
-
- public void finish() {
- if (state.get().isDone()) {
- return;
- }
- state.set(FragmentInstanceState.FINISHED);
- this.endTime = System.currentTimeMillis();
- }
-
- public void flushing() {
- if (state.get().isDone()) {
- return;
- }
- state.set(FragmentInstanceState.FLUSHING);
+ stateMachine.failed(cause);
}
+ /**
+ * Returns the wall-clock time (ms) at which a terminal state was recorded for this fragment
+ * instance, or -1 if no end time has been recorded yet.
+ */
public long getEndTime() {
- return endTime;
- }
-
- public void setEndTime(long endTime) {
- this.endTime = endTime;
+ Long endTime = executionEndTime.get();
+ // executionEndTime stays null until a terminal state is observed; unboxing null into the long
+ // return value would throw NullPointerException, so keep the previous -1 sentinel instead
+ return endTime == null ? -1L : endTime;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
index 21e0cc3d50..b00ddcadb3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
@@ -18,14 +18,17 @@
*/
package org.apache.iotdb.db.mpp.execution;
+import io.airlift.stats.CounterStat;
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
import com.google.common.collect.ImmutableList;
-import java.util.concurrent.atomic.AtomicReference;
-
import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.ABORTED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.CANCELLED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FAILED;
public class FragmentInstanceExecution {
@@ -36,23 +39,35 @@ public class FragmentInstanceExecution {
private final Driver driver;
- // TODO we may use StateMachine<FragmentInstanceState> to replace it
- private final AtomicReference<FragmentInstanceState> state;
+ // taken from the driver at construction; aborted once the instance reaches a terminal state
+ private final ISinkHandle sinkHandle;
+
+ private final FragmentInstanceStateMachine stateMachine;
private long lastHeartbeat;
- public FragmentInstanceExecution(
+ /**
+ * Creates the execution of a fragment instance, registers its state-change listener and only
+ * then submits the driver to the scheduler.
+ *
+ * @param failedInstances counter incremented when the instance transitions to FAILED
+ * @return the created execution
+ */
+ public static FragmentInstanceExecution createFragmentInstanceExecution(
+ IFragmentInstanceScheduler scheduler,
+ FragmentInstanceId instanceId,
+ FragmentInstanceContext context,
+ Driver driver,
+ FragmentInstanceStateMachine stateMachine,
+ CounterStat failedInstances) {
+ FragmentInstanceExecution execution =
+ new FragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine);
+ execution.initialize(failedInstances, scheduler, instanceId, driver);
+ // submit only after the cleanup listener is registered, so that it is already in place when
+ // the running driver moves the instance to a terminal state
+ scheduler.submitFragmentInstances(instanceId.getQueryId(), ImmutableList.of(driver));
+ return execution;
+ }
+
+ private FragmentInstanceExecution(
IFragmentInstanceScheduler scheduler,
FragmentInstanceId instanceId,
FragmentInstanceContext context,
Driver driver,
- AtomicReference<FragmentInstanceState> state) {
+ FragmentInstanceStateMachine stateMachine) {
this.scheduler = scheduler;
this.instanceId = instanceId;
this.context = context;
this.driver = driver;
- this.state = state;
- state.set(FragmentInstanceState.RUNNING);
- scheduler.submitFragmentInstances(instanceId.getQueryId(), ImmutableList.of(driver));
+ this.sinkHandle = driver.getSinkHandle();
+ this.stateMachine = stateMachine;
}
@@ -65,24 +80,66 @@ public class FragmentInstanceExecution {
}
+ /** Returns the current state of the fragment instance. */
public FragmentInstanceState getInstanceState() {
- return state.get();
+ return stateMachine.getState();
}
+ /** Returns the current state together with the end time reported by the context. */
public FragmentInstanceInfo getInstanceInfo() {
- return new FragmentInstanceInfo(state.get(), context.getEndTime());
+ return new FragmentInstanceInfo(stateMachine.getState(), context.getEndTime());
}
+ /** Records the cause and moves the instance to FAILED unless it is already done. */
public void failed(Throwable cause) {
requireNonNull(cause, "cause is null");
- context.failed(cause);
+ stateMachine.failed(cause);
}
+ /** Moves the instance to CANCELLED unless it is already done. */
public void cancel() {
- context.cancel();
+ stateMachine.cancel();
}
+ /**
+ * Moves the instance to ABORTED unless it is already done. Cleanup is performed by the listener
+ * registered in initialize.
+ */
public void abort() {
- scheduler.abortFragmentInstance(instanceId);
- context.abort();
+ stateMachine.abort();
+ }
+
+ // this is a separate method to ensure that the `this` reference is not leaked during construction
+ // Registers a listener that acts once the instance reaches a terminal (done) state. It counts
+ // FAILED instances, closes the driver, aborts the sink handle and asks the scheduler to abort
+ // the instance.
+ private void initialize(CounterStat failedInstances, IFragmentInstanceScheduler scheduler, FragmentInstanceId instanceId, Driver driver) {
+ requireNonNull(failedInstances, "failedInstances is null");
+ stateMachine.addStateChangeListener(newState -> {
+
+ // non-terminal transitions (e.g. RUNNING -> FLUSHING) need no cleanup
+ if (!newState.isDone()) {
+ return;
+ }
+
+ // Update failed tasks counter
+ if (newState == FAILED) {
+ failedInstances.update(1);
+ }
+
+ driver.close();
+ sinkHandle.abort();
+ scheduler.abortFragmentInstance(instanceId);
+ });
+ }
+
+ /**
+ * Advances the instance based on its sink handle, unless the instance is already done:
+ * FAILED if the sink handle failed, FINISHED if it finished, otherwise FLUSHING.
+ */
+ private synchronized void instanceCompletion() {
+ if (stateMachine.getState().isDone()) {
+ return;
+ }
+
+ // check failure first: the finished / not-finished checks below cover every remaining case,
+ // so a failure check placed after them could never be reached
+ if (sinkHandle.isFailed()) {
+ Throwable failureCause = sinkHandle.getFailureCause()
+ .orElseGet(() -> new RuntimeException("Fragment Instance " + instanceId + " 's SinkHandle is failed but the failure cause is missing"));
+ stateMachine.failed(failureCause);
+ return;
+ }
+
+ if (sinkHandle.isFinished()) {
+ // Cool! All done!
+ stateMachine.finished();
+ return;
+ }
+
+ // the sink handle has not finished yet
+ stateMachine.transitionToFlushing();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceFailureListener.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceFailureListener.java
index 8c20a2c334..3f99848c29 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceFailureListener.java
@@ -20,31 +20,6 @@ package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-public class DriverContext {
-
- private final FragmentInstanceContext fragmentInstanceContext;
-
- public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
- this.fragmentInstanceContext = fragmentInstanceContext;
- }
-
- public FragmentInstanceId getId() {
- return fragmentInstanceContext.getId();
- }
-
- public FragmentInstanceContext getFragmentInstanceContext() {
- return fragmentInstanceContext;
- }
-
- public void failed(Throwable cause) {
- fragmentInstanceContext.failed(cause);
- }
-
- public void finish() {
- fragmentInstanceContext.finish();
- }
-
- public void flushing() {
- fragmentInstanceContext.flushing();
- }
+/**
+ * Callback that FragmentInstanceStateMachine notifies, on its executor, when a source fragment
+ * instance fails.
+ */
+public interface FragmentInstanceFailureListener {
+ /**
+ * @param taskId id of the failed source fragment instance
+ * @param failure cause of the failure
+ */
+ void onTaskFailed(FragmentInstanceId taskId, Throwable failure);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index 537f330f04..5f49fa47ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution;
+import io.airlift.stats.CounterStat;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
@@ -33,11 +34,12 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceExecution.createFragmentInstanceExecution;
public class FragmentInstanceManager {
@@ -49,9 +51,14 @@ public class FragmentInstanceManager {
private final IFragmentInstanceScheduler scheduler = FragmentInstanceScheduler.getInstance();
private final ScheduledExecutorService instanceManagementExecutor;
+ // executor handed to every FragmentInstanceStateMachine created here; the state machines use
+ // it to run state-change notifications and source failure callbacks
+ private final ExecutorService instanceNotificationExecutor;
+
private final Duration infoCacheTime;
+ // number of fragment instances, executed by this manager, that transitioned to FAILED
+ private final CounterStat failedInstances = new CounterStat();
+
public static FragmentInstanceManager getInstance() {
return FragmentInstanceManager.InstanceHolder.INSTANCE;
}
@@ -61,13 +68,14 @@ public class FragmentInstanceManager {
this.instanceExecution = new ConcurrentHashMap<>();
this.instanceManagementExecutor =
IoTDBThreadPoolFactory.newScheduledThreadPool(1, "instance-management");
+ this.instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
this.infoCacheTime = new Duration(15, TimeUnit.MINUTES);
instanceManagementExecutor.scheduleWithFixedDelay(
() -> {
try {
- removeOldTasks();
+ removeOldInstances();
} catch (Throwable e) {
logger.warn("Error removing old tasks", e);
}
@@ -85,13 +93,13 @@ public class FragmentInstanceManager {
instanceExecution.computeIfAbsent(
instanceId,
id -> {
- AtomicReference<FragmentInstanceState> state = new AtomicReference<>();
- state.set(FragmentInstanceState.PLANNED);
+
+ FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext context =
instanceContext.computeIfAbsent(
instanceId,
- fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
+ fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, stateMachine));
try {
DataDriver driver =
@@ -100,9 +108,9 @@ public class FragmentInstanceManager {
context,
instance.getTimeFilter(),
dataRegion);
- return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+ return createFragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine, failedInstances);
} catch (Throwable t) {
- context.failed(t);
+ stateMachine.failed(t);
return null;
}
});
@@ -118,26 +126,29 @@ public class FragmentInstanceManager {
instanceExecution.computeIfAbsent(
instanceId,
id -> {
- AtomicReference<FragmentInstanceState> state = new AtomicReference<>();
- state.set(FragmentInstanceState.PLANNED);
+ FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+
FragmentInstanceContext context =
instanceContext.computeIfAbsent(
instanceId,
- fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
+ fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, stateMachine));
try {
SchemaDriver driver =
planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
- return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+ return createFragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine, failedInstances);
} catch (Throwable t) {
- context.failed(t);
+ stateMachine.failed(t);
return null;
}
});
return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
}
+ /**
+ * Aborts a FragmentInstance.
+ */
public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId fragmentInstanceId) {
FragmentInstanceExecution execution = instanceExecution.remove(fragmentInstanceId);
if (execution != null) {
@@ -148,6 +159,21 @@ public class FragmentInstanceManager {
return null;
}
+ /**
+ * Cancels a FragmentInstance.
+ *
+ * <p>The instance is removed from this manager and moved to CANCELLED, unless it is already in a
+ * terminal state.
+ *
+ * @param instanceId id of the fragment instance to cancel
+ * @return the instance info after cancellation, or null if no execution is registered for it
+ */
+ public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) {
+ requireNonNull(instanceId, "instanceId is null");
+
+ FragmentInstanceExecution execution = instanceExecution.remove(instanceId);
+ if (execution != null) {
+ instanceContext.remove(instanceId);
+ execution.cancel();
+ return execution.getInstanceInfo();
+ }
+ return null;
+ }
+
/**
* Gets the info for the specified fragment instance.
*
@@ -163,12 +189,16 @@ public class FragmentInstanceManager {
return execution.getInstanceInfo();
}
+ /** Returns the counter of fragment instances that transitioned to FAILED. */
+ public CounterStat getFailedInstances() {
+ return failedInstances;
+ }
+
+ // builds the info returned when no execution could be created for the instance
private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId instanceId) {
return new FragmentInstanceInfo(
FragmentInstanceState.FAILED, instanceContext.get(instanceId).getEndTime());
}
- private void removeOldTasks() {
+ private void removeOldInstances() {
long oldestAllowedInstance = System.currentTimeMillis() - infoCacheTime.toMillis();
instanceContext
.entrySet()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
index d55af83e98..d1a182bcbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
@@ -40,7 +40,7 @@ public enum FragmentInstanceState {
/** Instance has finished executing and all output has been consumed. */
FINISHED(true, false),
/** Instance was canceled by a user. */
- CANCELED(true, true),
+ CANCELLED(true, true),
/** Instance was aborted due to a failure in the query. The failure was not in this instance. */
ABORTED(true, true),
/** Instance execution failed. */
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java
new file mode 100644
index 0000000000..7f66d1a321
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.iotdb.db.mpp.execution.StateMachine.StateChangeListener;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.ABORTED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.CANCELLED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FAILED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FINISHED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FLUSHING;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.RUNNING;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.TERMINAL_INSTANCE_STATES;
+
+
+/**
+ * Tracks the state of a single fragment instance. It also records failure causes and failures
+ * reported by source fragment instances, and notifies registered listeners on the given executor.
+ */
+@ThreadSafe
+public class FragmentInstanceStateMachine {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceStateMachine.class);
+
+ private final long createdTime = System.currentTimeMillis();
+
+ private final FragmentInstanceId instanceId;
+ // runs state-change notifications and source failure callbacks
+ private final Executor executor;
+ private final StateMachine<FragmentInstanceState> instanceState;
+ // every cause passed to failed(Throwable), in call order
+ private final LinkedBlockingQueue<Throwable> failureCauses = new LinkedBlockingQueue<>();
+
+ // first failure reported for each source fragment instance
+ @GuardedBy("this")
+ private final Map<FragmentInstanceId, Throwable> sourceInstanceFailures = new HashMap<>();
+ @GuardedBy("this")
+ private final List<FragmentInstanceFailureListener> sourceInstanceFailureListeners = new ArrayList<>();
+
+ /**
+ * Creates a state machine for the given fragment instance. RUNNING is passed to the underlying
+ * StateMachine as its initial state.
+ *
+ * @param fragmentInstanceId id of the tracked fragment instance
+ * @param executor executor used to notify listeners
+ */
+ public FragmentInstanceStateMachine(FragmentInstanceId fragmentInstanceId, Executor executor) {
+ this.instanceId = requireNonNull(fragmentInstanceId, "fragmentInstanceId is null");
+ this.executor = requireNonNull(executor, "executor is null");
+ instanceState = new StateMachine<>("FragmentInstance " + fragmentInstanceId, executor, RUNNING, TERMINAL_INSTANCE_STATES);
+ instanceState.addStateChangeListener(newState -> LOGGER.debug("Fragment Instance {} is {}", fragmentInstanceId, newState));
+ }
+
+ /** Returns the wall-clock time (ms) at which this state machine was created. */
+ public long getCreatedTime() {
+ return createdTime;
+ }
+
+ public FragmentInstanceId getFragmentInstanceId() {
+ return instanceId;
+ }
+
+ /** Returns the current state. */
+ public FragmentInstanceState getState() {
+ return instanceState.get();
+ }
+
+ /**
+ * Returns the future of {@link StateMachine#getStateChange} for {@code currentState}. If the
+ * instance is already done, returns an already-completed future holding the current state.
+ *
+ * @throws IllegalArgumentException if {@code currentState} is a done state
+ */
+ public ListenableFuture<FragmentInstanceState> getStateChange(FragmentInstanceState currentState) {
+ requireNonNull(currentState, "currentState is null");
+ checkArgument(!currentState.isDone(), "Current state is already done");
+
+ ListenableFuture<FragmentInstanceState> future = instanceState.getStateChange(currentState);
+ FragmentInstanceState state = instanceState.get();
+ if (state.isDone()) {
+ return immediateFuture(state);
+ }
+ return future;
+ }
+
+ /** Returns the live (not copied) queue of all causes passed to {@link #failed(Throwable)}. */
+ public LinkedBlockingQueue<Throwable> getFailureCauses() {
+ return failureCauses;
+ }
+
+ /** Moves the instance from RUNNING to FLUSHING; has no effect in any other state. */
+ public void transitionToFlushing() {
+ instanceState.setIf(FLUSHING, currentState -> currentState == RUNNING);
+ }
+
+ /** Moves the instance to FINISHED unless it is already done. */
+ public void finished() {
+ transitionToDoneState(FINISHED);
+ }
+
+ /** Moves the instance to CANCELLED unless it is already done. */
+ public void cancel() {
+ transitionToDoneState(CANCELLED);
+ }
+
+ /** Moves the instance to ABORTED unless it is already done. */
+ public void abort() {
+ transitionToDoneState(ABORTED);
+ }
+
+ /**
+ * Records {@code cause} (always, even if the instance is already done) and moves the instance
+ * to FAILED unless it is already done.
+ */
+ public void failed(Throwable cause) {
+ // fail with a descriptive message; the queue would otherwise reject null with a bare NPE
+ requireNonNull(cause, "cause is null");
+ failureCauses.add(cause);
+ transitionToDoneState(FAILED);
+ }
+
+ private void transitionToDoneState(FragmentInstanceState doneState) {
+ requireNonNull(doneState, "doneState is null");
+ checkArgument(doneState.isDone(), "doneState %s is not a done state", doneState);
+
+ instanceState.setIf(doneState, currentState -> !currentState.isDone());
+ }
+
+ /**
+ * Listener is always notified asynchronously using a dedicated notification thread pool, so care
+ * should be taken to avoid leaking {@code this} when adding a listener in a constructor.
+ * Additionally, it is possible notifications are observed out of order due to the asynchronous
+ * execution.
+ */
+ public void addStateChangeListener(StateChangeListener<FragmentInstanceState> stateChangeListener) {
+ instanceState.addStateChangeListener(stateChangeListener);
+ }
+
+ /**
+ * Registers a listener for source fragment instance failures. Failures reported before the
+ * registration are replayed to it asynchronously on the executor.
+ */
+ public void addSourceTaskFailureListener(FragmentInstanceFailureListener listener) {
+ // a null listener would later make ImmutableList.copyOf in sourceTaskFailed throw
+ requireNonNull(listener, "listener is null");
+ Map<FragmentInstanceId, Throwable> failures;
+ synchronized (this) {
+ sourceInstanceFailureListeners.add(listener);
+ failures = ImmutableMap.copyOf(sourceInstanceFailures);
+ }
+ executor.execute(() -> failures.forEach(listener::onTaskFailed));
+ }
+
+ /**
+ * Records the failure of a source fragment instance (only the first failure per instance is kept)
+ * and notifies all registered listeners asynchronously on the executor.
+ */
+ public void sourceTaskFailed(FragmentInstanceId sourceInstanceId, Throwable failure) {
+ // null keys/values would later make ImmutableMap.copyOf in addSourceTaskFailureListener throw
+ requireNonNull(sourceInstanceId, "sourceInstanceId is null");
+ requireNonNull(failure, "failure is null");
+ List<FragmentInstanceFailureListener> listeners;
+ synchronized (this) {
+ sourceInstanceFailures.putIfAbsent(sourceInstanceId, failure);
+ listeners = ImmutableList.copyOf(sourceInstanceFailureListeners);
+ }
+ executor.execute(() -> {
+ for (FragmentInstanceFailureListener listener : listeners) {
+ listener.onTaskFailed(sourceInstanceId, failure);
+ }
+ });
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("FragmentInstanceId", instanceId)
+ .add("FragmentInstanceState", instanceState)
+ .add("failureCauses", failureCauses)
+ .toString();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
index 4020db4593..6c45eb529a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
@@ -18,155 +18,27 @@
*/
package org.apache.iotdb.db.mpp.execution;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.operator.Operator;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import io.airlift.units.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
-import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
-
+/** Driver used for fragment instances planned against a schema region. */
@NotThreadSafe
-public class SchemaDriver implements Driver {
-
- private static final Logger logger = LoggerFactory.getLogger(SchemaDriver.class);
-
- private final Operator root;
- private final ISinkHandle sinkHandle;
- private final SchemaDriverContext driverContext;
-
- private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
-
- private boolean closed = false;
+public class SchemaDriver extends Driver {
public SchemaDriver(Operator root, ISinkHandle sinkHandle, SchemaDriverContext driverContext) {
- this.root = root;
- this.sinkHandle = sinkHandle;
- this.driverContext = driverContext;
- // initially the driverBlockedFuture is not blocked (it is completed)
- SettableFuture<Void> future = SettableFuture.create();
- future.set(null);
- driverBlockedFuture.set(future);
- }
-
- @Override
- public boolean isFinished() {
- try {
- boolean isFinished = driverBlockedFuture.get().isDone() && root != null && root.isFinished();
- if (isFinished) {
- close();
- }
- return isFinished;
- } catch (Throwable t) {
- logger.error(
- "Failed to query whether the schema driver {} is finished", driverContext.getId(), t);
- driverContext.failed(t);
- return true;
- }
+ super(root, sinkHandle, driverContext);
}
- @Override
- public ListenableFuture<Void> processFor(Duration duration) {
- // if the driver is blocked we don't need to continue
- SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
- if (!blockedFuture.isDone()) {
- return blockedFuture;
- }
-
- long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
-
- long start = System.nanoTime();
- try {
- do {
- ListenableFuture<Void> future = processInternal();
- if (!future.isDone()) {
- return updateDriverBlockedFuture(future);
- }
- } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
- } catch (Throwable t) {
- logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
- driverContext.failed(t);
- close();
- blockedFuture.setException(t);
- return blockedFuture;
- }
- return NOT_BLOCKED;
- }
-
- private ListenableFuture<Void> processInternal() throws IOException {
- ListenableFuture<Void> blocked = root.isBlocked();
- if (!blocked.isDone()) {
- return blocked;
- }
- blocked = sinkHandle.isFull();
- if (!blocked.isDone()) {
- return blocked;
- }
- if (root.hasNext()) {
- TsBlock tsBlock = root.next();
- if (tsBlock != null && !tsBlock.isEmpty()) {
- sinkHandle.send(Collections.singletonList(tsBlock));
- }
- }
- return NOT_BLOCKED;
- }
-
- private ListenableFuture<Void> updateDriverBlockedFuture(
- ListenableFuture<Void> sourceBlockedFuture) {
- // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
- // or any of the operators gets a memory revocation request
- SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
- driverBlockedFuture.set(newDriverBlockedFuture);
- sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
-
- // TODO Although we don't have memory management for operator now, we should consider it for
- // future
- // it's possible that memory revoking is requested for some operator
- // before we update driverBlockedFuture above and we don't want to miss that
- // notification, so we check to see whether that's the case before returning.
-
- return newDriverBlockedFuture;
- }
-
- @Override
- public FragmentInstanceId getInfo() {
- return driverContext.getId();
- }
+ // no initialization work is done here: init returns true without touching blockedFuture
@Override
- public void close() {
- if (closed) {
- return;
- }
- closed = true;
- try {
- if (root != null) {
- root.close();
- }
- if (sinkHandle != null) {
- sinkHandle.close();
- }
- } catch (Throwable t) {
- logger.error("Failed to closed driver {}", driverContext.getId(), t);
- driverContext.failed(t);
- }
+ protected boolean init(SettableFuture<Void> blockedFuture) {
+ return true;
}
+ // init acquires nothing in this class, so there is nothing to release here
@Override
- public void failed(Throwable t) {
- driverContext.failed(t);
+ protected void releaseResource() {
+ // do nothing
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
index c8cccc0520..7305feee83 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
@@ -53,5 +53,5 @@ public interface Operator extends AutoCloseable {
/**
* Is this operator completely finished processing and no more output TsBlock will be produced.
+ *
+ * @return true if this operator has finished and will produce no more TsBlock
*/
- boolean isFinished() throws IOException;
+ boolean isFinished();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
index 48d0c71185..cd3355d395 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
@@ -217,7 +217,7 @@ public class TransformOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() throws IOException {
+ public boolean isFinished() {
+ // finished exactly when hasNext() reports no further output
return !hasNext();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
index 003da76f2d..97da0a4935 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
@@ -92,7 +92,7 @@ public class SchemaFetchOperator implements SourceOperator {
}
@Override
- public boolean isFinished() throws IOException {
+ public boolean isFinished() {
+ // reports this operator's own isFinished flag
return isFinished;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
index 722f63543d..397b7dc461 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
@@ -65,7 +65,7 @@ public class ExchangeOperator implements SourceOperator {
}
@Override
- public boolean isFinished() throws IOException {
+ public boolean isFinished() {
return sourceHandle.isFinished();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
index e30b1f15bc..888f4ce5e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
@@ -18,10 +18,14 @@
*/
package org.apache.iotdb.db.mpp.schedule.task;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.DriverContext;
+import org.apache.iotdb.db.mpp.operator.Operator;
import org.apache.iotdb.db.mpp.schedule.ExecutionContext;
import org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor;
import org.apache.iotdb.db.mpp.schedule.queue.ID;
@@ -175,17 +179,31 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
}
}
- private static class StubFragmentInstance implements Driver {
+ private static class StubFragmentInstance extends Driver {
private static final QueryId stubQueryId = new QueryId("stub_query");
private static final FragmentInstanceId stubInstance =
new FragmentInstanceId(new PlanFragmentId(stubQueryId, 0), "stub-instance");
+ public StubFragmentInstance() {
+ super(null, null, null);
+ }
+
@Override
public boolean isFinished() {
return false;
}
+ @Override
+ protected boolean init(SettableFuture<Void> blockedFuture) {
+ return true;
+ }
+
+ @Override
+ protected void releaseResource() {
+
+ }
+
@Override
public ListenableFuture<Void> processFor(Duration duration) {
return null;
@@ -201,5 +219,10 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
@Override
public void failed(Throwable t) {}
+
+ @Override
+ public ISinkHandle getSinkHandle() {
+ return null;
+ }
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index 2703d0aa37..db060ac405 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -55,6 +56,7 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor.EXECUTION_TIME_SLICE;
@@ -85,6 +87,7 @@ public class DataDriverTest {
@Test
public void batchTest() {
+ ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
MeasurementPath measurementPath1 =
new MeasurementPath(DATA_DRIVER_TEST_SG + ".device0.sensor0", TSDataType.INT32);
@@ -92,11 +95,10 @@ public class DataDriverTest {
allSensors.add("sensor0");
allSensors.add("sensor1");
QueryId queryId = new QueryId("stub_query");
- AtomicReference<FragmentInstanceState> state =
- new AtomicReference<>(FragmentInstanceState.RUNNING);
+ FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
- new FragmentInstanceContext(
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+ new FragmentInstanceContext(instanceId, stateMachine);
PlanNodeId planNodeId1 = new PlanNodeId("1");
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, SeriesScanOperator.class.getSimpleName());
@@ -159,19 +161,20 @@ public class DataDriverTest {
ImmutableList.of(seriesScanOperator1, seriesScanOperator2));
StubSinkHandle sinkHandle = new StubSinkHandle(fragmentInstanceContext);
-
- try (Driver dataDriver = new DataDriver(limitOperator, sinkHandle, driverContext)) {
+ Driver dataDriver = null;
+ try {
+ dataDriver = new DataDriver(limitOperator, sinkHandle, driverContext);
assertEquals(fragmentInstanceContext.getId(), dataDriver.getInfo());
assertFalse(dataDriver.isFinished());
while (!dataDriver.isFinished()) {
- assertEquals(FragmentInstanceState.RUNNING, state.get());
+ assertEquals(FragmentInstanceState.RUNNING, stateMachine.getState());
ListenableFuture<Void> blocked = dataDriver.processFor(EXECUTION_TIME_SLICE);
assertTrue(blocked.isDone());
}
- assertEquals(FragmentInstanceState.FLUSHING, state.get());
+ assertEquals(FragmentInstanceState.FLUSHING, stateMachine.getState());
List<TsBlock> result = sinkHandle.getTsBlocks();
assertEquals(13, result.size());
@@ -204,10 +207,16 @@ public class DataDriverTest {
}
}
}
+ } finally {
+ if (dataDriver != null) {
+ dataDriver.close();
+ }
}
} catch (IllegalPathException | QueryProcessException e) {
e.printStackTrace();
fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
index 6ad74bd110..ab0849ddf2 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.operator;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -28,6 +29,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
@@ -50,6 +52,7 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
@@ -78,6 +81,7 @@ public class LimitOperatorTest {
@Test
public void batchTest() {
+ ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
MeasurementPath measurementPath1 =
new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
@@ -85,11 +89,10 @@ public class LimitOperatorTest {
allSensors.add("sensor0");
allSensors.add("sensor1");
QueryId queryId = new QueryId("stub_query");
- AtomicReference<FragmentInstanceState> state =
- new AtomicReference<>(FragmentInstanceState.RUNNING);
+ FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
- new FragmentInstanceContext(
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+ new FragmentInstanceContext(instanceId, stateMachine);
PlanNodeId planNodeId1 = new PlanNodeId("1");
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, SeriesScanOperator.class.getSimpleName());
@@ -169,6 +172,8 @@ public class LimitOperatorTest {
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
index 3793504970..dd28a90263 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.operator;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -28,6 +29,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
@@ -46,6 +48,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
@@ -73,17 +76,17 @@ public class SeriesScanOperatorTest {
@Test
public void batchTest() {
+ ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
MeasurementPath measurementPath =
new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
Set<String> allSensors = new HashSet<>();
allSensors.add("sensor0");
QueryId queryId = new QueryId("stub_query");
- AtomicReference<FragmentInstanceState> state =
- new AtomicReference<>(FragmentInstanceState.RUNNING);
+ FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
- new FragmentInstanceContext(
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+ new FragmentInstanceContext(instanceId, stateMachine);
PlanNodeId planNodeId = new PlanNodeId("1");
fragmentInstanceContext.addOperatorContext(
1, planNodeId, SeriesScanOperator.class.getSimpleName());
@@ -123,6 +126,8 @@ public class SeriesScanOperatorTest {
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
index 5534418b84..70fe4def02 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.operator;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -28,6 +29,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -49,6 +51,7 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.*;
@@ -74,6 +77,7 @@ public class TimeJoinOperatorTest {
@Test
public void batchTest() {
+ ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
MeasurementPath measurementPath1 =
new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
@@ -81,11 +85,10 @@ public class TimeJoinOperatorTest {
allSensors.add("sensor0");
allSensors.add("sensor1");
QueryId queryId = new QueryId("stub_query");
- AtomicReference<FragmentInstanceState> state =
- new AtomicReference<>(FragmentInstanceState.RUNNING);
+ FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
- new FragmentInstanceContext(
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+ new FragmentInstanceContext(instanceId, stateMachine);
PlanNodeId planNodeId1 = new PlanNodeId("1");
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, SeriesScanOperator.class.getSimpleName());
@@ -155,6 +158,8 @@ public class TimeJoinOperatorTest {
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
index 609dc9befa..320be3be21 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.operator.schema;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.LocalConfigNode;
@@ -29,6 +30,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.SchemaDriverContext;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -52,6 +54,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_ATTRIBUTES;
@@ -90,13 +93,13 @@ public class SchemaScanOperatorTest {
@Test
public void testDeviceMetaScanOperator() {
+ ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
QueryId queryId = new QueryId("stub_query");
- AtomicReference<FragmentInstanceState> state =
- new AtomicReference<>(FragmentInstanceState.RUNNING);
+ FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
- new FragmentInstanceContext(
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+ new FragmentInstanceContext(instanceId, stateMachine);
PlanNodeId planNodeId = queryId.genPlanNodeId();
OperatorContext operatorContext =
fragmentInstanceContext.addOperatorContext(
@@ -149,18 +152,20 @@ public class SchemaScanOperatorTest {
} catch (MetadataException e) {
e.printStackTrace();
fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
}
}
@Test
public void testTimeSeriesMetaScanOperator() {
+ ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
QueryId queryId = new QueryId("stub_query");
- AtomicReference<FragmentInstanceState> state =
- new AtomicReference<>(FragmentInstanceState.RUNNING);
+ FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
- new FragmentInstanceContext(
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+ new FragmentInstanceContext(instanceId, stateMachine);
PlanNodeId planNodeId = queryId.genPlanNodeId();
OperatorContext operatorContext =
fragmentInstanceContext.addOperatorContext(
@@ -237,6 +242,8 @@ public class SchemaScanOperatorTest {
} catch (MetadataException e) {
e.printStackTrace();
fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
}
}
}