You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/18 05:04:58 UTC
[iotdb] branch xingtanzjr/mpp_issues updated: add failure reason to QueryStateMachine
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/mpp_issues
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/mpp_issues by this push:
new 0d6a9743f9 add failure reason to QueryStateMachine
0d6a9743f9 is described below
commit 0d6a9743f9999317bf69e510e58ec1b6b9d72310
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 18 12:56:34 2022 +0800
add failure reason to QueryStateMachine
---
.../iotdb/commons/partition/DataPartition.java | 4 +-
.../iotdb/commons/partition/SchemaPartition.java | 4 +-
.../apache/iotdb/db/mpp/execution/Coordinator.java | 7 +-
.../iotdb/db/mpp/execution/QueryExecution.java | 3 +-
.../iotdb/db/mpp/execution/QueryStateMachine.java | 14 ++-
.../db/mpp/execution/config/ConfigExecution.java | 52 ++++-----
.../config/ConfigTaskVisitor.java} | 24 ++--
.../iotdb/db/mpp/execution/config/IConfigTask.java | 2 +-
.../mpp/execution/scheduler/ClusterScheduler.java | 27 ++---
.../scheduler/SimpleFragInstanceDispatcher.java | 45 ++++----
.../statement/ConfigStatement.java} | 12 +-
.../db/mpp/sql/statement/StatementVisitor.java | 5 +
.../metadata/SetStorageGroupStatement.java | 10 +-
.../db/mpp/execution/ConfigExecutionTest.java | 125 +++++++++++++++++++++
14 files changed, 224 insertions(+), 110 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 27df43e6be..e1d24db22d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -18,8 +18,6 @@
*/
package org.apache.iotdb.commons.partition;
-import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -28,7 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-public class DataPartition extends Partition{
+public class DataPartition extends Partition {
// Map<StorageGroup, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionMessage>>>>
private Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
index 7795fb9b2e..b3f8c61986 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
@@ -18,14 +18,12 @@
*/
package org.apache.iotdb.commons.partition;
-import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class SchemaPartition extends Partition{
+public class SchemaPartition extends Partition {
// Map<StorageGroup, Map<SeriesPartitionSlot, SchemaRegionPlaceInfo>>
private Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartitionMap;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 51f3a6d6f3..0d45e76390 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -28,8 +28,8 @@ import org.apache.iotdb.db.mpp.execution.config.ConfigExecution;
import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.statement.ConfigStatement;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
-import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
import org.apache.commons.lang3.Validate;
@@ -71,7 +71,7 @@ public class Coordinator {
MPPQueryContext queryContext,
IPartitionFetcher partitionFetcher,
ISchemaFetcher schemaFetcher) {
- if (statement instanceof SetStorageGroupStatement) {
+ if (statement instanceof ConfigStatement) {
queryContext.setQueryType(QueryType.WRITE);
return new ConfigExecution(queryContext, statement, executor);
}
@@ -125,7 +125,4 @@ public class Coordinator {
public static Coordinator getInstance() {
return INSTANCE;
}
- // private TQueryResponse executeQuery(TQueryRequest request) {
- //
- // }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index e7e4995995..6e991039fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -185,10 +185,11 @@ public class QueryExecution implements IQueryExecution {
return resultHandle.receive();
} catch (ExecutionException | IOException e) {
+ stateMachine.transitionToFailed(e);
throwIfUnchecked(e.getCause());
throw new RuntimeException(e.getCause());
} catch (InterruptedException e) {
- stateMachine.transitionToFailed();
+ stateMachine.transitionToFailed(e);
Thread.currentThread().interrupt();
throw new RuntimeException(new SQLException("ResultSet thread was interrupted", e));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index c5bc48ef42..e9093abf32 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -39,6 +39,7 @@ public class QueryStateMachine {
// The executor will be used in all the state machines belonged to this query.
private Executor stateMachineExecutor;
+ private Throwable failureException;
public QueryStateMachine(QueryId queryId, ExecutorService executor) {
this.name = String.format("QueryStateMachine[%s]", queryId);
@@ -60,7 +61,8 @@ public class QueryStateMachine {
this.fragInstanceStateMap.put(id, state);
// TODO: (xingtanzjr) we need to distinguish the Timeout situation
if (state.isFailed()) {
- transitionToFailed();
+ transitionToFailed(
+ new RuntimeException(String.format("FragmentInstance[%s] is failed.", id)));
}
boolean allFinished =
fragInstanceStateMap.values().stream()
@@ -120,10 +122,18 @@ public class QueryStateMachine {
queryState.set(QueryState.ABORTED);
}
- public void transitionToFailed() {
+ public void transitionToFailed(Throwable throwable) {
if (queryState.get().isDone()) {
return;
}
+ this.failureException = throwable;
queryState.set(QueryState.FAILED);
}
+
+ public String getFailureMessage() {
+ if (failureException != null) {
+ return failureException.getMessage();
+ }
+ return "no detailed failure reason in QueryStateMachine";
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
index 21afde5d29..be1c95871f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.execution.config;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.execution.ExecutionResult;
@@ -26,10 +27,8 @@ import org.apache.iotdb.db.mpp.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
-import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.FutureCallback;
@@ -41,8 +40,6 @@ import org.jetbrains.annotations.NotNull;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import static com.google.common.base.Throwables.throwIfInstanceOf;
-
public class ConfigExecution implements IQueryExecution {
private final MPPQueryContext context;
@@ -52,17 +49,30 @@ public class ConfigExecution implements IQueryExecution {
private final QueryStateMachine stateMachine;
private final SettableFuture<Boolean> result;
+ private final IConfigTask task;
+
public ConfigExecution(MPPQueryContext context, Statement statement, ExecutorService executor) {
this.context = context;
this.statement = statement;
this.executor = executor;
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
this.result = SettableFuture.create();
+ this.task = statement.accept(new ConfigTaskVisitor(), new ConfigTaskVisitor.TaskContext());
+ }
+
+ @TestOnly
+ public ConfigExecution(
+ MPPQueryContext context, Statement statement, ExecutorService executor, IConfigTask task) {
+ this.context = context;
+ this.statement = statement;
+ this.executor = executor;
+ this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
+ this.result = SettableFuture.create();
+ this.task = task;
}
@Override
public void start() {
- IConfigTask task = getTask(statement);
try {
ListenableFuture<Void> future = task.execute();
Futures.addCallback(
@@ -82,13 +92,12 @@ public class ConfigExecution implements IQueryExecution {
executor);
} catch (Throwable e) {
fail(e);
- throwIfInstanceOf(e, Error.class);
}
}
public void fail(Throwable cause) {
- stateMachine.transitionToFailed();
- result.cancel(false);
+ stateMachine.transitionToFailed(cause);
+ result.set(false);
}
@Override
@@ -97,19 +106,16 @@ public class ConfigExecution implements IQueryExecution {
@Override
public ExecutionResult getStatus() {
try {
- if (result.isCancelled()) {
- return new ExecutionResult(
- context.getQueryId(), RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR));
- }
Boolean success = result.get();
TSStatusCode statusCode =
success ? TSStatusCode.SUCCESS_STATUS : TSStatusCode.QUERY_PROCESS_ERROR;
- return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode));
-
+ String message = success ? "" : stateMachine.getFailureMessage();
+ return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode, message));
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt();
return new ExecutionResult(
- context.getQueryId(), RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR));
+ context.getQueryId(),
+ RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR, e.getMessage()));
}
}
@@ -140,20 +146,4 @@ public class ConfigExecution implements IQueryExecution {
public boolean isQuery() {
return context.getQueryType() == QueryType.READ;
}
-
- // TODO: consider a more suitable implementation for it
- // Generate the corresponding IConfigTask by statement.
- // Each type of statement will has a ConfigTask
- private IConfigTask getTask(Statement statement) {
- try {
- switch (statement.getType()) {
- case SET_STORAGE_GROUP:
- return new SetStorageGroupTask((SetStorageGroupStatement) statement);
- default:
- throw new NotImplementedException();
- }
- } catch (ClassCastException classCastException) {
- throw new NotImplementedException();
- }
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskVisitor.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskVisitor.java
index de54b604e7..30cf56e998 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskVisitor.java
@@ -17,25 +17,23 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.statement.metadata;
+package org.apache.iotdb.db.mpp.execution.config;
-import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.sql.constant.StatementType;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
-public class SetStorageGroupStatement extends Statement {
- private PartialPath storageGroupPath;
+public class ConfigTaskVisitor
+ extends StatementVisitor<IConfigTask, ConfigTaskVisitor.TaskContext> {
- public SetStorageGroupStatement() {
- super();
- statementType = StatementType.SET_STORAGE_GROUP;
+ public IConfigTask visitStatement(Statement statement, TaskContext context) {
+ throw new NotImplementedException("ConfigTask is not implemented for: " + statement);
}
- public PartialPath getStorageGroupPath() {
- return storageGroupPath;
+ public IConfigTask visitSetStorageGroup(SetStorageGroupStatement statement, TaskContext context) {
+ return new SetStorageGroupTask(statement);
}
- public void setStorageGroupPath(PartialPath storageGroupPath) {
- this.storageGroupPath = storageGroupPath;
- }
+ public static class TaskContext {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
index cb3f9b7a02..fac0575fc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
@@ -22,5 +22,5 @@ package org.apache.iotdb.db.mpp.execution.config;
import com.google.common.util.concurrent.ListenableFuture;
public interface IConfigTask {
- ListenableFuture<Void> execute();
+ ListenableFuture<Void> execute() throws InterruptedException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 29d05136e8..8b819d6cad 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -79,16 +79,20 @@ public class ClusterScheduler implements IScheduler {
@Override
public void start() {
- // TODO: consider where the state transition should be put
stateMachine.transitionToDispatching();
Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances);
// NOTICE: the FragmentInstance may be dispatched to another Host due to consensus redirect.
// So we need to start the state fetcher after the dispatching stage.
- boolean success = waitDispatchingFinished(dispatchResultFuture);
- // If the dispatch failed, we make the QueryState as failed, and return.
- if (!success) {
- stateMachine.transitionToFailed();
+ try {
+ FragInstanceDispatchResult result = dispatchResultFuture.get();
+ if (!result.isSuccessful()) {
+ stateMachine.transitionToFailed(new IllegalStateException("Fragment cannot be dispatched"));
+ return;
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ // If the dispatch fails, we mark the QueryState as failed and return.
+ stateMachine.transitionToFailed(e);
return;
}
@@ -110,19 +114,6 @@ public class ClusterScheduler implements IScheduler {
this.stateTracker.start();
}
- private boolean waitDispatchingFinished(Future<FragInstanceDispatchResult> dispatchResultFuture) {
- try {
- FragInstanceDispatchResult result = dispatchResultFuture.get();
- if (result.isSuccessful()) {
- return true;
- }
- } catch (InterruptedException | ExecutionException e) {
- Thread.currentThread().interrupt();
- // TODO: (xingtanzjr) record the dispatch failure reason.
- }
- return false;
- }
-
@Override
public void stop() {
// TODO: It seems that it is unnecessary to check whether they are null or not. Is it a best
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index bdece41164..286f3df3e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -46,32 +46,27 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
return executor.submit(
() -> {
TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
- try {
- for (FragmentInstance instance : instances) {
- InternalService.Iface client =
- InternalServiceClientFactory.getMppServiceClient(
- new Endpoint(
- instance.getHostEndpoint().getIp(),
- IoTDBDescriptor.getInstance().getConfig().getMppPort()));
- // TODO: (xingtanzjr) consider how to handle the buffer here
- ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
- instance.serializeRequest(buffer);
- buffer.flip();
- TConsensusGroupId groupId =
- new TConsensusGroupId(
- instance.getRegionReplicaSet().getConsensusGroupId().getId(),
- instance.getRegionReplicaSet().getConsensusGroupId().getType().toString());
- TSendFragmentInstanceReq req =
- new TSendFragmentInstanceReq(
- new TFragmentInstance(buffer), groupId, instance.getType().toString());
- resp = client.sendFragmentInstance(req);
- if (!resp.accepted) {
- break;
- }
+ for (FragmentInstance instance : instances) {
+ InternalService.Iface client =
+ InternalServiceClientFactory.getMppServiceClient(
+ new Endpoint(
+ instance.getHostEndpoint().getIp(),
+ IoTDBDescriptor.getInstance().getConfig().getMppPort()));
+ // TODO: (xingtanzjr) consider how to handle the buffer here
+ ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+ instance.serializeRequest(buffer);
+ buffer.flip();
+ TConsensusGroupId groupId =
+ new TConsensusGroupId(
+ instance.getRegionReplicaSet().getConsensusGroupId().getId(),
+ instance.getRegionReplicaSet().getConsensusGroupId().getType().toString());
+ TSendFragmentInstanceReq req =
+ new TSendFragmentInstanceReq(
+ new TFragmentInstance(buffer), groupId, instance.getType().toString());
+ resp = client.sendFragmentInstance(req);
+ if (!resp.accepted) {
+ break;
}
- } catch (Exception e) {
- // TODO: (xingtanzjr) add more details
- return new FragInstanceDispatchResult(false);
}
return new FragInstanceDispatchResult(resp.accepted);
});
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/ConfigStatement.java
similarity index 73%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/ConfigStatement.java
index cb3f9b7a02..304401bc0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/ConfigStatement.java
@@ -17,10 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.config;
+package org.apache.iotdb.db.mpp.sql.statement;
-import com.google.common.util.concurrent.ListenableFuture;
-
-public interface IConfigTask {
- ListenableFuture<Void> execute();
-}
+/**
+ * ConfigStatement represents a statement which should be executed by the ConfigNode. All
+ * statements which need to be transformed into an IConfigTask should extend this class.
+ */
+public abstract class ConfigStatement extends Statement {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
index 42e88eaa23..14f4606d0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.SchemaFetchStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowDevicesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowStorageGroupStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowTimeSeriesStatement;
@@ -83,6 +84,10 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(alterTimeSeriesStatement, context);
}
+ public R visitSetStorageGroup(SetStorageGroupStatement alterTimeSeriesStatement, C context) {
+ return visitStatement(alterTimeSeriesStatement, context);
+ }
+
/** Data Manipulation Language (DML) */
// Select Statement
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
index de54b604e7..b99b1363f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
@@ -21,9 +21,10 @@ package org.apache.iotdb.db.mpp.sql.statement.metadata;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.constant.StatementType;
-import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.db.mpp.sql.statement.ConfigStatement;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
-public class SetStorageGroupStatement extends Statement {
+public class SetStorageGroupStatement extends ConfigStatement {
private PartialPath storageGroupPath;
public SetStorageGroupStatement() {
@@ -35,6 +36,11 @@ public class SetStorageGroupStatement extends Statement {
return storageGroupPath;
}
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitSetStorageGroup(this, context);
+ }
+
public void setStorageGroupPath(PartialPath storageGroupPath) {
this.storageGroupPath = storageGroupPath;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
new file mode 100644
index 0000000000..dcc1d2dff6
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.config.ConfigExecution;
+import org.apache.iotdb.db.mpp.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+
+public class ConfigExecutionTest {
+
+ @Test
+ public void normalConfigTaskTest() {
+ IConfigTask task = () -> immediateFuture(null);
+ ConfigExecution execution =
+ new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+ execution.start();
+ ExecutionResult result = execution.getStatus();
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code);
+ }
+
+ @Test
+ public void exceptionConfigTaskTest() {
+ IConfigTask task =
+ () -> {
+ throw new RuntimeException("task throw exception when executing");
+ };
+ ConfigExecution execution =
+ new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+ execution.start();
+ ExecutionResult result = execution.getStatus();
+ Assert.assertEquals(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
+ }
+
+ @Test
+ public void exceptionAfterInvokeGetStatusTest() throws InterruptedException {
+ IConfigTask task =
+ () -> {
+ throw new RuntimeException("task throw exception when executing");
+ };
+ ConfigExecution execution =
+ new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+ Thread resultThread =
+ new Thread(
+ () -> {
+ ExecutionResult result = execution.getStatus();
+ Assert.assertEquals(
+ TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
+ });
+ resultThread.start();
+ execution.start();
+ resultThread.join();
+ }
+
+ @Test
+ public void configTaskCancelledTest() throws InterruptedException {
+ SettableFuture<Void> taskResult = SettableFuture.create();
+ class SimpleTask implements IConfigTask {
+ private final ListenableFuture<Void> result;
+
+ public SimpleTask(ListenableFuture<Void> future) {
+ this.result = future;
+ }
+
+ @Override
+ public ListenableFuture<Void> execute() throws InterruptedException {
+ return result;
+ }
+ }
+ IConfigTask task = new SimpleTask(taskResult);
+ ConfigExecution execution =
+ new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+ execution.start();
+
+ Thread resultThread =
+ new Thread(
+ () -> {
+ ExecutionResult result = execution.getStatus();
+ Assert.assertEquals(
+ TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
+ });
+ resultThread.start();
+ taskResult.cancel(true);
+ resultThread.join();
+ }
+
+ private MPPQueryContext genMPPQueryContext() {
+ MPPQueryContext context = new MPPQueryContext(new QueryId("query1"));
+ context.setQueryType(QueryType.WRITE);
+ return context;
+ }
+
+ private ExecutorService getExecutor() {
+ return IoTDBThreadPoolFactory.newSingleThreadExecutor("ConfigExecutionTest");
+ }
+}