You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/11 04:50:58 UTC

[iotdb] branch xingtanzjr/scheduler created (now d7f4a4a8ba)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/scheduler
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at d7f4a4a8ba complete basic verification

This branch includes the following new commits:

     new fbaaa4a273 tmp save
     new d7f4a4a8ba complete basic verification

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/02: tmp save

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/scheduler
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fbaaa4a273f11e7e1eeaec6cc36dfaff4abca161
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue May 10 23:13:38 2022 +0800

    tmp save
---
 .../mpp/FragmentInstanceDispatchException.java}    |  22 +--
 .../execution/fragment/FragmentInstanceInfo.java   |  11 +-
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |  11 ++
 .../db/mpp/plan/execution/QueryExecution.java      |   6 +-
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |   6 +-
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 191 +++++++++++++++++++++
 .../iotdb/db/mpp/plan/plan/QueryPlannerTest.java   |   1 +
 7 files changed, 227 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java b/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
copy to server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
index 42d915b42b..8f32997d6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
@@ -16,25 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.fragment;
 
-import org.apache.iotdb.consensus.common.DataSet;
+package org.apache.iotdb.db.exception.mpp;
 
-public class FragmentInstanceInfo implements DataSet {
-  private final FragmentInstanceState state;
-
-  private final long endTime;
-
-  public FragmentInstanceInfo(FragmentInstanceState state, long endTime) {
-    this.state = state;
-    this.endTime = endTime;
-  }
-
-  public FragmentInstanceState getState() {
-    return state;
-  }
-
-  public long getEndTime() {
-    return endTime;
+public class FragmentInstanceDispatchException extends Exception {
+  public FragmentInstanceDispatchException(Throwable t) {
+    super(t);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
index 42d915b42b..585c2845c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.consensus.common.DataSet;
 
 public class FragmentInstanceInfo implements DataSet {
   private final FragmentInstanceState state;
-
+  private String message;
   private final long endTime;
 
   public FragmentInstanceInfo(FragmentInstanceState state, long endTime) {
@@ -30,6 +30,11 @@ public class FragmentInstanceInfo implements DataSet {
     this.endTime = endTime;
   }
 
+  public FragmentInstanceInfo(FragmentInstanceState state, long endTime, String message) {
+    this(state, endTime);
+    this.message = message;
+  }
+
   public FragmentInstanceState getState() {
     return state;
   }
@@ -37,4 +42,8 @@ public class FragmentInstanceInfo implements DataSet {
   public long getEndTime() {
     return endTime;
   }
+
+  public String getMessage() {
+    return message;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 23c417862c..c6f6c4a2b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -53,6 +53,8 @@ public class Coordinator {
 
   private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
   private static final int COORDINATOR_EXECUTOR_SIZE = 10;
+  private static final String COORDINATOR_WRITE_EXECUTOR_NAME = "MPPCoordinatorWrite";
+  private static final int COORDINATOR_WRITE_EXECUTOR_SIZE = 10;
   private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
   private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 1;
 
@@ -73,6 +75,7 @@ public class Coordinator {
                   new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
 
   private final ExecutorService executor;
+  private final ExecutorService writeOperationExecutor;
   private final ScheduledExecutorService scheduledExecutor;
 
   private static final Coordinator INSTANCE = new Coordinator();
@@ -82,6 +85,7 @@ public class Coordinator {
   private Coordinator() {
     this.queryExecutionMap = new ConcurrentHashMap<>();
     this.executor = getQueryExecutor();
+    this.writeOperationExecutor = getWriteExecutor();
     this.scheduledExecutor = getScheduledExecutor();
   }
 
@@ -98,6 +102,7 @@ public class Coordinator {
         statement,
         queryContext,
         executor,
+        writeOperationExecutor,
         scheduledExecutor,
         partitionFetcher,
         schemaFetcher,
@@ -138,6 +143,12 @@ public class Coordinator {
     return IoTDBThreadPoolFactory.newFixedThreadPool(
         COORDINATOR_EXECUTOR_SIZE, COORDINATOR_EXECUTOR_NAME);
   }
+
+  private ExecutorService getWriteExecutor() {
+    return IoTDBThreadPoolFactory.newFixedThreadPool(
+        COORDINATOR_WRITE_EXECUTOR_SIZE, COORDINATOR_WRITE_EXECUTOR_NAME);
+  }
+
   // TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
   private ScheduledExecutorService getScheduledExecutor() {
     return IoTDBThreadPoolFactory.newScheduledThreadPool(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 9f4b2b9d9d..0e5da23ad1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -86,10 +86,11 @@ public class QueryExecution implements IQueryExecution {
   private DistributedQueryPlan distributedPlan;
 
   private final ExecutorService executor;
+  private final ExecutorService writeOperationExecutor;
   private final ScheduledExecutorService scheduledExecutor;
   // TODO need to use factory to decide standalone or cluster
   private final IPartitionFetcher partitionFetcher;
-  // TODO need to use factory to decide standalone or cluster
+  // TODO need to use factory to decide standalone or cluster,
   private final ISchemaFetcher schemaFetcher;
 
   // The result of QueryExecution will be written to the DataBlockManager in current Node.
@@ -103,11 +104,13 @@ public class QueryExecution implements IQueryExecution {
       Statement statement,
       MPPQueryContext context,
       ExecutorService executor,
+      ExecutorService writeOperationExecutor,
       ScheduledExecutorService scheduledExecutor,
       IPartitionFetcher partitionFetcher,
       ISchemaFetcher schemaFetcher,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
     this.executor = executor;
+    this.writeOperationExecutor = writeOperationExecutor;
     this.scheduledExecutor = scheduledExecutor;
     this.context = context;
     this.planOptimizers = new ArrayList<>();
@@ -176,6 +179,7 @@ public class QueryExecution implements IQueryExecution {
                 distributedPlan.getInstances(),
                 context.getQueryType(),
                 executor,
+                writeOperationExecutor,
                 scheduledExecutor,
                 internalServiceClientManager)
             : new StandaloneScheduler(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index ced23a70ac..dd2ca46271 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -58,6 +58,7 @@ public class ClusterScheduler implements IScheduler {
   private List<FragmentInstance> instances;
 
   private ExecutorService executor;
+  private ExecutorService writeOperationExecutor;
   private ScheduledExecutorService scheduledExecutor;
 
   private IFragInstanceDispatcher dispatcher;
@@ -70,6 +71,7 @@ public class ClusterScheduler implements IScheduler {
       List<FragmentInstance> instances,
       QueryType queryType,
       ExecutorService executor,
+      ExecutorService writeOperationExecutor,
       ScheduledExecutorService scheduledExecutor,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
     this.queryContext = queryContext;
@@ -78,7 +80,9 @@ public class ClusterScheduler implements IScheduler {
     this.queryType = queryType;
     this.executor = executor;
     this.scheduledExecutor = scheduledExecutor;
-    this.dispatcher = new SimpleFragInstanceDispatcher(executor, internalServiceClientManager);
+    this.dispatcher =
+        new FragmentInstanceDispatcherImpl(
+            queryType, executor, writeOperationExecutor, internalServiceClientManager);
     this.stateTracker =
         new FixedRateFragInsStateTracker(
             stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
new file mode 100644
index 0000000000..8922de59e3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.scheduler;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(FragmentInstanceDispatcherImpl.class);
+  private final ExecutorService executor;
+  private final ExecutorService writeOperationExecutor;
+  private final QueryType type;
+  private final String localhostIpAddr;
+  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      internalServiceClientManager;
+
+  public FragmentInstanceDispatcherImpl(
+      QueryType type,
+      ExecutorService executor,
+      ExecutorService writeOperationExecutor,
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+    this.type = type;
+    this.executor = executor;
+    this.writeOperationExecutor = writeOperationExecutor;
+    this.internalServiceClientManager = internalServiceClientManager;
+    this.localhostIpAddr = IoTDBDescriptor.getInstance().getConfig().getInternalIp();
+  }
+
+  @Override
+  public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances) {
+    if (type == QueryType.READ) {
+      return dispatchRead(instances);
+    } else {
+      return dispatchWrite(instances);
+    }
+  }
+
+  // TODO: (xingtanzjr) currently we use a sequential dispatch policy for READ, which is
+  //  unsafe for current FragmentInstance scheduler framework. We need to implement the
+  //  topological dispatch according to dependency relations between FragmentInstances
+  private Future<FragInstanceDispatchResult> dispatchRead(List<FragmentInstance> instances) {
+    return executor.submit(
+        () -> {
+          for (FragmentInstance instance : instances) {
+            boolean accepted = dispatchOneInstance(instance);
+            if (!accepted) {
+              return new FragInstanceDispatchResult(false);
+            }
+          }
+          return new FragInstanceDispatchResult(true);
+        });
+  }
+
+  // TODO: (xingtanzjr) Return the detailed write states for each FragmentInstance
+  private Future<FragInstanceDispatchResult> dispatchWrite(List<FragmentInstance> instances) {
+    List<Future<Boolean>> futures = new LinkedList<>();
+    for (FragmentInstance instance : instances) {
+      futures.add(writeOperationExecutor.submit(() -> dispatchOneInstance(instance)));
+    }
+    SettableFuture<FragInstanceDispatchResult> resultFuture = SettableFuture.create();
+    for (Future<Boolean> future : futures) {
+      try {
+        Boolean success = future.get();
+        if (!success) {
+          resultFuture.set(new FragInstanceDispatchResult(false));
+          break;
+        }
+      } catch (ExecutionException | InterruptedException e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        resultFuture.setException(e);
+        break;
+      }
+    }
+    resultFuture.set(new FragInstanceDispatchResult(true));
+    return resultFuture;
+  }
+
+  private boolean dispatchOneInstance(FragmentInstance instance)
+      throws FragmentInstanceDispatchException {
+    TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
+    if (isDispatchedToLocal(endPoint)) {
+      return dispatchLocally(instance);
+    } else {
+      return dispatchRemote(instance, endPoint);
+    }
+  }
+
+  private boolean isDispatchedToLocal(TEndPoint endPoint) {
+    return this.localhostIpAddr.equals(endPoint.getIp());
+  }
+
+  private boolean dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
+      throws FragmentInstanceDispatchException {
+    try (SyncDataNodeInternalServiceClient client =
+        internalServiceClientManager.borrowClient(endPoint)) {
+      ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+      instance.serializeRequest(buffer);
+      buffer.flip();
+      TConsensusGroupId groupId = instance.getRegionReplicaSet().getRegionId();
+      TSendFragmentInstanceReq req =
+          new TSendFragmentInstanceReq(
+              new TFragmentInstance(buffer), groupId, instance.getType().toString());
+      TSendFragmentInstanceResp resp = client.sendFragmentInstance(req);
+      return resp.accepted;
+    } catch (IOException | TException e) {
+      logger.error("can't connect to node {}", endPoint, e);
+      throw new FragmentInstanceDispatchException(e);
+    }
+  }
+
+  private boolean dispatchLocally(FragmentInstance instance)
+      throws FragmentInstanceDispatchException {
+    ConsensusGroupId groupId =
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(
+            instance.getRegionReplicaSet().getRegionId());
+    switch (instance.getType()) {
+      case READ:
+        FragmentInstanceInfo info =
+            (FragmentInstanceInfo) ConsensusImpl.getInstance().read(groupId, instance).getDataset();
+        return !info.getState().isFailed();
+      case WRITE:
+        PlanNode planNode = instance.getFragment().getRoot();
+        if (planNode instanceof InsertNode) {
+          try {
+            SchemaValidator.validate((InsertNode) planNode);
+          } catch (SemanticException e) {
+            throw new FragmentInstanceDispatchException(e);
+          }
+        }
+        ConsensusWriteResponse resp = ConsensusImpl.getInstance().write(groupId, instance);
+        return TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode();
+    }
+    throw new UnsupportedOperationException(
+        String.format("unknown query type [%s]", instance.getType()));
+  }
+
+  @Override
+  public void abort() {}
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
index c6694843a0..48b4137bb2 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
@@ -78,6 +78,7 @@ public class QueryPlannerTest {
                 new TEndPoint(),
                 new TEndPoint()),
             IoTDBThreadPoolFactory.newSingleThreadExecutor("test_query"),
+            IoTDBThreadPoolFactory.newSingleThreadExecutor("test_write_operation"),
             IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"),
             new FakePartitionFetcherImpl(),
             new FakeSchemaFetcherImpl(),


[iotdb] 02/02: complete basic verification

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/scheduler
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d7f4a4a8bad39b9ce8bddb071aa8f127c0a9320e
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed May 11 12:50:44 2022 +0800

    complete basic verification
---
 .../java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java    | 10 +++-------
 .../apache/iotdb/db/mpp/common/schematree/PathPatternTree.java |  1 +
 .../db/mpp/execution/fragment/FragmentInstanceManager.java     |  2 +-
 .../planner/plan/node/metedata/read/SchemaFetchScanNode.java   |  1 +
 .../db/mpp/plan/planner/plan/node/process/ExchangeNode.java    |  2 +-
 5 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
index 01cde7ef0b..62a0d0b692 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
@@ -82,18 +82,14 @@ public class PlanFragmentId {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
     PlanFragmentId that = (PlanFragmentId) o;
     return id == that.id && Objects.equals(queryId, that.queryId);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(queryId, id, nextFragmentInstanceId);
+    return Objects.hash(queryId, id);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
index 4002204237..d7e70aea7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
@@ -48,6 +48,7 @@ public class PathPatternTree {
 
   public PathPatternTree(PathPatternNode root) {
     this.root = root;
+    this.pathList = new ArrayList<>();
   }
 
   public PathPatternTree(PartialPath devicePath, String[] measurements) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index dbbed705c5..29d41fd934 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -116,6 +116,7 @@ public class FragmentInstanceManager {
                 return createFragmentInstanceExecution(
                     scheduler, instanceId, context, driver, stateMachine, failedInstances);
               } catch (Throwable t) {
+                logger.error("error when create FragmentInstanceExecution.", t);
                 stateMachine.failed(t);
                 return null;
               }
@@ -127,7 +128,6 @@ public class FragmentInstanceManager {
   public FragmentInstanceInfo execSchemaQueryFragmentInstance(
       FragmentInstance instance, ISchemaRegion schemaRegion) {
     FragmentInstanceId instanceId = instance.getId();
-
     FragmentInstanceExecution execution =
         instanceExecution.computeIfAbsent(
             instanceId,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchScanNode.java
index 3a360954dd..7f0f0e9310 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchScanNode.java
@@ -47,6 +47,7 @@ public class SchemaFetchScanNode extends SourceNode {
     super(id);
     this.storageGroup = storageGroup;
     this.patternTree = patternTree;
+    this.patternTree.constructTree();
   }
 
   public PartialPath getStorageGroup() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
index db68bb9624..fe110473c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
@@ -129,7 +129,6 @@ public class ExchangeNode extends PlanNode {
     ReadWriteIOUtils.write(upstreamEndpoint.getPort(), byteBuffer);
     upstreamInstanceId.serialize(byteBuffer);
     upstreamPlanNodeId.serialize(byteBuffer);
-    List<String> outputColumnNames = remoteSourceNode.getOutputColumnNames();
     ReadWriteIOUtils.write(outputColumnNames.size(), byteBuffer);
     for (String outputColumnName : outputColumnNames) {
       ReadWriteIOUtils.write(outputColumnName, byteBuffer);
@@ -165,6 +164,7 @@ public class ExchangeNode extends PlanNode {
 
   public void setRemoteSourceNode(FragmentSinkNode remoteSourceNode) {
     this.remoteSourceNode = remoteSourceNode;
+    this.setOutputColumnNames(remoteSourceNode.getOutputColumnNames());
   }
 
   public void cleanChildren() {