You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/18 11:13:57 UTC
[iotdb] branch master updated: [IOTDB-3830] Refactor schema fetch (#6678)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new adedb34e4f [IOTDB-3830] Refactor schema fetch (#6678)
adedb34e4f is described below
commit adedb34e4f9ef0204058a092d58757c668948f9d
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Mon Jul 18 19:13:51 2022 +0800
[IOTDB-3830] Refactor schema fetch (#6678)
---
.../operator/schema/SchemaFetchMergeOperator.java | 53 ++++++++++++-
.../operator/schema/SchemaFetchScanOperator.java | 4 +
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 59 ++++++---------
.../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 88 ++++++++++------------
.../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java | 6 --
.../iotdb/db/mpp/plan/analyze/ISchemaFetcher.java | 3 -
.../mpp/plan/analyze/StandaloneSchemaFetcher.java | 6 --
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 2 +-
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 4 +-
.../db/mpp/plan/planner/LogicalPlanVisitor.java | 9 +--
.../node/metedata/read/SchemaFetchMergeNode.java | 29 ++++++-
.../statement/internal/SchemaFetchStatement.java | 13 +---
.../schema/SchemaFetchScanOperatorTest.java | 6 +-
13 files changed, 152 insertions(+), 130 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
index 31575a5be9..416abb9c77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
@@ -23,24 +23,41 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.util.concurrent.ListenableFuture;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.util.List;
+import java.util.Optional;
public class SchemaFetchMergeOperator implements ProcessOperator {
private final OperatorContext operatorContext;
private final List<Operator> children;
-
private final int childrenCount;
+
private int currentIndex;
- public SchemaFetchMergeOperator(OperatorContext operatorContext, List<Operator> children) {
+ private boolean isReadingStorageGroupInfo;
+
+ private final List<String> storageGroupList;
+
+ public SchemaFetchMergeOperator(
+ OperatorContext operatorContext, List<Operator> children, List<String> storageGroupList) {
this.operatorContext = operatorContext;
this.children = children;
this.childrenCount = children.size();
+
this.currentIndex = 0;
+
+ this.isReadingStorageGroupInfo = true;
+
+ this.storageGroupList = storageGroupList;
}
@Override
@@ -50,6 +67,11 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
@Override
public TsBlock next() {
+ if (isReadingStorageGroupInfo) {
+ isReadingStorageGroupInfo = false;
+ return generateStorageGroupInfo();
+ }
+
if (children.get(currentIndex).hasNext()) {
return children.get(currentIndex).next();
} else {
@@ -60,12 +82,14 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
@Override
public boolean hasNext() {
- return currentIndex < childrenCount;
+ return isReadingStorageGroupInfo || currentIndex < childrenCount;
}
@Override
public ListenableFuture<?> isBlocked() {
- return currentIndex < children.size() ? children.get(currentIndex).isBlocked() : NOT_BLOCKED;
+ return isReadingStorageGroupInfo || currentIndex >= children.size()
+ ? NOT_BLOCKED
+ : children.get(currentIndex).isBlocked();
}
@Override
@@ -79,4 +103,25 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
child.close();
}
}
+
+ private TsBlock generateStorageGroupInfo() {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ try {
+ // to indicate this binary data is storage group info
+ ReadWriteIOUtils.write((byte) 0, outputStream);
+
+ ReadWriteIOUtils.write(storageGroupList.size(), outputStream);
+ for (String storageGroup : storageGroupList) {
+ ReadWriteIOUtils.write(storageGroup, outputStream);
+ }
+ } catch (IOException e) {
+ // Totally memory operation. This case won't happen.
+ }
+ return new TsBlock(
+ new TimeColumn(1, new long[] {0}),
+ new BinaryColumn(
+ 1,
+ Optional.of(new boolean[] {false}),
+ new Binary[] {new Binary(outputStream.toByteArray())}));
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
index b21aac7ef3..6437f7eee5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -108,6 +109,9 @@ public class SchemaFetchScanOperator implements SourceOperator {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try {
+ // to indicate this binary data is schema tree info
+ ReadWriteIOUtils.write((byte) 1, outputStream);
+
schemaTree.serialize(outputStream);
} catch (IOException e) {
// Totally memory operation. This case won't happen.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 7d07655575..59ad88efa7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -18,15 +18,12 @@
*/
package org.apache.iotdb.db.mpp.plan.analyze;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.sql.MeasurementNotExistException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
@@ -1151,7 +1148,15 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
SchemaFetchStatement schemaFetchStatement, MPPQueryContext context) {
Analysis analysis = new Analysis();
analysis.setStatement(schemaFetchStatement);
- analysis.setSchemaPartitionInfo(schemaFetchStatement.getSchemaPartition());
+
+ SchemaPartition schemaPartition =
+ partitionFetcher.getSchemaPartition(schemaFetchStatement.getPatternTree());
+ analysis.setSchemaPartitionInfo(schemaPartition);
+
+ if (schemaPartition.isEmpty()) {
+ analysis.setFinishQueryAfterAnalyze(true);
+ }
+
return analysis;
}
@@ -1288,43 +1293,23 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
patternTree.appendPathPattern(pathPattern);
}
- SchemaPartition schemaPartition = partitionFetcher.getSchemaPartition(patternTree);
-
- SchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree, schemaPartition);
+ SchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree);
analysis.setSchemaTree(schemaTree);
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
- Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap =
- schemaPartition.getSchemaPartitionMap();
-
- // todo keep the behaviour consistency of cluster and standalone,
- // the behaviour of standalone fetcher and LocalConfigNode is not consistent with that of
- // cluster mode's
- if (IoTDBDescriptor.getInstance().getConfig().isClusterMode()) {
- for (String storageGroup : schemaPartitionMap.keySet()) {
- sgNameToQueryParamsMap.put(
- storageGroup,
- schemaPartitionMap.get(storageGroup).keySet().stream()
- .map(DataPartitionQueryParam::new)
- .collect(Collectors.toList()));
- }
- } else {
- // the StandalonePartitionFetcher and LocalConfigNode now doesn't support partition fetch
- // via slotId
- schemaTree
- .getMatchedDevices(new PartialPath(ALL_RESULT_NODES))
- .forEach(
- deviceSchemaInfo -> {
- PartialPath devicePath = deviceSchemaInfo.getDevicePath();
- DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
- queryParam.setDevicePath(devicePath.getFullPath());
- sgNameToQueryParamsMap
- .computeIfAbsent(
- schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>())
- .add(queryParam);
- });
- }
+ schemaTree
+ .getMatchedDevices(new PartialPath(ALL_RESULT_NODES))
+ .forEach(
+ deviceSchemaInfo -> {
+ PartialPath devicePath = deviceSchemaInfo.getDevicePath();
+ DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
+ queryParam.setDevicePath(devicePath.getFullPath());
+ sgNameToQueryParamsMap
+ .computeIfAbsent(
+ schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>())
+ .add(queryParam);
+ });
DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
analysis.setDataPartitionInfo(dataPartition);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 19356f6471..44193ec8e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -18,10 +18,8 @@
*/
package org.apache.iotdb.db.mpp.plan.analyze;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -45,18 +43,19 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import io.airlift.concurrent.SetThreadName;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -68,7 +67,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final Coordinator coordinator = Coordinator.getInstance();
- private final IPartitionFetcher partitionFetcher = ClusterPartitionFetcher.getInstance();
private final DataNodeSchemaCache schemaCache = DataNodeSchemaCache.getInstance();
private static final class ClusterSchemaFetcherHolder {
@@ -85,28 +83,15 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
@Override
public SchemaTree fetchSchema(PathPatternTree patternTree) {
- return fetchSchema(patternTree, partitionFetcher.getSchemaPartition(patternTree));
- }
-
- @Override
- public SchemaTree fetchSchema(PathPatternTree patternTree, SchemaPartition schemaPartition) {
- Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap =
- schemaPartition.getSchemaPartitionMap();
- List<String> storageGroups = new ArrayList<>(schemaPartitionMap.keySet());
-
- SchemaFetchStatement schemaFetchStatement = new SchemaFetchStatement(patternTree);
- schemaFetchStatement.setSchemaPartition(schemaPartition);
-
- SchemaTree result = executeSchemaFetchQuery(schemaFetchStatement);
- result.setStorageGroups(storageGroups);
- return result;
+ return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree));
}
private SchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
long queryId = SessionManager.getInstance().requestQueryId(false);
try {
ExecutionResult executionResult =
- coordinator.execute(schemaFetchStatement, queryId, null, "", partitionFetcher, this);
+ coordinator.execute(
+ schemaFetchStatement, queryId, null, "", ClusterPartitionFetcher.getInstance(), this);
// TODO: (xingtanzjr) throw exception
if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new RuntimeException(
@@ -116,6 +101,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) {
SchemaTree result = new SchemaTree();
+ List<String> storageGroupList = new ArrayList<>();
while (coordinator.getQueryExecution(queryId).hasNextResult()) {
// The query will be transited to FINISHED when invoking getBatchResult() at the last time
// So we don't need to clean up it manually
@@ -123,20 +109,12 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
break;
}
- Binary binary;
- SchemaTree fetchedSchemaTree;
Column column = tsBlock.get().getColumn(0);
for (int i = 0; i < column.getPositionCount(); i++) {
- binary = column.getBinary(i);
- try {
- fetchedSchemaTree =
- SchemaTree.deserialize(new ByteArrayInputStream(binary.getValues()));
- result.mergeSchemaTree(fetchedSchemaTree);
- } catch (IOException e) {
- // Totally memory operation. This case won't happen.
- }
+ parseFetchedData(column.getBinary(i), result, storageGroupList);
}
}
+ result.setStorageGroups(storageGroupList);
return result;
}
} finally {
@@ -144,6 +122,27 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
}
+ private void parseFetchedData(
+ Binary data, SchemaTree resultSchemaTree, List<String> storageGroupList) {
+ InputStream inputStream = new ByteArrayInputStream(data.getValues());
+ try {
+ byte type = ReadWriteIOUtils.readByte(inputStream);
+ if (type == 0) {
+ int size = ReadWriteIOUtils.readInt(inputStream);
+ for (int i = 0; i < size; i++) {
+ storageGroupList.add(ReadWriteIOUtils.readString(inputStream));
+ }
+ } else if (type == 1) {
+ resultSchemaTree.mergeSchemaTree(SchemaTree.deserialize(inputStream));
+ } else {
+ throw new RuntimeException(
+ new MetadataException("Failed to fetch schema because of unrecognized data"));
+ }
+ } catch (IOException e) {
+ // Totally memory operation. This case won't happen.
+ }
+ }
+
@Override
public SchemaTree fetchSchemaWithAutoCreate(
PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean isAligned) {
@@ -161,20 +160,14 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
return schemaTree;
}
- SchemaTree remoteSchemaTree;
+ SchemaTree remoteSchemaTree = fetchSchema(patternTree);
+ schemaTree.mergeSchemaTree(remoteSchemaTree);
+ schemaCache.put(remoteSchemaTree);
if (!config.isAutoCreateSchemaEnabled()) {
- remoteSchemaTree = fetchSchema(patternTree, partitionFetcher.getSchemaPartition(patternTree));
- schemaTree.mergeSchemaTree(remoteSchemaTree);
- schemaCache.put(remoteSchemaTree);
return schemaTree;
}
- remoteSchemaTree =
- fetchSchema(patternTree, partitionFetcher.getOrCreateSchemaPartition(patternTree));
- schemaTree.mergeSchemaTree(remoteSchemaTree);
- schemaCache.put(remoteSchemaTree);
-
SchemaTree missingSchemaTree =
checkAndAutoCreateMissingMeasurements(
remoteSchemaTree,
@@ -216,20 +209,14 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
return schemaTree;
}
- SchemaTree remoteSchemaTree;
+ SchemaTree remoteSchemaTree = fetchSchema(patternTree);
+ schemaTree.mergeSchemaTree(remoteSchemaTree);
+ schemaCache.put(remoteSchemaTree);
if (!config.isAutoCreateSchemaEnabled()) {
- remoteSchemaTree = fetchSchema(patternTree, partitionFetcher.getSchemaPartition(patternTree));
- schemaTree.mergeSchemaTree(remoteSchemaTree);
- schemaCache.put(remoteSchemaTree);
return schemaTree;
}
- remoteSchemaTree =
- fetchSchema(patternTree, partitionFetcher.getOrCreateSchemaPartition(patternTree));
- schemaTree.mergeSchemaTree(remoteSchemaTree);
- schemaCache.put(remoteSchemaTree);
-
SchemaTree missingSchemaTree;
for (int i = 0; i < devicePathList.size(); i++) {
missingSchemaTree =
@@ -336,7 +323,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
InternalCreateTimeSeriesStatement statement) {
long queryId = SessionManager.getInstance().requestQueryId(false);
ExecutionResult executionResult =
- coordinator.execute(statement, queryId, null, "", partitionFetcher, this);
+ coordinator.execute(
+ statement, queryId, null, "", ClusterPartitionFetcher.getInstance(), this);
// TODO: throw exception
int statusCode = executionResult.status.getCode();
if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 286a615d06..e2dfe00b0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.plan.analyze;
-import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
@@ -43,11 +42,6 @@ public class FakeSchemaFetcherImpl implements ISchemaFetcher {
return schemaTree;
}
- @Override
- public SchemaTree fetchSchema(PathPatternTree patternTree, SchemaPartition schemaPartition) {
- return null;
- }
-
@Override
public SchemaTree fetchSchemaWithAutoCreate(
PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
index 182222ebeb..44a583fb9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.plan.analyze;
-import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
@@ -34,8 +33,6 @@ public interface ISchemaFetcher {
SchemaTree fetchSchema(PathPatternTree patternTree);
- SchemaTree fetchSchema(PathPatternTree patternTree, SchemaPartition schemaPartition);
-
SchemaTree fetchSchemaWithAutoCreate(
PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
index 2d190a79c3..dbe9f08e51 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.mpp.plan.analyze;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -83,11 +82,6 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
return schemaTree;
}
- @Override
- public SchemaTree fetchSchema(PathPatternTree patternTree, SchemaPartition schemaPartition) {
- return fetchSchema(patternTree);
- }
-
@Override
public SchemaTree fetchSchemaWithAutoCreate(
PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 2cddb29cc5..fa1cc2fb11 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -1160,7 +1160,7 @@ public class LocalExecutionPlanner {
node.getPlanNodeId(),
SchemaFetchMergeOperator.class.getSimpleName());
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
- return new SchemaFetchMergeOperator(operatorContext, children);
+ return new SchemaFetchMergeOperator(operatorContext, children, node.getStorageGroupList());
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 8b4c3c39f1..b65d87b4ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -701,8 +701,8 @@ public class LogicalPlanBuilder {
return this;
}
- public LogicalPlanBuilder planSchemaFetchMerge() {
- this.root = new SchemaFetchMergeNode(context.getQueryId().genPlanNodeId());
+ public LogicalPlanBuilder planSchemaFetchMerge(List<String> storageGroupList) {
+ this.root = new SchemaFetchMergeNode(context.getQueryId().genPlanNodeId(), storageGroupList);
return this;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 1575c5d635..38044f2fe2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -634,12 +634,11 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
public PlanNode visitSchemaFetch(
SchemaFetchStatement schemaFetchStatement, MPPQueryContext context) {
LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+ List<String> storageGroupList =
+ new ArrayList<>(analysis.getSchemaPartitionInfo().getSchemaPartitionMap().keySet());
return planBuilder
- .planSchemaFetchMerge()
- .planSchemaFetchSource(
- new ArrayList<>(
- schemaFetchStatement.getSchemaPartition().getSchemaPartitionMap().keySet()),
- schemaFetchStatement.getPatternTree())
+ .planSchemaFetchMerge(storageGroupList)
+ .planSchemaFetchSource(storageGroupList, schemaFetchStatement.getPatternTree())
.getRoot();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchMergeNode.java
index 8d1b999ef9..be235cf920 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchMergeNode.java
@@ -23,21 +23,35 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
/** This class defines the scanned result merge task of schema fetcher. */
public class SchemaFetchMergeNode extends AbstractSchemaMergeNode {
- public SchemaFetchMergeNode(PlanNodeId id) {
+ private List<String> storageGroupList;
+
+ public SchemaFetchMergeNode(PlanNodeId id, List<String> storageGroupList) {
super(id);
+ this.storageGroupList = storageGroupList;
+ }
+
+ public List<String> getStorageGroupList() {
+ return storageGroupList;
+ }
+
+ public void setStorageGroupList(List<String> storageGroupList) {
+ this.storageGroupList = storageGroupList;
}
@Override
public PlanNode clone() {
- return new SchemaFetchMergeNode(getPlanNodeId());
+ return new SchemaFetchMergeNode(getPlanNodeId(), storageGroupList);
}
@Override
@@ -48,11 +62,20 @@ public class SchemaFetchMergeNode extends AbstractSchemaMergeNode {
@Override
protected void serializeAttributes(DataOutputStream stream) throws IOException {
PlanNodeType.SCHEMA_FETCH_MERGE.serialize(stream);
+ ReadWriteIOUtils.write(storageGroupList.size(), stream);
+ for (String storageGroup : storageGroupList) {
+ ReadWriteIOUtils.write(storageGroup, stream);
+ }
}
public static PlanNode deserialize(ByteBuffer byteBuffer) {
+ int size = ReadWriteIOUtils.readInt(byteBuffer);
+ List<String> storageGroupList = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ storageGroupList.add(ReadWriteIOUtils.readString(byteBuffer));
+ }
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new SchemaFetchMergeNode(planNodeId);
+ return new SchemaFetchMergeNode(planNodeId, storageGroupList);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/SchemaFetchStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/SchemaFetchStatement.java
index fde6a01a08..92eca3fa58 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/SchemaFetchStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/SchemaFetchStatement.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.plan.statement.internal;
-import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.plan.constant.StatementType;
@@ -30,9 +29,7 @@ import java.util.List;
public class SchemaFetchStatement extends Statement {
- private PathPatternTree patternTree;
-
- private SchemaPartition schemaPartition;
+ private final PathPatternTree patternTree;
public SchemaFetchStatement(PathPatternTree patternTree) {
super();
@@ -44,14 +41,6 @@ public class SchemaFetchStatement extends Statement {
return patternTree;
}
- public SchemaPartition getSchemaPartition() {
- return schemaPartition;
- }
-
- public void setSchemaPartition(SchemaPartition schemaPartition) {
- this.schemaPartition = schemaPartition;
- }
-
@Override
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitSchemaFetch(this, context);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperatorTest.java
index 7a2ce68018..724efd2b98 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperatorTest.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
@@ -44,6 +45,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
+import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -80,7 +82,9 @@ public class SchemaFetchScanOperatorTest {
Assert.assertFalse(schemaFetchScanOperator.hasNext());
Binary binary = tsBlock.getColumn(0).getBinary(0);
- SchemaTree schemaTree = SchemaTree.deserialize(new ByteArrayInputStream(binary.getValues()));
+ InputStream inputStream = new ByteArrayInputStream(binary.getValues());
+ Assert.assertEquals(1, ReadWriteIOUtils.readByte(inputStream));
+ SchemaTree schemaTree = SchemaTree.deserialize(inputStream);
DeviceSchemaInfo deviceSchemaInfo =
schemaTree.searchDeviceSchemaInfo(