You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/18 05:04:58 UTC

[iotdb] branch xingtanzjr/mpp_issues updated: add failure reason to QueryStateMachine

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp_issues
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/mpp_issues by this push:
     new 0d6a9743f9 add failure reason to QueryStateMachine
0d6a9743f9 is described below

commit 0d6a9743f9999317bf69e510e58ec1b6b9d72310
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 18 12:56:34 2022 +0800

    add failure reason to QueryStateMachine
---
 .../iotdb/commons/partition/DataPartition.java     |   4 +-
 .../iotdb/commons/partition/SchemaPartition.java   |   4 +-
 .../apache/iotdb/db/mpp/execution/Coordinator.java |   7 +-
 .../iotdb/db/mpp/execution/QueryExecution.java     |   3 +-
 .../iotdb/db/mpp/execution/QueryStateMachine.java  |  14 ++-
 .../db/mpp/execution/config/ConfigExecution.java   |  52 ++++-----
 .../config/ConfigTaskVisitor.java}                 |  24 ++--
 .../iotdb/db/mpp/execution/config/IConfigTask.java |   2 +-
 .../mpp/execution/scheduler/ClusterScheduler.java  |  27 ++---
 .../scheduler/SimpleFragInstanceDispatcher.java    |  45 ++++----
 .../statement/ConfigStatement.java}                |  12 +-
 .../db/mpp/sql/statement/StatementVisitor.java     |   5 +
 .../metadata/SetStorageGroupStatement.java         |  10 +-
 .../db/mpp/execution/ConfigExecutionTest.java      | 125 +++++++++++++++++++++
 14 files changed, 224 insertions(+), 110 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 27df43e6be..e1d24db22d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iotdb.commons.partition;
 
-import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -28,7 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-public class DataPartition extends Partition{
+public class DataPartition extends Partition {
 
   // Map<StorageGroup, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionMessage>>>>
   private Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
index 7795fb9b2e..b3f8c61986 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
@@ -18,14 +18,12 @@
  */
 package org.apache.iotdb.commons.partition;
 
-import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class SchemaPartition extends Partition{
+public class SchemaPartition extends Partition {
 
   // Map<StorageGroup, Map<SeriesPartitionSlot, SchemaRegionPlaceInfo>>
   private Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartitionMap;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 51f3a6d6f3..0d45e76390 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -28,8 +28,8 @@ import org.apache.iotdb.db.mpp.execution.config.ConfigExecution;
 import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.statement.ConfigStatement;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
-import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
 
 import org.apache.commons.lang3.Validate;
 
@@ -71,7 +71,7 @@ public class Coordinator {
       MPPQueryContext queryContext,
       IPartitionFetcher partitionFetcher,
       ISchemaFetcher schemaFetcher) {
-    if (statement instanceof SetStorageGroupStatement) {
+    if (statement instanceof ConfigStatement) {
       queryContext.setQueryType(QueryType.WRITE);
       return new ConfigExecution(queryContext, statement, executor);
     }
@@ -125,7 +125,4 @@ public class Coordinator {
   public static Coordinator getInstance() {
     return INSTANCE;
   }
-  //    private TQueryResponse executeQuery(TQueryRequest request) {
-  //
-  //    }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index e7e4995995..6e991039fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -185,10 +185,11 @@ public class QueryExecution implements IQueryExecution {
       return resultHandle.receive();
 
     } catch (ExecutionException | IOException e) {
+      stateMachine.transitionToFailed(e);
       throwIfUnchecked(e.getCause());
       throw new RuntimeException(e.getCause());
     } catch (InterruptedException e) {
-      stateMachine.transitionToFailed();
+      stateMachine.transitionToFailed(e);
       Thread.currentThread().interrupt();
       throw new RuntimeException(new SQLException("ResultSet thread was interrupted", e));
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index c5bc48ef42..e9093abf32 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -39,6 +39,7 @@ public class QueryStateMachine {
 
   // The executor will be used in all the state machines belonged to this query.
   private Executor stateMachineExecutor;
+  private Throwable failureException;
 
   public QueryStateMachine(QueryId queryId, ExecutorService executor) {
     this.name = String.format("QueryStateMachine[%s]", queryId);
@@ -60,7 +61,8 @@ public class QueryStateMachine {
     this.fragInstanceStateMap.put(id, state);
     // TODO: (xingtanzjr) we need to distinguish the Timeout situation
     if (state.isFailed()) {
-      transitionToFailed();
+      transitionToFailed(
+          new RuntimeException(String.format("FragmentInstance[%s] is failed.", id)));
     }
     boolean allFinished =
         fragInstanceStateMap.values().stream()
@@ -120,10 +122,18 @@ public class QueryStateMachine {
     queryState.set(QueryState.ABORTED);
   }
 
-  public void transitionToFailed() {
+  public void transitionToFailed(Throwable throwable) {
     if (queryState.get().isDone()) {
       return;
     }
+    this.failureException = throwable;
     queryState.set(QueryState.FAILED);
   }
+
+  public String getFailureMessage() {
+    if (failureException != null) {
+      return failureException.getMessage();
+    }
+    return "no detailed failure reason in QueryStateMachine";
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
index 21afde5d29..be1c95871f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.execution.config;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.execution.ExecutionResult;
@@ -26,10 +27,8 @@ import org.apache.iotdb.db.mpp.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
-import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.FutureCallback;
@@ -41,8 +40,6 @@ import org.jetbrains.annotations.NotNull;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
-import static com.google.common.base.Throwables.throwIfInstanceOf;
-
 public class ConfigExecution implements IQueryExecution {
 
   private final MPPQueryContext context;
@@ -52,17 +49,30 @@ public class ConfigExecution implements IQueryExecution {
   private final QueryStateMachine stateMachine;
   private final SettableFuture<Boolean> result;
 
+  private final IConfigTask task;
+
   public ConfigExecution(MPPQueryContext context, Statement statement, ExecutorService executor) {
     this.context = context;
     this.statement = statement;
     this.executor = executor;
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
     this.result = SettableFuture.create();
+    this.task = statement.accept(new ConfigTaskVisitor(), new ConfigTaskVisitor.TaskContext());
+  }
+
+  @TestOnly
+  public ConfigExecution(
+      MPPQueryContext context, Statement statement, ExecutorService executor, IConfigTask task) {
+    this.context = context;
+    this.statement = statement;
+    this.executor = executor;
+    this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
+    this.result = SettableFuture.create();
+    this.task = task;
   }
 
   @Override
   public void start() {
-    IConfigTask task = getTask(statement);
     try {
       ListenableFuture<Void> future = task.execute();
       Futures.addCallback(
@@ -82,13 +92,12 @@ public class ConfigExecution implements IQueryExecution {
           executor);
     } catch (Throwable e) {
       fail(e);
-      throwIfInstanceOf(e, Error.class);
     }
   }
 
   public void fail(Throwable cause) {
-    stateMachine.transitionToFailed();
-    result.cancel(false);
+    stateMachine.transitionToFailed(cause);
+    result.set(false);
   }
 
   @Override
@@ -97,19 +106,16 @@ public class ConfigExecution implements IQueryExecution {
   @Override
   public ExecutionResult getStatus() {
     try {
-      if (result.isCancelled()) {
-        return new ExecutionResult(
-            context.getQueryId(), RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR));
-      }
       Boolean success = result.get();
       TSStatusCode statusCode =
           success ? TSStatusCode.SUCCESS_STATUS : TSStatusCode.QUERY_PROCESS_ERROR;
-      return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode));
-
+      String message = success ? "" : stateMachine.getFailureMessage();
+      return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode, message));
     } catch (InterruptedException | ExecutionException e) {
       Thread.currentThread().interrupt();
       return new ExecutionResult(
-          context.getQueryId(), RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR));
+          context.getQueryId(),
+          RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR, e.getMessage()));
     }
   }
 
@@ -140,20 +146,4 @@ public class ConfigExecution implements IQueryExecution {
   public boolean isQuery() {
     return context.getQueryType() == QueryType.READ;
   }
-
-  // TODO: consider a more suitable implementation for it
-  // Generate the corresponding IConfigTask by statement.
-  // Each type of statement will has a ConfigTask
-  private IConfigTask getTask(Statement statement) {
-    try {
-      switch (statement.getType()) {
-        case SET_STORAGE_GROUP:
-          return new SetStorageGroupTask((SetStorageGroupStatement) statement);
-        default:
-          throw new NotImplementedException();
-      }
-    } catch (ClassCastException classCastException) {
-      throw new NotImplementedException();
-    }
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskVisitor.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskVisitor.java
index de54b604e7..30cf56e998 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskVisitor.java
@@ -17,25 +17,23 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.sql.statement.metadata;
+package org.apache.iotdb.db.mpp.execution.config;
 
-import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.sql.constant.StatementType;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
-public class SetStorageGroupStatement extends Statement {
-  private PartialPath storageGroupPath;
+public class ConfigTaskVisitor
+    extends StatementVisitor<IConfigTask, ConfigTaskVisitor.TaskContext> {
 
-  public SetStorageGroupStatement() {
-    super();
-    statementType = StatementType.SET_STORAGE_GROUP;
+  public IConfigTask visitStatement(Statement statement, TaskContext context) {
+    throw new NotImplementedException("ConfigTask is not implemented for: " + statement);
   }
 
-  public PartialPath getStorageGroupPath() {
-    return storageGroupPath;
+  public IConfigTask visitSetStorageGroup(SetStorageGroupStatement statement, TaskContext context) {
+    return new SetStorageGroupTask(statement);
   }
 
-  public void setStorageGroupPath(PartialPath storageGroupPath) {
-    this.storageGroupPath = storageGroupPath;
-  }
+  public static class TaskContext {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
index cb3f9b7a02..fac0575fc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
@@ -22,5 +22,5 @@ package org.apache.iotdb.db.mpp.execution.config;
 import com.google.common.util.concurrent.ListenableFuture;
 
 public interface IConfigTask {
-  ListenableFuture<Void> execute();
+  ListenableFuture<Void> execute() throws InterruptedException;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 29d05136e8..8b819d6cad 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -79,16 +79,20 @@ public class ClusterScheduler implements IScheduler {
 
   @Override
   public void start() {
-    // TODO: consider where the state transition should be put
     stateMachine.transitionToDispatching();
     Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances);
 
     // NOTICE: the FragmentInstance may be dispatched to another Host due to consensus redirect.
     // So we need to start the state fetcher after the dispatching stage.
-    boolean success = waitDispatchingFinished(dispatchResultFuture);
-    // If the dispatch failed, we make the QueryState as failed, and return.
-    if (!success) {
-      stateMachine.transitionToFailed();
+    try {
+      FragInstanceDispatchResult result = dispatchResultFuture.get();
+      if (!result.isSuccessful()) {
+        stateMachine.transitionToFailed(new IllegalStateException("Fragment cannot be dispatched"));
+        return;
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      // If dispatching failed or was interrupted, mark the QueryState as failed and return.
+      stateMachine.transitionToFailed(e);
       return;
     }
 
@@ -110,19 +114,6 @@ public class ClusterScheduler implements IScheduler {
     this.stateTracker.start();
   }
 
-  private boolean waitDispatchingFinished(Future<FragInstanceDispatchResult> dispatchResultFuture) {
-    try {
-      FragInstanceDispatchResult result = dispatchResultFuture.get();
-      if (result.isSuccessful()) {
-        return true;
-      }
-    } catch (InterruptedException | ExecutionException e) {
-      Thread.currentThread().interrupt();
-      // TODO: (xingtanzjr) record the dispatch failure reason.
-    }
-    return false;
-  }
-
   @Override
   public void stop() {
     // TODO: It seems that it is unnecessary to check whether they are null or not. Is it a best
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index bdece41164..286f3df3e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -46,32 +46,27 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
     return executor.submit(
         () -> {
           TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
-          try {
-            for (FragmentInstance instance : instances) {
-              InternalService.Iface client =
-                  InternalServiceClientFactory.getMppServiceClient(
-                      new Endpoint(
-                          instance.getHostEndpoint().getIp(),
-                          IoTDBDescriptor.getInstance().getConfig().getMppPort()));
-              // TODO: (xingtanzjr) consider how to handle the buffer here
-              ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
-              instance.serializeRequest(buffer);
-              buffer.flip();
-              TConsensusGroupId groupId =
-                  new TConsensusGroupId(
-                      instance.getRegionReplicaSet().getConsensusGroupId().getId(),
-                      instance.getRegionReplicaSet().getConsensusGroupId().getType().toString());
-              TSendFragmentInstanceReq req =
-                  new TSendFragmentInstanceReq(
-                      new TFragmentInstance(buffer), groupId, instance.getType().toString());
-              resp = client.sendFragmentInstance(req);
-              if (!resp.accepted) {
-                break;
-              }
+          for (FragmentInstance instance : instances) {
+            InternalService.Iface client =
+                InternalServiceClientFactory.getMppServiceClient(
+                    new Endpoint(
+                        instance.getHostEndpoint().getIp(),
+                        IoTDBDescriptor.getInstance().getConfig().getMppPort()));
+            // TODO: (xingtanzjr) consider how to handle the buffer here
+            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+            instance.serializeRequest(buffer);
+            buffer.flip();
+            TConsensusGroupId groupId =
+                new TConsensusGroupId(
+                    instance.getRegionReplicaSet().getConsensusGroupId().getId(),
+                    instance.getRegionReplicaSet().getConsensusGroupId().getType().toString());
+            TSendFragmentInstanceReq req =
+                new TSendFragmentInstanceReq(
+                    new TFragmentInstance(buffer), groupId, instance.getType().toString());
+            resp = client.sendFragmentInstance(req);
+            if (!resp.accepted) {
+              break;
             }
-          } catch (Exception e) {
-            // TODO: (xingtanzjr) add more details
-            return new FragInstanceDispatchResult(false);
           }
           return new FragInstanceDispatchResult(resp.accepted);
         });
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/ConfigStatement.java
similarity index 73%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/ConfigStatement.java
index cb3f9b7a02..304401bc0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/ConfigStatement.java
@@ -17,10 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.execution.config;
+package org.apache.iotdb.db.mpp.sql.statement;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
-public interface IConfigTask {
-  ListenableFuture<Void> execute();
-}
+/**
+ * ConfigStatement represents a statement which should be executed by the ConfigNode. All
+ * statements which need to be transformed into an IConfigTask should extend this class.
+ */
+public abstract class ConfigStatement extends Statement {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
index 42e88eaa23..14f4606d0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.SchemaFetchStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowDevicesStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowStorageGroupStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowTimeSeriesStatement;
@@ -83,6 +84,10 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(alterTimeSeriesStatement, context);
   }
 
+  public R visitSetStorageGroup(SetStorageGroupStatement alterTimeSeriesStatement, C context) {
+    return visitStatement(alterTimeSeriesStatement, context);
+  }
+
   /** Data Manipulation Language (DML) */
 
   // Select Statement
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
index de54b604e7..b99b1363f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
@@ -21,9 +21,10 @@ package org.apache.iotdb.db.mpp.sql.statement.metadata;
 
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.constant.StatementType;
-import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.db.mpp.sql.statement.ConfigStatement;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
 
-public class SetStorageGroupStatement extends Statement {
+public class SetStorageGroupStatement extends ConfigStatement {
   private PartialPath storageGroupPath;
 
   public SetStorageGroupStatement() {
@@ -35,6 +36,11 @@ public class SetStorageGroupStatement extends Statement {
     return storageGroupPath;
   }
 
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitSetStorageGroup(this, context);
+  }
+
   public void setStorageGroupPath(PartialPath storageGroupPath) {
     this.storageGroupPath = storageGroupPath;
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
new file mode 100644
index 0000000000..dcc1d2dff6
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.config.ConfigExecution;
+import org.apache.iotdb.db.mpp.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+
+public class ConfigExecutionTest {
+
+  @Test
+  public void normalConfigTaskTest() {
+    IConfigTask task = () -> immediateFuture(null);
+    ConfigExecution execution =
+        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    execution.start();
+    ExecutionResult result = execution.getStatus();
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code);
+  }
+
+  @Test
+  public void exceptionConfigTaskTest() {
+    IConfigTask task =
+        () -> {
+          throw new RuntimeException("task throw exception when executing");
+        };
+    ConfigExecution execution =
+        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    execution.start();
+    ExecutionResult result = execution.getStatus();
+    Assert.assertEquals(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
+  }
+
+  @Test
+  public void exceptionAfterInvokeGetStatusTest() throws InterruptedException {
+    IConfigTask task =
+        () -> {
+          throw new RuntimeException("task throw exception when executing");
+        };
+    ConfigExecution execution =
+        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    Thread resultThread =
+        new Thread(
+            () -> {
+              ExecutionResult result = execution.getStatus();
+              Assert.assertEquals(
+                  TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
+            });
+    resultThread.start();
+    execution.start();
+    resultThread.join();
+  }
+
+  @Test
+  public void configTaskCancelledTest() throws InterruptedException {
+    SettableFuture<Void> taskResult = SettableFuture.create();
+    class SimpleTask implements IConfigTask {
+      private final ListenableFuture<Void> result;
+
+      public SimpleTask(ListenableFuture<Void> future) {
+        this.result = future;
+      }
+
+      @Override
+      public ListenableFuture<Void> execute() throws InterruptedException {
+        return result;
+      }
+    }
+    IConfigTask task = new SimpleTask(taskResult);
+    ConfigExecution execution =
+        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    execution.start();
+
+    Thread resultThread =
+        new Thread(
+            () -> {
+              ExecutionResult result = execution.getStatus();
+              Assert.assertEquals(
+                  TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
+            });
+    resultThread.start();
+    taskResult.cancel(true);
+    resultThread.join();
+  }
+
+  private MPPQueryContext genMPPQueryContext() {
+    MPPQueryContext context = new MPPQueryContext(new QueryId("query1"));
+    context.setQueryType(QueryType.WRITE);
+    return context;
+  }
+
+  private ExecutorService getExecutor() {
+    return IoTDBThreadPoolFactory.newSingleThreadExecutor("ConfigExecutionTest");
+  }
+}