You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/20 02:23:57 UTC
[iotdb] branch master updated: Fix some issues in MPP framework (#5596)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1d0b0ec288 Fix some issues in MPP framework (#5596)
1d0b0ec288 is described below
commit 1d0b0ec28803de7b48f6bb995beddecd1f82fc01
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Wed Apr 20 10:23:49 2022 +0800
Fix some issues in MPP framework (#5596)
---
.../iotdb/commons/partition/DataPartition.java | 17 +--
.../{PartitionInfo.java => Partition.java} | 27 +++--
.../iotdb/commons/partition/SchemaPartition.java | 17 +--
.../iotdb/db/mpp/buffer/DataBlockManager.java | 14 ++-
.../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 10 +-
.../apache/iotdb/db/mpp/buffer/SourceHandle.java | 27 ++---
.../apache/iotdb/db/mpp/execution/Coordinator.java | 16 +--
.../iotdb/db/mpp/execution/IQueryExecution.java | 2 +
.../iotdb/db/mpp/execution/QueryExecution.java | 43 ++++++-
.../iotdb/db/mpp/execution/QueryStateMachine.java | 14 ++-
.../db/mpp/execution/config/ConfigExecution.java | 58 ++++------
.../config/ConfigTaskVisitor.java} | 24 ++--
.../iotdb/db/mpp/execution/config/IConfigTask.java | 2 +-
.../mpp/execution/scheduler/ClusterScheduler.java | 31 +++--
.../mpp/execution/scheduler/IQueryTerminator.java | 4 +-
.../scheduler/SimpleFragInstanceDispatcher.java | 51 ++++-----
.../execution/scheduler/SimpleQueryTerminator.java | 50 ++++-----
.../db/mpp/operator/source/SeriesScanOperator.java | 2 +-
.../db/mpp/schedule/FragmentInstanceScheduler.java | 2 +-
.../db/mpp/sql/analyze/ClusterSchemaFetcher.java | 4 +-
.../db/mpp/sql/planner/DistributionPlanner.java | 20 ++++
.../statement/ConfigStatement.java} | 12 +-
.../db/mpp/sql/statement/StatementVisitor.java | 5 +
.../metadata/SetStorageGroupStatement.java | 10 +-
.../iotdb/db/query/control/SessionManager.java | 7 +-
.../thrift/impl/DataNodeTSIServiceImpl.java | 11 +-
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 2 +-
.../iotdb/db/mpp/buffer/SourceHandleTest.java | 6 +-
.../db/mpp/execution/ConfigExecutionTest.java | 125 +++++++++++++++++++++
.../db/mpp/sql/plan/DistributionPlannerTest.java | 39 +++++++
30 files changed, 430 insertions(+), 222 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index bb62f1cea4..e1d24db22d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -18,8 +18,6 @@
*/
package org.apache.iotdb.commons.partition;
-import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -28,18 +26,14 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-public class DataPartition {
-
- private String seriesSlotExecutorName;
- private int seriesPartitionSlotNum;
+public class DataPartition extends Partition {
// Map<StorageGroup, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionMessage>>>>
private Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
dataPartitionMap;
public DataPartition(String seriesSlotExecutorName, int seriesPartitionSlotNum) {
- this.seriesSlotExecutorName = seriesSlotExecutorName;
- this.seriesPartitionSlotNum = seriesPartitionSlotNum;
+ super(seriesSlotExecutorName, seriesPartitionSlotNum);
}
public DataPartition(
@@ -104,13 +98,6 @@ public class DataPartition {
return regions.get(0);
}
- private SeriesPartitionSlot calculateDeviceGroupId(String deviceName) {
- SeriesPartitionExecutor executor =
- SeriesPartitionExecutor.getSeriesPartitionExecutor(
- seriesSlotExecutorName, seriesPartitionSlotNum);
- return executor.getSeriesPartitionSlot(deviceName);
- }
-
private String getStorageGroupByDevice(String deviceName) {
for (String storageGroup : dataPartitionMap.keySet()) {
if (deviceName.startsWith(storageGroup)) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/PartitionInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
similarity index 55%
rename from node-commons/src/main/java/org/apache/iotdb/commons/partition/PartitionInfo.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
index 6e846ad6e3..e0c6e53bd1 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/PartitionInfo.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
@@ -18,24 +18,23 @@
*/
package org.apache.iotdb.commons.partition;
-public class PartitionInfo {
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
- private DataPartition dataPartition;
- private SchemaPartition schemaPartition;
+public abstract class Partition {
+ protected String seriesSlotExecutorName;
+ protected int seriesPartitionSlotNum;
- public DataPartition getDataPartitionInfo() {
- return dataPartition;
- }
-
- public void setDataPartitionInfo(DataPartition dataPartition) {
- this.dataPartition = dataPartition;
- }
+ private final SeriesPartitionExecutor executor;
- public SchemaPartition getSchemaPartitionInfo() {
- return schemaPartition;
+ public Partition(String seriesSlotExecutorName, int seriesPartitionSlotNum) {
+ this.seriesSlotExecutorName = seriesSlotExecutorName;
+ this.seriesPartitionSlotNum = seriesPartitionSlotNum;
+ executor =
+ SeriesPartitionExecutor.getSeriesPartitionExecutor(
+ seriesSlotExecutorName, seriesPartitionSlotNum);
}
- public void setSchemaPartitionInfo(SchemaPartition schemaPartition) {
- this.schemaPartition = schemaPartition;
+ protected SeriesPartitionSlot calculateDeviceGroupId(String deviceName) {
+ return executor.getSeriesPartitionSlot(deviceName);
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
index f0ca88f774..b3f8c61986 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
@@ -18,24 +18,18 @@
*/
package org.apache.iotdb.commons.partition;
-import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class SchemaPartition {
-
- private String seriesSlotExecutorName;
- private int seriesPartitionSlotNum;
+public class SchemaPartition extends Partition {
// Map<StorageGroup, Map<SeriesPartitionSlot, SchemaRegionPlaceInfo>>
private Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartitionMap;
public SchemaPartition(String seriesSlotExecutorName, int seriesPartitionSlotNum) {
- this.seriesSlotExecutorName = seriesSlotExecutorName;
- this.seriesPartitionSlotNum = seriesPartitionSlotNum;
+ super(seriesSlotExecutorName, seriesPartitionSlotNum);
}
public SchemaPartition(
@@ -64,13 +58,6 @@ public class SchemaPartition {
return schemaPartitionMap.get(storageGroup).get(seriesPartitionSlot);
}
- private SeriesPartitionSlot calculateDeviceGroupId(String deviceName) {
- SeriesPartitionExecutor executor =
- SeriesPartitionExecutor.getSeriesPartitionExecutor(
- seriesSlotExecutorName, seriesPartitionSlotNum);
- return executor.getSeriesPartitionSlot(deviceName);
- }
-
private String getStorageGroupByDevice(String deviceName) {
for (String storageGroup : schemaPartitionMap.keySet()) {
if (deviceName.startsWith(storageGroup)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index ab7daa5289..573771fe81 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -175,11 +175,13 @@ public class DataBlockManager implements IDataBlockManager {
.containsKey(sourceHandle.getLocalPlanNodeId())) {
logger.info(
"Resources of finished source handle {} has already been released", sourceHandle);
+ } else {
+ sourceHandles
+ .get(sourceHandle.getLocalFragmentInstanceId())
+ .remove(sourceHandle.getLocalPlanNodeId());
}
- sourceHandles
- .get(sourceHandle.getLocalFragmentInstanceId())
- .remove(sourceHandle.getLocalPlanNodeId());
- if (sourceHandles.get(sourceHandle.getLocalFragmentInstanceId()).isEmpty()) {
+ if (sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
+ && sourceHandles.get(sourceHandle.getLocalFragmentInstanceId()).isEmpty()) {
sourceHandles.remove(sourceHandle.getLocalFragmentInstanceId());
}
}
@@ -264,7 +266,7 @@ public class DataBlockManager implements IDataBlockManager {
throw new IllegalStateException("Sink handle for " + localFragmentInstanceId + " exists.");
}
- logger.info(
+ logger.debug(
"Create sink handle to plan node {} of {} for {}",
remotePlanNodeId,
remoteFragmentInstanceId,
@@ -301,7 +303,7 @@ public class DataBlockManager implements IDataBlockManager {
+ " exists.");
}
- logger.info(
+ logger.debug(
"Create source handle from {} for plan node {} of {}",
remoteFragmentInstanceId,
localPlanNodeId,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index af4ecf089d..a72dcb9cb6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -220,10 +220,12 @@ public class SinkHandle implements ISinkHandle {
synchronized (this) {
sequenceIdToTsBlock.clear();
closed = true;
- localMemoryManager
- .getQueryPool()
- .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
- bufferRetainedSizeInBytes = 0;
+ if (bufferRetainedSizeInBytes > 0) {
+ localMemoryManager
+ .getQueryPool()
+ .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
+ bufferRetainedSizeInBytes = 0;
+ }
}
sinkHandleListener.onAborted(this);
logger.info("Sink handle {} is aborted", this);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index aece366b82..37a5bce1b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -71,8 +71,6 @@ public class SourceHandle implements ISourceHandle {
private int currSequenceId = 0;
private int nextSequenceId = 0;
private int lastSequenceId = Integer.MAX_VALUE;
- private int numActiveGetDataBlocksTask = 0;
- private boolean noMoreTsBlocks;
private boolean closed;
private Throwable throwable;
@@ -160,7 +158,6 @@ public class SourceHandle implements ISourceHandle {
if (future.isDone()) {
nextSequenceId = endSequenceId;
executorService.submit(new GetDataBlocksTask(startSequenceId, endSequenceId, reservedBytes));
- numActiveGetDataBlocksTask += 1;
} else {
nextSequenceId = endSequenceId + 1;
// The future being not completed indicates,
@@ -175,7 +172,6 @@ public class SourceHandle implements ISourceHandle {
// Memory has been reserved. Submit a GetDataBlocksTask for these blocks.
executorService.submit(
new GetDataBlocksTask(startSequenceId, endSequenceId, reservedBytes));
- numActiveGetDataBlocksTask += 1;
}
// Submit a GetDataBlocksTask when memory is freed.
@@ -188,7 +184,6 @@ public class SourceHandle implements ISourceHandle {
sequenceIdOfUnReservedDataBlock,
sequenceIdOfUnReservedDataBlock + 1,
sizeOfUnReservedDataBlock));
- numActiveGetDataBlocksTask += 1;
bufferRetainedSizeInBytes += sizeOfUnReservedDataBlock;
},
executorService);
@@ -211,7 +206,9 @@ public class SourceHandle implements ISourceHandle {
synchronized void setNoMoreTsBlocks(int lastSequenceId) {
this.lastSequenceId = lastSequenceId;
- noMoreTsBlocks = true;
+ if (!blocked.isDone() && remoteTsBlockedConsumedUp()) {
+ blocked.set(null);
+ }
}
synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) {
@@ -226,6 +223,9 @@ public class SourceHandle implements ISourceHandle {
if (closed) {
return;
}
+ if (blocked != null && !blocked.isDone()) {
+ blocked.cancel(true);
+ }
sequenceIdToDataBlockSize.clear();
if (bufferRetainedSizeInBytes > 0) {
localMemoryManager
@@ -239,11 +239,14 @@ public class SourceHandle implements ISourceHandle {
@Override
public boolean isFinished() {
- return throwable == null
- && noMoreTsBlocks
- && numActiveGetDataBlocksTask == 0
- && currSequenceId - 1 == lastSequenceId
- && sequenceIdToTsBlock.isEmpty();
+ return throwable == null && remoteTsBlockedConsumedUp();
+ }
+
+ // Return true indicates two points:
+ // 1. Remote SinkHandle has told SourceHandle the total count of TsBlocks by lastSequenceId
+ // 2. All the TsBlocks has been consumed up
+ private boolean remoteTsBlockedConsumedUp() {
+ return currSequenceId - 1 == lastSequenceId;
}
String getRemoteHostname() {
@@ -355,8 +358,6 @@ public class SourceHandle implements ISourceHandle {
.free(localFragmentInstanceId.getQueryId(), reservedBytes);
}
}
- } finally {
- numActiveGetDataBlocksTask -= 1;
}
}
// TODO: try to issue another GetDataBlocksTask to make the query run faster.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 1f97f3c93c..a7d09b3ccf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -28,10 +28,12 @@ import org.apache.iotdb.db.mpp.execution.config.ConfigExecution;
import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.statement.ConfigStatement;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
-import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -43,10 +45,12 @@ import java.util.concurrent.ScheduledExecutorService;
* QueryExecution.
*/
public class Coordinator {
+ private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
+
private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
- private static final int COORDINATOR_EXECUTOR_SIZE = 2;
+ private static final int COORDINATOR_EXECUTOR_SIZE = 1;
private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
- private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 2;
+ private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 1;
private static final Endpoint LOCAL_HOST =
new Endpoint(
@@ -71,7 +75,7 @@ public class Coordinator {
MPPQueryContext queryContext,
IPartitionFetcher partitionFetcher,
ISchemaFetcher schemaFetcher) {
- if (statement instanceof SetStorageGroupStatement) {
+ if (statement instanceof ConfigStatement) {
queryContext.setQueryType(QueryType.WRITE);
return new ConfigExecution(queryContext, statement, executor);
}
@@ -94,7 +98,6 @@ public class Coordinator {
partitionFetcher,
schemaFetcher);
queryExecutionMap.put(queryId, execution);
-
execution.start();
return execution.getStatus();
@@ -125,7 +128,4 @@ public class Coordinator {
public static Coordinator getInstance() {
return INSTANCE;
}
- // private TQueryResponse executeQuery(TQueryRequest request) {
- //
- // }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
index 2e0bd9c76d..af9633ee89 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
@@ -28,6 +28,8 @@ public interface IQueryExecution {
void stop();
+ void stopAndCleanup();
+
ExecutionResult getStatus();
TsBlock getBatchResult();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index e7e4995995..a9bd605823 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -43,6 +43,8 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.SQLException;
@@ -61,6 +63,8 @@ import static com.google.common.base.Throwables.throwIfUnchecked;
* corresponding physical nodes. 3. Collect and monitor the progress/states of this query.
*/
public class QueryExecution implements IQueryExecution {
+ private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
+
private final MPPQueryContext context;
private IScheduler scheduler;
private final QueryStateMachine stateMachine;
@@ -97,8 +101,6 @@ public class QueryExecution implements IQueryExecution {
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
this.partitionFetcher = partitionFetcher;
this.schemaFetcher = schemaFetcher;
- // TODO: (xingtanzjr) Initialize the result handle after the DataBlockManager is merged.
- // resultHandle = xxxx
// We add the abort logic inside the QueryExecution.
// So that the other components can only focus on the state change.
@@ -108,6 +110,13 @@ public class QueryExecution implements IQueryExecution {
return;
}
this.stop();
+ // TODO: (xingtanzjr) If the query is in abnormal state, the releaseResource() should be
+ // invoked
+ if (state == QueryState.FAILED
+ || state == QueryState.ABORTED
+ || state == QueryState.CANCELED) {
+ releaseResource();
+ }
});
}
@@ -140,7 +149,6 @@ public class QueryExecution implements IQueryExecution {
context.getQueryType(),
executor,
scheduledExecutor);
- // TODO: (xingtanzjr) how to make the schedule running asynchronously
this.scheduler.start();
}
@@ -156,16 +164,30 @@ public class QueryExecution implements IQueryExecution {
this.distributedPlan = planner.planFragments();
}
- /** Abort the query and do cleanup work including QuerySchedule aborting and resource releasing */
+ // Stop the workers for this query
public void stop() {
if (this.scheduler != null) {
this.scheduler.stop();
}
+ }
+
+ // Stop the query and clean up all the resources this query occupied
+ public void stopAndCleanup() {
+ stop();
releaseResource();
}
/** Release the resources that current QueryExecution hold. */
- private void releaseResource() {}
+ private void releaseResource() {
+ // close ResultHandle to unblock client's getResult request
+ // Actually, we should not close the ResultHandle when the QueryExecution is Finished.
+ // There are only two scenarios where the ResultHandle should be closed:
+ // 1. The client fetch all the result and the ResultHandle is finished.
+ // 2. The client's connection is closed that all owned QueryExecution should be cleaned up
+ if (resultHandle != null && resultHandle.isFinished()) {
+ resultHandle.close();
+ }
+ }
/**
* This method will be called by the request thread from client connection. This method will block
@@ -177,18 +199,23 @@ public class QueryExecution implements IQueryExecution {
@Override
public TsBlock getBatchResult() {
try {
+ if (resultHandle.isClosed() || resultHandle.isFinished()) {
+ return null;
+ }
ListenableFuture<Void> blocked = resultHandle.isBlocked();
blocked.get();
if (resultHandle.isFinished()) {
+ releaseResource();
return null;
}
return resultHandle.receive();
} catch (ExecutionException | IOException e) {
+ stateMachine.transitionToFailed(e);
throwIfUnchecked(e.getCause());
throw new RuntimeException(e.getCause());
} catch (InterruptedException e) {
- stateMachine.transitionToFailed();
+ stateMachine.transitionToFailed(e);
Thread.currentThread().interrupt();
throw new RuntimeException(new SQLException("ResultSet thread was interrupted", e));
}
@@ -272,4 +299,8 @@ public class QueryExecution implements IQueryExecution {
public boolean isQuery() {
return context.getQueryType() == QueryType.READ;
}
+
+ public String toString() {
+ return String.format("QueryExecution[%s]", context.getQueryId());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index c5bc48ef42..e9093abf32 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -39,6 +39,7 @@ public class QueryStateMachine {
// The executor will be used in all the state machines belonged to this query.
private Executor stateMachineExecutor;
+ private Throwable failureException;
public QueryStateMachine(QueryId queryId, ExecutorService executor) {
this.name = String.format("QueryStateMachine[%s]", queryId);
@@ -60,7 +61,8 @@ public class QueryStateMachine {
this.fragInstanceStateMap.put(id, state);
// TODO: (xingtanzjr) we need to distinguish the Timeout situation
if (state.isFailed()) {
- transitionToFailed();
+ transitionToFailed(
+ new RuntimeException(String.format("FragmentInstance[%s] is failed.", id)));
}
boolean allFinished =
fragInstanceStateMap.values().stream()
@@ -120,10 +122,18 @@ public class QueryStateMachine {
queryState.set(QueryState.ABORTED);
}
- public void transitionToFailed() {
+ public void transitionToFailed(Throwable throwable) {
if (queryState.get().isDone()) {
return;
}
+ this.failureException = throwable;
queryState.set(QueryState.FAILED);
}
+
+ public String getFailureMessage() {
+ if (failureException != null) {
+ return failureException.getMessage();
+ }
+ return "no detailed failure reason in QueryStateMachine";
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
index 7ceb542cef..b134d4896b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.execution.config;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.execution.ExecutionResult;
@@ -26,11 +27,8 @@ import org.apache.iotdb.db.mpp.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
-import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
-import org.apache.iotdb.db.mpp.sql.statement.sys.AuthorStatement;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.FutureCallback;
@@ -42,8 +40,6 @@ import org.jetbrains.annotations.NotNull;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import static com.google.common.base.Throwables.throwIfInstanceOf;
-
public class ConfigExecution implements IQueryExecution {
private final MPPQueryContext context;
@@ -53,17 +49,30 @@ public class ConfigExecution implements IQueryExecution {
private final QueryStateMachine stateMachine;
private final SettableFuture<Boolean> result;
+ private final IConfigTask task;
+
public ConfigExecution(MPPQueryContext context, Statement statement, ExecutorService executor) {
this.context = context;
this.statement = statement;
this.executor = executor;
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
this.result = SettableFuture.create();
+ this.task = statement.accept(new ConfigTaskVisitor(), new ConfigTaskVisitor.TaskContext());
+ }
+
+ @TestOnly
+ public ConfigExecution(
+ MPPQueryContext context, Statement statement, ExecutorService executor, IConfigTask task) {
+ this.context = context;
+ this.statement = statement;
+ this.executor = executor;
+ this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
+ this.result = SettableFuture.create();
+ this.task = task;
}
@Override
public void start() {
- IConfigTask task = getTask(statement);
try {
ListenableFuture<Void> future = task.execute();
Futures.addCallback(
@@ -83,34 +92,33 @@ public class ConfigExecution implements IQueryExecution {
executor);
} catch (Throwable e) {
fail(e);
- throwIfInstanceOf(e, Error.class);
}
}
public void fail(Throwable cause) {
- stateMachine.transitionToFailed();
- result.cancel(false);
+ stateMachine.transitionToFailed(cause);
+ result.set(false);
}
@Override
public void stop() {}
+ @Override
+ public void stopAndCleanup() {}
+
@Override
public ExecutionResult getStatus() {
try {
- if (result.isCancelled()) {
- return new ExecutionResult(
- context.getQueryId(), RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR));
- }
Boolean success = result.get();
TSStatusCode statusCode =
success ? TSStatusCode.SUCCESS_STATUS : TSStatusCode.QUERY_PROCESS_ERROR;
- return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode));
-
+ String message = success ? "" : stateMachine.getFailureMessage();
+ return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode, message));
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt();
return new ExecutionResult(
- context.getQueryId(), RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR));
+ context.getQueryId(),
+ RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR, e.getMessage()));
}
}
@@ -141,22 +149,4 @@ public class ConfigExecution implements IQueryExecution {
public boolean isQuery() {
return context.getQueryType() == QueryType.READ;
}
-
- // TODO: consider a more suitable implementation for it
- // Generate the corresponding IConfigTask by statement.
- // Each type of statement will has a ConfigTask
- private IConfigTask getTask(Statement statement) {
- try {
- switch (statement.getType()) {
- case SET_STORAGE_GROUP:
- return new SetStorageGroupTask((SetStorageGroupStatement) statement);
- case AUTHOR:
- return new AuthorizerConfigTask((AuthorStatement) statement);
- default:
- throw new NotImplementedException();
- }
- } catch (ClassCastException classCastException) {
- throw new NotImplementedException();
- }
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskVisitor.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskVisitor.java
index de54b604e7..30cf56e998 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskVisitor.java
@@ -17,25 +17,23 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.statement.metadata;
+package org.apache.iotdb.db.mpp.execution.config;
-import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.sql.constant.StatementType;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
-public class SetStorageGroupStatement extends Statement {
- private PartialPath storageGroupPath;
+public class ConfigTaskVisitor
+ extends StatementVisitor<IConfigTask, ConfigTaskVisitor.TaskContext> {
- public SetStorageGroupStatement() {
- super();
- statementType = StatementType.SET_STORAGE_GROUP;
+ public IConfigTask visitStatement(Statement statement, TaskContext context) {
+ throw new NotImplementedException("ConfigTask is not implemented for: " + statement);
}
- public PartialPath getStorageGroupPath() {
- return storageGroupPath;
+ public IConfigTask visitSetStorageGroup(SetStorageGroupStatement statement, TaskContext context) {
+ return new SetStorageGroupTask(statement);
}
- public void setStorageGroupPath(PartialPath storageGroupPath) {
- this.storageGroupPath = storageGroupPath;
- }
+ public static class TaskContext {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
index cb3f9b7a02..fac0575fc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
@@ -22,5 +22,5 @@ package org.apache.iotdb.db.mpp.execution.config;
import com.google.common.util.concurrent.ListenableFuture;
public interface IConfigTask {
- ListenableFuture<Void> execute();
+ ListenableFuture<Void> execute() throws InterruptedException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 29d05136e8..7fa8d3c0b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -28,6 +28,8 @@ import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import io.airlift.units.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -43,6 +45,8 @@ import java.util.concurrent.ScheduledExecutorService;
* this scheduler.
*/
public class ClusterScheduler implements IScheduler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClusterScheduler.class);
+
private MPPQueryContext queryContext;
// The stateMachine of the QueryExecution owned by this QueryScheduler
private QueryStateMachine stateMachine;
@@ -79,16 +83,20 @@ public class ClusterScheduler implements IScheduler {
@Override
public void start() {
- // TODO: consider where the state transition should be put
stateMachine.transitionToDispatching();
Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances);
// NOTICE: the FragmentInstance may be dispatched to another Host due to consensus redirect.
// So we need to start the state fetcher after the dispatching stage.
- boolean success = waitDispatchingFinished(dispatchResultFuture);
- // If the dispatch failed, we make the QueryState as failed, and return.
- if (!success) {
- stateMachine.transitionToFailed();
+ try {
+ FragInstanceDispatchResult result = dispatchResultFuture.get();
+ if (!result.isSuccessful()) {
+ stateMachine.transitionToFailed(new IllegalStateException("Fragment cannot be dispatched"));
+ return;
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ // If the dispatch failed, we make the QueryState as failed, and return.
+ stateMachine.transitionToFailed(e);
return;
}
@@ -110,19 +118,6 @@ public class ClusterScheduler implements IScheduler {
this.stateTracker.start();
}
- private boolean waitDispatchingFinished(Future<FragInstanceDispatchResult> dispatchResultFuture) {
- try {
- FragInstanceDispatchResult result = dispatchResultFuture.get();
- if (result.isSuccessful()) {
- return true;
- }
- } catch (InterruptedException | ExecutionException e) {
- Thread.currentThread().interrupt();
- // TODO: (xingtanzjr) record the dispatch failure reason.
- }
- return false;
- }
-
@Override
public void stop() {
// TODO: It seems that it is unnecessary to check whether they are null or not. Is it a best
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IQueryTerminator.java
index 25ff71363b..29a1e27cfd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IQueryTerminator.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.mpp.execution.scheduler;
+import java.util.concurrent.Future;
+
public interface IQueryTerminator {
- boolean terminate();
+ Future<Boolean> terminate();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index 17a193d9b4..09ff1171b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -28,13 +28,16 @@ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFragInstanceDispatcher.class);
private final ExecutorService executor;
public SimpleFragInstanceDispatcher(ExecutorService exeutor) {
@@ -46,33 +49,27 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
return executor.submit(
() -> {
TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
- try {
- for (FragmentInstance instance : instances) {
- // TODO: (jackie tien) change the port
- InternalService.Iface client =
- InternalServiceClientFactory.getInternalServiceClient(
- new Endpoint(
- instance.getHostEndpoint().getIp(),
- IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
- // TODO: (xingtanzjr) consider how to handle the buffer here
- ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
- instance.serializeRequest(buffer);
- buffer.flip();
- TConsensusGroupId groupId =
- new TConsensusGroupId(
- instance.getRegionReplicaSet().getConsensusGroupId().getId(),
- instance.getRegionReplicaSet().getConsensusGroupId().getType().toString());
- TSendFragmentInstanceReq req =
- new TSendFragmentInstanceReq(
- new TFragmentInstance(buffer), groupId, instance.getType().toString());
- resp = client.sendFragmentInstance(req);
- if (!resp.accepted) {
- break;
- }
+ for (FragmentInstance instance : instances) {
+ InternalService.Iface client =
+ InternalServiceClientFactory.getInternalServiceClient(
+ new Endpoint(
+ instance.getHostEndpoint().getIp(),
+ IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+ // TODO: (xingtanzjr) consider how to handle the buffer here
+ ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+ instance.serializeRequest(buffer);
+ buffer.flip();
+ TConsensusGroupId groupId =
+ new TConsensusGroupId(
+ instance.getRegionReplicaSet().getConsensusGroupId().getId(),
+ instance.getRegionReplicaSet().getConsensusGroupId().getType().toString());
+ TSendFragmentInstanceReq req =
+ new TSendFragmentInstanceReq(
+ new TFragmentInstance(buffer), groupId, instance.getType().toString());
+ resp = client.sendFragmentInstance(req);
+ if (!resp.accepted) {
+ break;
}
- } catch (Exception e) {
- // TODO: (xingtanzjr) add more details
- return new FragInstanceDispatchResult(false);
}
return new FragInstanceDispatchResult(resp.accepted);
});
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
index 728966daea..5ffe02080a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
@@ -27,15 +27,16 @@ import org.apache.iotdb.mpp.rpc.thrift.InternalService;
import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
public class SimpleQueryTerminator implements IQueryTerminator {
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(SimpleQueryTerminator.class);
private final ExecutorService executor;
private final QueryId queryId;
private final List<FragmentInstance> fragmentInstances;
@@ -48,33 +49,26 @@ public class SimpleQueryTerminator implements IQueryTerminator {
}
@Override
- public boolean terminate() {
+ public Future<Boolean> terminate() {
List<Endpoint> relatedHost = getRelatedHost(fragmentInstances);
- Future<Boolean> future =
- executor.submit(
- () -> {
- try {
- for (Endpoint endpoint : relatedHost) {
- // TODO (jackie tien) change the port
- InternalService.Iface client =
- InternalServiceClientFactory.getInternalServiceClient(
- new Endpoint(
- endpoint.getIp(),
- IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
- client.cancelQuery(new TCancelQueryReq(queryId.getId()));
- }
- } catch (TException e) {
- return false;
- }
- return true;
- });
- try {
- return future.get();
- } catch (InterruptedException | ExecutionException e) {
- // TODO: (xingtanzjr) Record the error info with logger
- Thread.currentThread().interrupt();
- return false;
- }
+
+ return executor.submit(
+ () -> {
+ try {
+ for (Endpoint endpoint : relatedHost) {
+ // TODO (jackie tien) change the port
+ InternalService.Iface client =
+ InternalServiceClientFactory.getInternalServiceClient(
+ new Endpoint(
+ endpoint.getIp(),
+ IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+ client.cancelQuery(new TCancelQueryReq(queryId.getId()));
+ }
+ } catch (TException e) {
+ return false;
+ }
+ return true;
+ });
}
private List<Endpoint> getRelatedHost(List<FragmentInstance> instances) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
index 9e9946c323..01b39506fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
@@ -115,7 +115,7 @@ public class SeriesScanOperator implements DataSourceOperator {
@Override
public boolean isFinished() {
- return finished || (finished = hasNext());
+ return finished || (finished = !hasNext());
}
private boolean readChunkData() throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index c7bdb95285..875894f47f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -66,7 +66,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
private static final int MAX_CAPACITY = 1000; // TODO: load from config files
private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files
- private static final int QUERY_TIMEOUT_MS = 60_000; // TODO: load from config files or requests
+ private static final int QUERY_TIMEOUT_MS = 10_000; // TODO: load from config files or requests
private final ThreadGroup workerGroups;
private final List<AbstractExecutor> threads;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
index 3877706717..929ca01a55 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
@@ -78,7 +78,9 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
throw new RuntimeException("cannot fetch schema, status is: " + executionResult.status);
}
TsBlock tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
- // TODO: (xingtanzjr) need to release this query's resource here
+ // TODO: (xingtanzjr) need to release this query's resource here. This is a temporary way
+ coordinator.getQueryExecution(queryId).stopAndCleanup();
+
SchemaTree result = new SchemaTree();
result.setStorageGroups(storageGroups);
Binary binary;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index ca42ec2336..83b5e950d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -174,6 +174,26 @@ public class DistributionPlanner {
return root;
}
+ // TODO: (xingtanzjr) a temporary way to resolve the distribution of single SeriesScanNode issue
+ @Override
+ public PlanNode visitSeriesScan(SeriesScanNode node, DistributionPlanContext context) {
+ List<RegionReplicaSet> dataDistribution =
+ analysis.getPartitionInfo(node.getSeriesPath(), node.getTimeFilter());
+ if (dataDistribution.size() == 1) {
+ node.setRegionReplicaSet(dataDistribution.get(0));
+ return node;
+ }
+ TimeJoinNode timeJoinNode =
+ new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
+ for (RegionReplicaSet dataRegion : dataDistribution) {
+ SeriesScanNode split = (SeriesScanNode) node.clone();
+ split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+ split.setRegionReplicaSet(dataRegion);
+ timeJoinNode.addChild(split);
+ }
+ return timeJoinNode;
+ }
+
@Override
public PlanNode visitTimeJoin(TimeJoinNode node, DistributionPlanContext context) {
TimeJoinNode root = (TimeJoinNode) node.clone();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/ConfigStatement.java
similarity index 73%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/ConfigStatement.java
index cb3f9b7a02..304401bc0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/ConfigStatement.java
@@ -17,10 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.config;
+package org.apache.iotdb.db.mpp.sql.statement;
-import com.google.common.util.concurrent.ListenableFuture;
-
-public interface IConfigTask {
- ListenableFuture<Void> execute();
-}
+/**
+ * ConfigStatement represents the statement which should be executed by ConfigNode All the
+ * statements which need to be transformed into IConfigTask should extend this class
+ */
+public abstract class ConfigStatement extends Statement {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
index 42e88eaa23..14f4606d0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.SchemaFetchStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowDevicesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowStorageGroupStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowTimeSeriesStatement;
@@ -83,6 +84,10 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(alterTimeSeriesStatement, context);
}
+ public R visitSetStorageGroup(SetStorageGroupStatement alterTimeSeriesStatement, C context) {
+ return visitStatement(alterTimeSeriesStatement, context);
+ }
+
/** Data Manipulation Language (DML) */
// Select Statement
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
index de54b604e7..b99b1363f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
@@ -21,9 +21,10 @@ package org.apache.iotdb.db.mpp.sql.statement.metadata;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.constant.StatementType;
-import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.db.mpp.sql.statement.ConfigStatement;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
-public class SetStorageGroupStatement extends Statement {
+public class SetStorageGroupStatement extends ConfigStatement {
private PartialPath storageGroupPath;
public SetStorageGroupStatement() {
@@ -35,6 +36,11 @@ public class SetStorageGroupStatement extends Statement {
return storageGroupPath;
}
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitSetStorageGroup(this, context);
+ }
+
public void setStorageGroupPath(PartialPath storageGroupPath) {
this.storageGroupPath = storageGroupPath;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 20aa1d1b86..a8a0faca92 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -46,6 +46,7 @@ import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
@@ -215,6 +216,10 @@ public class SessionManager {
}
public boolean releaseSessionResource(long sessionId) {
+ return releaseSessionResource(sessionId, this::releaseQueryResourceNoExceptions);
+ }
+
+ public boolean releaseSessionResource(long sessionId, Consumer<Long> releaseQueryResource) {
sessionIdToZoneId.remove(sessionId);
sessionIdToClientVersion.remove(sessionId);
@@ -224,7 +229,7 @@ public class SessionManager {
Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
if (queryIdSet != null) {
for (Long queryId : queryIdSet) {
- releaseQueryResourceNoExceptions(queryId);
+ releaseQueryResource.accept(queryId);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index 2c13c68ad7..3de0a28cc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -562,7 +562,7 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
ExecutionResult result =
COORDINATOR.execute(
statement,
- new QueryId(String.valueOf(queryId)),
+ genQueryId(queryId),
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
@@ -756,8 +756,17 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
public void handleClientExit() {
Long sessionId = SESSION_MANAGER.getCurrSessionId();
if (sessionId != null) {
+ SESSION_MANAGER.releaseSessionResource(sessionId, this::cleanupQueryExecution);
TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
closeSession(req);
}
}
+
+ private void cleanupQueryExecution(Long queryId) {
+ COORDINATOR.getQueryExecution(genQueryId(queryId)).stopAndCleanup();
+ }
+
+ private QueryId genQueryId(long id) {
+ return new QueryId(String.valueOf(id));
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 506f180049..eb07ba32cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -192,7 +192,7 @@ public class QueryDataSetUtils {
int rowCount = 0;
int[] valueOccupation = new int[columnNum];
- while (rowCount < fetchSize && queryExecution.hasNextResult()) {
+ while (rowCount < fetchSize) {
TsBlock tsBlock = queryExecution.getBatchResult();
if (tsBlock == null) {
break;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
index 3b9783023f..461fac1e13 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
@@ -152,7 +152,7 @@ public class SourceHandleTest {
// Receive EndOfDataBlock event from upstream fragment instance.
sourceHandle.setNoMoreTsBlocks(numOfMockTsBlock - 1);
- Assert.assertFalse(sourceHandle.isBlocked().isDone());
+ Assert.assertTrue(sourceHandle.isBlocked().isDone());
Assert.assertFalse(sourceHandle.isClosed());
Assert.assertTrue(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
@@ -298,7 +298,7 @@ public class SourceHandleTest {
// Receive EndOfDataBlock event from upstream fragment instance.
sourceHandle.setNoMoreTsBlocks(numOfMockTsBlock - 1);
- Assert.assertFalse(sourceHandle.isBlocked().isDone());
+ Assert.assertTrue(sourceHandle.isBlocked().isDone());
Assert.assertFalse(sourceHandle.isClosed());
Assert.assertTrue(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
@@ -488,7 +488,7 @@ public class SourceHandleTest {
// Receive EndOfDataBlock event from upstream fragment instance.
sourceHandle.setNoMoreTsBlocks(3 * numOfMockTsBlock - 1);
- Assert.assertFalse(sourceHandle.isBlocked().isDone());
+ Assert.assertTrue(sourceHandle.isBlocked().isDone());
Assert.assertFalse(sourceHandle.isClosed());
Assert.assertTrue(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
new file mode 100644
index 0000000000..dcc1d2dff6
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.config.ConfigExecution;
+import org.apache.iotdb.db.mpp.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+
+public class ConfigExecutionTest {
+
+ @Test
+ public void normalConfigTaskTest() {
+ IConfigTask task = () -> immediateFuture(null);
+ ConfigExecution execution =
+ new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+ execution.start();
+ ExecutionResult result = execution.getStatus();
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code);
+ }
+
+ @Test
+ public void exceptionConfigTaskTest() {
+ IConfigTask task =
+ () -> {
+ throw new RuntimeException("task throw exception when executing");
+ };
+ ConfigExecution execution =
+ new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+ execution.start();
+ ExecutionResult result = execution.getStatus();
+ Assert.assertEquals(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
+ }
+
+ @Test
+ public void exceptionAfterInvokeGetStatusTest() throws InterruptedException {
+ IConfigTask task =
+ () -> {
+ throw new RuntimeException("task throw exception when executing");
+ };
+ ConfigExecution execution =
+ new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+ Thread resultThread =
+ new Thread(
+ () -> {
+ ExecutionResult result = execution.getStatus();
+ Assert.assertEquals(
+ TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
+ });
+ resultThread.start();
+ execution.start();
+ resultThread.join();
+ }
+
+ @Test
+ public void configTaskCancelledTest() throws InterruptedException {
+ SettableFuture<Void> taskResult = SettableFuture.create();
+ class SimpleTask implements IConfigTask {
+ private final ListenableFuture<Void> result;
+
+ public SimpleTask(ListenableFuture<Void> future) {
+ this.result = future;
+ }
+
+ @Override
+ public ListenableFuture<Void> execute() throws InterruptedException {
+ return result;
+ }
+ }
+ IConfigTask task = new SimpleTask(taskResult);
+ ConfigExecution execution =
+ new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+ execution.start();
+
+ Thread resultThread =
+ new Thread(
+ () -> {
+ ExecutionResult result = execution.getStatus();
+ Assert.assertEquals(
+ TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
+ });
+ resultThread.start();
+ taskResult.cancel(true);
+ resultThread.join();
+ }
+
+ private MPPQueryContext genMPPQueryContext() {
+ MPPQueryContext context = new MPPQueryContext(new QueryId("query1"));
+ context.setQueryType(QueryType.WRITE);
+ return context;
+ }
+
+ private ExecutorService getExecutor() {
+ return IoTDBThreadPoolFactory.newSingleThreadExecutor("ConfigExecutionTest");
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index d317b9d5cc..ee4d5dedc9 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -68,6 +68,45 @@ import static org.junit.Assert.assertEquals;
public class DistributionPlannerTest {
+ @Test
+ public void TestSingleSeriesScan() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query");
+ SeriesScanNode root =
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_ASC);
+
+ Analysis analysis = constructAnalysis();
+
+ MPPQueryContext context = new MPPQueryContext("", queryId, null, new Endpoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+ DistributedQueryPlan plan = planner.planFragments();
+ plan.getInstances().forEach(System.out::println);
+ assertEquals(2, plan.getInstances().size());
+ }
+
+ @Test
+ public void TestSingleSeriesScanRewriteSource() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query");
+ SeriesScanNode root =
+ new SeriesScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+ Sets.newHashSet("s1", "s2"),
+ OrderBy.TIMESTAMP_ASC);
+
+ Analysis analysis = constructAnalysis();
+
+ MPPQueryContext context = new MPPQueryContext("", queryId, null, new Endpoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+ PlanNode rootAfterRewrite = planner.rewriteSource();
+ assertEquals(2, rootAfterRewrite.getChildren().size());
+ }
+
@Test
public void TestRewriteSourceNode() throws IllegalPathException {
QueryId queryId = new QueryId("test_query");