You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/18 11:13:57 UTC

[iotdb] branch master updated: [IOTDB-3830] Refactor schema fetch (#6678)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new adedb34e4f [IOTDB-3830] Refactor schema fetch (#6678)
adedb34e4f is described below

commit adedb34e4f9ef0204058a092d58757c668948f9d
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Mon Jul 18 19:13:51 2022 +0800

    [IOTDB-3830] Refactor schema fetch (#6678)
---
 .../operator/schema/SchemaFetchMergeOperator.java  | 53 ++++++++++++-
 .../operator/schema/SchemaFetchScanOperator.java   |  4 +
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  | 59 ++++++---------
 .../db/mpp/plan/analyze/ClusterSchemaFetcher.java  | 88 ++++++++++------------
 .../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java |  6 --
 .../iotdb/db/mpp/plan/analyze/ISchemaFetcher.java  |  3 -
 .../mpp/plan/analyze/StandaloneSchemaFetcher.java  |  6 --
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  2 +-
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |  4 +-
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |  9 +--
 .../node/metedata/read/SchemaFetchMergeNode.java   | 29 ++++++-
 .../statement/internal/SchemaFetchStatement.java   | 13 +---
 .../schema/SchemaFetchScanOperatorTest.java        |  6 +-
 13 files changed, 152 insertions(+), 130 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
index 31575a5be9..416abb9c77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
@@ -23,24 +23,41 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
 
 public class SchemaFetchMergeOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
   private final List<Operator> children;
-
   private final int childrenCount;
+
   private int currentIndex;
 
-  public SchemaFetchMergeOperator(OperatorContext operatorContext, List<Operator> children) {
+  private boolean isReadingStorageGroupInfo;
+
+  private final List<String> storageGroupList;
+
+  public SchemaFetchMergeOperator(
+      OperatorContext operatorContext, List<Operator> children, List<String> storageGroupList) {
     this.operatorContext = operatorContext;
     this.children = children;
     this.childrenCount = children.size();
+
     this.currentIndex = 0;
+
+    this.isReadingStorageGroupInfo = true;
+
+    this.storageGroupList = storageGroupList;
   }
 
   @Override
@@ -50,6 +67,11 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() {
+    if (isReadingStorageGroupInfo) {
+      isReadingStorageGroupInfo = false;
+      return generateStorageGroupInfo();
+    }
+
     if (children.get(currentIndex).hasNext()) {
       return children.get(currentIndex).next();
     } else {
@@ -60,12 +82,14 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
 
   @Override
   public boolean hasNext() {
-    return currentIndex < childrenCount;
+    return isReadingStorageGroupInfo || currentIndex < childrenCount;
   }
 
   @Override
   public ListenableFuture<?> isBlocked() {
-    return currentIndex < children.size() ? children.get(currentIndex).isBlocked() : NOT_BLOCKED;
+    return isReadingStorageGroupInfo || currentIndex >= children.size()
+        ? NOT_BLOCKED
+        : children.get(currentIndex).isBlocked();
   }
 
   @Override
@@ -79,4 +103,25 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
       child.close();
     }
   }
+
+  private TsBlock generateStorageGroupInfo() {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    try {
+      // to indicate this binary data is storage group info
+      ReadWriteIOUtils.write((byte) 0, outputStream);
+
+      ReadWriteIOUtils.write(storageGroupList.size(), outputStream);
+      for (String storageGroup : storageGroupList) {
+        ReadWriteIOUtils.write(storageGroup, outputStream);
+      }
+    } catch (IOException e) {
+      // Totally memory operation. This case won't happen.
+    }
+    return new TsBlock(
+        new TimeColumn(1, new long[] {0}),
+        new BinaryColumn(
+            1,
+            Optional.of(new boolean[] {false}),
+            new Binary[] {new Binary(outputStream.toByteArray())}));
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
index b21aac7ef3..6437f7eee5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -108,6 +109,9 @@ public class SchemaFetchScanOperator implements SourceOperator {
 
     ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
     try {
+      // to indicate this binary data is storage group info
+      ReadWriteIOUtils.write((byte) 1, outputStream);
+
       schemaTree.serialize(outputStream);
     } catch (IOException e) {
       // Totally memory operation. This case won't happen.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 7d07655575..59ad88efa7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -18,15 +18,12 @@
  */
 package org.apache.iotdb.db.mpp.plan.analyze;
 
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
 import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
 import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.sql.MeasurementNotExistException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
@@ -1151,7 +1148,15 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       SchemaFetchStatement schemaFetchStatement, MPPQueryContext context) {
     Analysis analysis = new Analysis();
     analysis.setStatement(schemaFetchStatement);
-    analysis.setSchemaPartitionInfo(schemaFetchStatement.getSchemaPartition());
+
+    SchemaPartition schemaPartition =
+        partitionFetcher.getSchemaPartition(schemaFetchStatement.getPatternTree());
+    analysis.setSchemaPartitionInfo(schemaPartition);
+
+    if (schemaPartition.isEmpty()) {
+      analysis.setFinishQueryAfterAnalyze(true);
+    }
+
     return analysis;
   }
 
@@ -1288,43 +1293,23 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       patternTree.appendPathPattern(pathPattern);
     }
 
-    SchemaPartition schemaPartition = partitionFetcher.getSchemaPartition(patternTree);
-
-    SchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree, schemaPartition);
+    SchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree);
     analysis.setSchemaTree(schemaTree);
 
     Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
 
-    Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap =
-        schemaPartition.getSchemaPartitionMap();
-
-    // todo keep the behaviour consistency of cluster and standalone,
-    // the behaviour of standalone fetcher and LocalConfigNode is not consistent with that of
-    // cluster mode's
-    if (IoTDBDescriptor.getInstance().getConfig().isClusterMode()) {
-      for (String storageGroup : schemaPartitionMap.keySet()) {
-        sgNameToQueryParamsMap.put(
-            storageGroup,
-            schemaPartitionMap.get(storageGroup).keySet().stream()
-                .map(DataPartitionQueryParam::new)
-                .collect(Collectors.toList()));
-      }
-    } else {
-      // the StandalonePartitionFetcher and LocalConfigNode now doesn't support partition fetch
-      // via slotId
-      schemaTree
-          .getMatchedDevices(new PartialPath(ALL_RESULT_NODES))
-          .forEach(
-              deviceSchemaInfo -> {
-                PartialPath devicePath = deviceSchemaInfo.getDevicePath();
-                DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
-                queryParam.setDevicePath(devicePath.getFullPath());
-                sgNameToQueryParamsMap
-                    .computeIfAbsent(
-                        schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>())
-                    .add(queryParam);
-              });
-    }
+    schemaTree
+        .getMatchedDevices(new PartialPath(ALL_RESULT_NODES))
+        .forEach(
+            deviceSchemaInfo -> {
+              PartialPath devicePath = deviceSchemaInfo.getDevicePath();
+              DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
+              queryParam.setDevicePath(devicePath.getFullPath());
+              sgNameToQueryParamsMap
+                  .computeIfAbsent(
+                      schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>())
+                  .add(queryParam);
+            });
 
     DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
     analysis.setDataPartitionInfo(dataPartition);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 19356f6471..44193ec8e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -18,10 +18,8 @@
  */
 package org.apache.iotdb.db.mpp.plan.analyze;
 
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -45,18 +43,19 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import io.airlift.concurrent.SetThreadName;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -68,7 +67,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   private final Coordinator coordinator = Coordinator.getInstance();
-  private final IPartitionFetcher partitionFetcher = ClusterPartitionFetcher.getInstance();
   private final DataNodeSchemaCache schemaCache = DataNodeSchemaCache.getInstance();
 
   private static final class ClusterSchemaFetcherHolder {
@@ -85,28 +83,15 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
 
   @Override
   public SchemaTree fetchSchema(PathPatternTree patternTree) {
-    return fetchSchema(patternTree, partitionFetcher.getSchemaPartition(patternTree));
-  }
-
-  @Override
-  public SchemaTree fetchSchema(PathPatternTree patternTree, SchemaPartition schemaPartition) {
-    Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap =
-        schemaPartition.getSchemaPartitionMap();
-    List<String> storageGroups = new ArrayList<>(schemaPartitionMap.keySet());
-
-    SchemaFetchStatement schemaFetchStatement = new SchemaFetchStatement(patternTree);
-    schemaFetchStatement.setSchemaPartition(schemaPartition);
-
-    SchemaTree result = executeSchemaFetchQuery(schemaFetchStatement);
-    result.setStorageGroups(storageGroups);
-    return result;
+    return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree));
   }
 
   private SchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
     long queryId = SessionManager.getInstance().requestQueryId(false);
     try {
       ExecutionResult executionResult =
-          coordinator.execute(schemaFetchStatement, queryId, null, "", partitionFetcher, this);
+          coordinator.execute(
+              schemaFetchStatement, queryId, null, "", ClusterPartitionFetcher.getInstance(), this);
       // TODO: (xingtanzjr) throw exception
       if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         throw new RuntimeException(
@@ -116,6 +101,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
       }
       try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) {
         SchemaTree result = new SchemaTree();
+        List<String> storageGroupList = new ArrayList<>();
         while (coordinator.getQueryExecution(queryId).hasNextResult()) {
           // The query will be transited to FINISHED when invoking getBatchResult() at the last time
           // So we don't need to clean up it manually
@@ -123,20 +109,12 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
           if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
             break;
           }
-          Binary binary;
-          SchemaTree fetchedSchemaTree;
           Column column = tsBlock.get().getColumn(0);
           for (int i = 0; i < column.getPositionCount(); i++) {
-            binary = column.getBinary(i);
-            try {
-              fetchedSchemaTree =
-                  SchemaTree.deserialize(new ByteArrayInputStream(binary.getValues()));
-              result.mergeSchemaTree(fetchedSchemaTree);
-            } catch (IOException e) {
-              // Totally memory operation. This case won't happen.
-            }
+            parseFetchedData(column.getBinary(i), result, storageGroupList);
           }
         }
+        result.setStorageGroups(storageGroupList);
         return result;
       }
     } finally {
@@ -144,6 +122,27 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     }
   }
 
+  private void parseFetchedData(
+      Binary data, SchemaTree resultSchemaTree, List<String> storageGroupList) {
+    InputStream inputStream = new ByteArrayInputStream(data.getValues());
+    try {
+      byte type = ReadWriteIOUtils.readByte(inputStream);
+      if (type == 0) {
+        int size = ReadWriteIOUtils.readInt(inputStream);
+        for (int i = 0; i < size; i++) {
+          storageGroupList.add(ReadWriteIOUtils.readString(inputStream));
+        }
+      } else if (type == 1) {
+        resultSchemaTree.mergeSchemaTree(SchemaTree.deserialize(inputStream));
+      } else {
+        throw new RuntimeException(
+            new MetadataException("Failed to fetch schema because of unrecognized data"));
+      }
+    } catch (IOException e) {
+      // Totally memory operation. This case won't happen.
+    }
+  }
+
   @Override
   public SchemaTree fetchSchemaWithAutoCreate(
       PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean isAligned) {
@@ -161,20 +160,14 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
       return schemaTree;
     }
 
-    SchemaTree remoteSchemaTree;
+    SchemaTree remoteSchemaTree = fetchSchema(patternTree);
+    schemaTree.mergeSchemaTree(remoteSchemaTree);
+    schemaCache.put(remoteSchemaTree);
 
     if (!config.isAutoCreateSchemaEnabled()) {
-      remoteSchemaTree = fetchSchema(patternTree, partitionFetcher.getSchemaPartition(patternTree));
-      schemaTree.mergeSchemaTree(remoteSchemaTree);
-      schemaCache.put(remoteSchemaTree);
       return schemaTree;
     }
 
-    remoteSchemaTree =
-        fetchSchema(patternTree, partitionFetcher.getOrCreateSchemaPartition(patternTree));
-    schemaTree.mergeSchemaTree(remoteSchemaTree);
-    schemaCache.put(remoteSchemaTree);
-
     SchemaTree missingSchemaTree =
         checkAndAutoCreateMissingMeasurements(
             remoteSchemaTree,
@@ -216,20 +209,14 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
       return schemaTree;
     }
 
-    SchemaTree remoteSchemaTree;
+    SchemaTree remoteSchemaTree = fetchSchema(patternTree);
+    schemaTree.mergeSchemaTree(remoteSchemaTree);
+    schemaCache.put(remoteSchemaTree);
 
     if (!config.isAutoCreateSchemaEnabled()) {
-      remoteSchemaTree = fetchSchema(patternTree, partitionFetcher.getSchemaPartition(patternTree));
-      schemaTree.mergeSchemaTree(remoteSchemaTree);
-      schemaCache.put(remoteSchemaTree);
       return schemaTree;
     }
 
-    remoteSchemaTree =
-        fetchSchema(patternTree, partitionFetcher.getOrCreateSchemaPartition(patternTree));
-    schemaTree.mergeSchemaTree(remoteSchemaTree);
-    schemaCache.put(remoteSchemaTree);
-
     SchemaTree missingSchemaTree;
     for (int i = 0; i < devicePathList.size(); i++) {
       missingSchemaTree =
@@ -336,7 +323,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
       InternalCreateTimeSeriesStatement statement) {
     long queryId = SessionManager.getInstance().requestQueryId(false);
     ExecutionResult executionResult =
-        coordinator.execute(statement, queryId, null, "", partitionFetcher, this);
+        coordinator.execute(
+            statement, queryId, null, "", ClusterPartitionFetcher.getInstance(), this);
     // TODO: throw exception
     int statusCode = executionResult.status.getCode();
     if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 286a615d06..e2dfe00b0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.mpp.plan.analyze;
 
-import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
@@ -43,11 +42,6 @@ public class FakeSchemaFetcherImpl implements ISchemaFetcher {
     return schemaTree;
   }
 
-  @Override
-  public SchemaTree fetchSchema(PathPatternTree patternTree, SchemaPartition schemaPartition) {
-    return null;
-  }
-
   @Override
   public SchemaTree fetchSchemaWithAutoCreate(
       PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
index 182222ebeb..44a583fb9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.mpp.plan.analyze;
 
-import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
@@ -34,8 +33,6 @@ public interface ISchemaFetcher {
 
   SchemaTree fetchSchema(PathPatternTree patternTree);
 
-  SchemaTree fetchSchema(PathPatternTree patternTree, SchemaPartition schemaPartition);
-
   SchemaTree fetchSchemaWithAutoCreate(
       PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
index 2d190a79c3..dbe9f08e51 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.mpp.plan.analyze;
 
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -83,11 +82,6 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
     return schemaTree;
   }
 
-  @Override
-  public SchemaTree fetchSchema(PathPatternTree patternTree, SchemaPartition schemaPartition) {
-    return fetchSchema(patternTree);
-  }
-
   @Override
   public SchemaTree fetchSchemaWithAutoCreate(
       PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 2cddb29cc5..fa1cc2fb11 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -1160,7 +1160,7 @@ public class LocalExecutionPlanner {
               node.getPlanNodeId(),
               SchemaFetchMergeOperator.class.getSimpleName());
       context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
-      return new SchemaFetchMergeOperator(operatorContext, children);
+      return new SchemaFetchMergeOperator(operatorContext, children, node.getStorageGroupList());
     }
 
     @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 8b4c3c39f1..b65d87b4ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -701,8 +701,8 @@ public class LogicalPlanBuilder {
     return this;
   }
 
-  public LogicalPlanBuilder planSchemaFetchMerge() {
-    this.root = new SchemaFetchMergeNode(context.getQueryId().genPlanNodeId());
+  public LogicalPlanBuilder planSchemaFetchMerge(List<String> storageGroupList) {
+    this.root = new SchemaFetchMergeNode(context.getQueryId().genPlanNodeId(), storageGroupList);
     return this;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 1575c5d635..38044f2fe2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -634,12 +634,11 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
   public PlanNode visitSchemaFetch(
       SchemaFetchStatement schemaFetchStatement, MPPQueryContext context) {
     LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+    List<String> storageGroupList =
+        new ArrayList<>(analysis.getSchemaPartitionInfo().getSchemaPartitionMap().keySet());
     return planBuilder
-        .planSchemaFetchMerge()
-        .planSchemaFetchSource(
-            new ArrayList<>(
-                schemaFetchStatement.getSchemaPartition().getSchemaPartitionMap().keySet()),
-            schemaFetchStatement.getPatternTree())
+        .planSchemaFetchMerge(storageGroupList)
+        .planSchemaFetchSource(storageGroupList, schemaFetchStatement.getPatternTree())
         .getRoot();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchMergeNode.java
index 8d1b999ef9..be235cf920 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchMergeNode.java
@@ -23,21 +23,35 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 
 /** This class defines the scanned result merge task of schema fetcher. */
 public class SchemaFetchMergeNode extends AbstractSchemaMergeNode {
 
-  public SchemaFetchMergeNode(PlanNodeId id) {
+  private List<String> storageGroupList;
+
+  public SchemaFetchMergeNode(PlanNodeId id, List<String> storageGroupList) {
     super(id);
+    this.storageGroupList = storageGroupList;
+  }
+
+  public List<String> getStorageGroupList() {
+    return storageGroupList;
+  }
+
+  public void setStorageGroupList(List<String> storageGroupList) {
+    this.storageGroupList = storageGroupList;
   }
 
   @Override
   public PlanNode clone() {
-    return new SchemaFetchMergeNode(getPlanNodeId());
+    return new SchemaFetchMergeNode(getPlanNodeId(), storageGroupList);
   }
 
   @Override
@@ -48,11 +62,20 @@ public class SchemaFetchMergeNode extends AbstractSchemaMergeNode {
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws IOException {
     PlanNodeType.SCHEMA_FETCH_MERGE.serialize(stream);
+    ReadWriteIOUtils.write(storageGroupList.size(), stream);
+    for (String storageGroup : storageGroupList) {
+      ReadWriteIOUtils.write(storageGroup, stream);
+    }
   }
 
   public static PlanNode deserialize(ByteBuffer byteBuffer) {
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
+    List<String> storageGroupList = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      storageGroupList.add(ReadWriteIOUtils.readString(byteBuffer));
+    }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new SchemaFetchMergeNode(planNodeId);
+    return new SchemaFetchMergeNode(planNodeId, storageGroupList);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/SchemaFetchStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/SchemaFetchStatement.java
index fde6a01a08..92eca3fa58 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/SchemaFetchStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/SchemaFetchStatement.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.mpp.plan.statement.internal;
 
-import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.plan.constant.StatementType;
@@ -30,9 +29,7 @@ import java.util.List;
 
 public class SchemaFetchStatement extends Statement {
 
-  private PathPatternTree patternTree;
-
-  private SchemaPartition schemaPartition;
+  private final PathPatternTree patternTree;
 
   public SchemaFetchStatement(PathPatternTree patternTree) {
     super();
@@ -44,14 +41,6 @@ public class SchemaFetchStatement extends Statement {
     return patternTree;
   }
 
-  public SchemaPartition getSchemaPartition() {
-    return schemaPartition;
-  }
-
-  public void setSchemaPartition(SchemaPartition schemaPartition) {
-    this.schemaPartition = schemaPartition;
-  }
-
   @Override
   public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
     return visitor.visitSchemaFetch(this, context);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperatorTest.java
index 7a2ce68018..724efd2b98 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperatorTest.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.junit.After;
@@ -44,6 +45,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -80,7 +82,9 @@ public class SchemaFetchScanOperatorTest {
     Assert.assertFalse(schemaFetchScanOperator.hasNext());
 
     Binary binary = tsBlock.getColumn(0).getBinary(0);
-    SchemaTree schemaTree = SchemaTree.deserialize(new ByteArrayInputStream(binary.getValues()));
+    InputStream inputStream = new ByteArrayInputStream(binary.getValues());
+    Assert.assertEquals(1, ReadWriteIOUtils.readByte(inputStream));
+    SchemaTree schemaTree = SchemaTree.deserialize(inputStream);
 
     DeviceSchemaInfo deviceSchemaInfo =
         schemaTree.searchDeviceSchemaInfo(