You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/21 10:05:28 UTC
[iotdb] 01/02: init
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch standaloneMPPWrite
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 92a1e2669c071b92d028fc1464aec6718ab52df1
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Apr 21 17:09:26 2022 +0800
init
---
.../apache/iotdb/db/metadata/LocalConfigNode.java | 60 +++++++---
.../iotdb/db/metadata/LocalDataPartitionTable.java | 124 +++++++++++++++++++++
.../iotdb/db/mpp/execution/QueryExecution.java | 24 ++--
.../execution/scheduler/StandaloneScheduler.java | 56 +++++++++-
.../sql/analyze/StandalonePartitionFetcher.java | 64 ++++++++++-
.../mpp/sql/analyze/StandaloneSchemaFetcher.java | 38 ++++++-
.../java/org/apache/iotdb/db/service/DataNode.java | 1 +
.../thrift/impl/DataNodeTSIServiceImpl.java | 17 ++-
8 files changed, 352 insertions(+), 32 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
index 2b2f20433a..3500192207 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
@@ -21,10 +21,14 @@ package org.apache.iotdb.db.metadata;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
@@ -87,7 +91,11 @@ public class LocalConfigNode {
StorageGroupSchemaManager.getInstance();
private TemplateManager templateManager = TemplateManager.getInstance();
private SchemaEngine schemaEngine = SchemaEngine.getInstance();
- private LocalSchemaPartitionTable partitionTable = LocalSchemaPartitionTable.getInstance();
+ private LocalSchemaPartitionTable schemaPartitionTable = LocalSchemaPartitionTable.getInstance();
+
+ private StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
+
+ private LocalDataPartitionTable dataPartitionTable = LocalDataPartitionTable.getInstance();
private LocalConfigNode() {
String schemaDir = config.getSchemaDir();
@@ -125,7 +133,7 @@ public class LocalConfigNode {
templateManager.init();
storageGroupSchemaManager.init();
- partitionTable.init();
+ schemaPartitionTable.init();
schemaEngine.init();
initSchemaRegion();
@@ -150,7 +158,7 @@ public class LocalConfigNode {
private void initSchemaRegion() throws MetadataException {
for (PartialPath storageGroup : storageGroupSchemaManager.getAllStorageGroupPaths()) {
- partitionTable.setStorageGroup(storageGroup);
+ schemaPartitionTable.setStorageGroup(storageGroup);
File sgDir = new File(config.getSchemaDir(), storageGroup.getFullPath());
@@ -167,7 +175,7 @@ public class LocalConfigNode {
SchemaRegionId schemaRegionId =
new SchemaRegionId(Integer.parseInt(schemaRegionDir.getName()));
schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
- partitionTable.putSchemaRegionId(storageGroup, schemaRegionId);
+ schemaPartitionTable.putSchemaRegionId(storageGroup, schemaRegionId);
}
}
}
@@ -185,7 +193,7 @@ public class LocalConfigNode {
timedForceMLogThread = null;
}
- partitionTable.clear();
+ schemaPartitionTable.clear();
schemaEngine.clear();
storageGroupSchemaManager.clear();
templateManager.clear();
@@ -220,10 +228,10 @@ public class LocalConfigNode {
*/
public void setStorageGroup(PartialPath storageGroup) throws MetadataException {
storageGroupSchemaManager.setStorageGroup(storageGroup);
- partitionTable.setStorageGroup(storageGroup);
+ schemaPartitionTable.setStorageGroup(storageGroup);
schemaEngine.createSchemaRegion(
- storageGroup, partitionTable.allocateSchemaRegionId(storageGroup));
+ storageGroup, schemaPartitionTable.allocateSchemaRegionId(storageGroup));
if (SchemaSyncManager.getInstance().isEnableSync()) {
SchemaSyncManager.getInstance().syncMetadataPlan(new SetStorageGroupPlan(storageGroup));
}
@@ -235,7 +243,7 @@ public class LocalConfigNode {
public void deleteStorageGroup(PartialPath storageGroup) throws MetadataException {
deleteSchemaRegionsInStorageGroup(
- storageGroup, partitionTable.getSchemaRegionIdsByStorageGroup(storageGroup));
+ storageGroup, schemaPartitionTable.getSchemaRegionIdsByStorageGroup(storageGroup));
for (Template template : templateManager.getTemplateMap().values()) {
templateManager.unmarkStorageGroup(template, storageGroup.getFullPath());
@@ -249,7 +257,7 @@ public class LocalConfigNode {
MemTableManager.getInstance().addOrDeleteStorageGroup(-1);
}
- partitionTable.deleteStorageGroup(storageGroup);
+ schemaPartitionTable.deleteStorageGroup(storageGroup);
// delete storage group after all related resources have been cleared
storageGroupSchemaManager.deleteStorageGroup(storageGroup);
@@ -534,12 +542,12 @@ public class LocalConfigNode {
*/
public SchemaRegionId getBelongedSchemaRegionId(PartialPath path) throws MetadataException {
PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
- SchemaRegionId schemaRegionId = partitionTable.getSchemaRegionId(storageGroup, path);
+ SchemaRegionId schemaRegionId = schemaPartitionTable.getSchemaRegionId(storageGroup, path);
ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
if (schemaRegion == null) {
schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
}
- return partitionTable.getSchemaRegionId(storageGroup, path);
+ return schemaPartitionTable.getSchemaRegionId(storageGroup, path);
}
// This interface involves storage group auto creation
@@ -561,14 +569,15 @@ public class LocalConfigNode {
for (PartialPath storageGroup :
storageGroupSchemaManager.getInvolvedStorageGroups(pathPattern, isPrefixMatch)) {
result.addAll(
- partitionTable.getInvolvedSchemaRegionIds(storageGroup, pathPattern, isPrefixMatch));
+ schemaPartitionTable.getInvolvedSchemaRegionIds(
+ storageGroup, pathPattern, isPrefixMatch));
}
return result;
}
public List<SchemaRegionId> getSchemaRegionIdsByStorageGroup(PartialPath storageGroup)
throws MetadataException {
- return partitionTable.getSchemaRegionIdsByStorageGroup(storageGroup);
+ return schemaPartitionTable.getSchemaRegionIdsByStorageGroup(storageGroup);
}
// endregion
@@ -754,4 +763,29 @@ public class LocalConfigNode {
}
// endregion
+
+ // region Interfaces for DataRegionId Management
+ /**
+ * Get the target DataRegionIds, which the given path belongs to. The path must be a fullPath
+ * without wildcards, * or **. This method is the first step when there's a task on one certain
+ * path, e.g., root.sg1 is a storage group and path = root.sg1.d1, return DataRegionId of
+ * root.sg1. If there's no storage group on the given path, StorageGroupNotSetException will be
+ * thrown.
+ */
+ public DataRegionId getBelongedDataRegionRegionId(PartialPath path)
+ throws MetadataException, DataRegionException {
+ PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
+ DataRegionId dataRegionId = dataPartitionTable.getDataRegionId(storageGroup, path);
+ DataRegion dataRegion = storageEngine.getDataRegion(dataRegionId);
+ if (dataRegion == null) {
+ storageEngine.createDataRegion(dataRegionId, storageGroup.getFullPath(), Long.MAX_VALUE);
+ }
+ return dataPartitionTable.getDataRegionId(storageGroup, path);
+ }
+
+ public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath storageGroup) {
+ return dataPartitionTable.getDataRegionIdsByStorageGroup(storageGroup);
+ }
+
+ // endregion
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalDataPartitionTable.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalDataPartitionTable.java
new file mode 100644
index 0000000000..52fc102437
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalDataPartitionTable.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+// This class is used for data partition maintaining the map between storage group and
+// dataRegionIds.
+public class LocalDataPartitionTable {
+
+ private AtomicInteger dataRegionIdGenerator;
+
+ private Map<PartialPath, Set<DataRegionId>> table;
+
+ private static class LocalDataPartitionTableHolder {
+ private static final LocalDataPartitionTable INSTANCE = new LocalDataPartitionTable();
+
+ private LocalDataPartitionTableHolder() {};
+ }
+
+ private LocalDataPartitionTable() {}
+
+ public static LocalDataPartitionTable getInstance() {
+ return LocalDataPartitionTableHolder.INSTANCE;
+ }
+
+ public synchronized void init() {
+ table = new ConcurrentHashMap<>();
+ dataRegionIdGenerator = new AtomicInteger(0);
+ }
+
+ public synchronized void clear() {
+ if (table != null) {
+ table.clear();
+ table = null;
+ }
+
+ if (dataRegionIdGenerator != null) {
+ dataRegionIdGenerator = null;
+ }
+ }
+
+ public synchronized DataRegionId allocateDataRegionId(PartialPath storageGroup) {
+ DataRegionId dataRegionId = new DataRegionId(dataRegionIdGenerator.getAndIncrement());
+ table.get(storageGroup).add(dataRegionId);
+ return dataRegionId;
+ }
+
+ public synchronized void putDataRegionId(PartialPath storageGroup, DataRegionId dataRegionId) {
+ table.get(storageGroup).add(dataRegionId);
+
+ if (dataRegionId.getId() >= dataRegionIdGenerator.get()) {
+ dataRegionIdGenerator.set(dataRegionId.getId() + 1);
+ }
+ }
+
+ public synchronized void removeDataRegionId(PartialPath storageGroup, DataRegionId dataRegionId) {
+ table.get(storageGroup).remove(dataRegionId);
+ }
+
+ public DataRegionId getDataRegionId(PartialPath storageGroup, PartialPath path) {
+ return calculateDataRegionId(storageGroup, path);
+ }
+
+ public List<DataRegionId> getInvolvedDataRegionIds(
+ PartialPath storageGroup, PartialPath pathPattern, boolean isPrefixMatch) {
+ List<DataRegionId> result = new ArrayList<>();
+ if (table.containsKey(storageGroup)) {
+ result.addAll(table.get(storageGroup));
+ }
+ return result;
+ }
+
+ public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath storageGroup) {
+ return new ArrayList<>(table.get(storageGroup));
+ }
+
+ public synchronized void setStorageGroup(PartialPath storageGroup) {
+ if (table.containsKey(storageGroup)) {
+ return;
+ }
+ table.put(storageGroup, Collections.synchronizedSet(new HashSet<>()));
+ }
+
+ public synchronized Set<DataRegionId> deleteStorageGroup(PartialPath storageGroup) {
+ return table.remove(storageGroup);
+ }
+
+ // This method may be extended to implement multi dataRegion for one storageGroup
+ // todo keep consistent with the partition method of config node in new cluster
+ private DataRegionId calculateDataRegionId(PartialPath storageGroup, PartialPath path) {
+ if (!table.containsKey(storageGroup)) {
+ setStorageGroup(storageGroup);
+ }
+ return table.get(storageGroup).iterator().next();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index a9bd605823..6eaf77d939 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
+import org.apache.iotdb.db.mpp.execution.scheduler.StandaloneScheduler;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
@@ -37,6 +38,7 @@ import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -142,13 +144,21 @@ public class QueryExecution implements IQueryExecution {
private void schedule() {
// TODO: (xingtanzjr) initialize the query scheduler according to configuration
this.scheduler =
- new ClusterScheduler(
- context,
- stateMachine,
- distributedPlan.getInstances(),
- context.getQueryType(),
- executor,
- scheduledExecutor);
+ IoTDB.getInstance().isClusterMode()
+ ? new ClusterScheduler(
+ context,
+ stateMachine,
+ distributedPlan.getInstances(),
+ context.getQueryType(),
+ executor,
+ scheduledExecutor)
+ : new StandaloneScheduler(
+ context,
+ stateMachine,
+ distributedPlan.getInstances(),
+ context.getQueryType(),
+ executor,
+ scheduledExecutor);
this.scheduler.start();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
index 1081b7359c..5abf0490e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
@@ -18,22 +18,72 @@
*/
package org.apache.iotdb.db.mpp.execution.scheduler;
-import org.apache.iotdb.db.engine.StorageEngine;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.execution.FragmentInfo;
import io.airlift.units.Duration;
+import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class StandaloneScheduler implements IScheduler {
- private static final StorageEngine STORAGE_ENGINE = StorageEngine.getInstance();
+ private static final StorageEngineV2 STORAGE_ENGINE = StorageEngineV2.getInstance();
private static final LocalSchemaProcessor SCHEMA_ENGINE = LocalSchemaProcessor.getInstance();
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClusterScheduler.class);
+
+ private MPPQueryContext queryContext;
+ // The stateMachine of the QueryExecution owned by this QueryScheduler
+ private QueryStateMachine stateMachine;
+ private QueryType queryType;
+ // The fragment instances which should be sent to corresponding Nodes.
+ private List<FragmentInstance> instances;
+
+ private ExecutorService executor;
+ private ScheduledExecutorService scheduledExecutor;
+
+ private IFragInstanceDispatcher dispatcher;
+ private IFragInstanceStateTracker stateTracker;
+ private IQueryTerminator queryTerminator;
+
+ public StandaloneScheduler(
+ MPPQueryContext queryContext,
+ QueryStateMachine stateMachine,
+ List<FragmentInstance> instances,
+ QueryType queryType,
+ ExecutorService executor,
+ ScheduledExecutorService scheduledExecutor) {
+ this.queryContext = queryContext;
+ this.instances = instances;
+ this.queryType = queryType;
+ this.executor = executor;
+ this.scheduledExecutor = scheduledExecutor;
+ this.stateTracker =
+ new FixedRateFragInsStateTracker(stateMachine, executor, scheduledExecutor, instances);
+ this.queryTerminator =
+ new SimpleQueryTerminator(executor, queryContext.getQueryId(), instances);
+ }
+
@Override
- public void start() {}
+ public void start() {
+ // For the FragmentInstance of WRITE, it will be executed directly when dispatching.
+ // TODO: Other QueryTypes
+ if (queryType == QueryType.WRITE) {
+
+ return;
+ }
+ }
@Override
public void stop() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
index 7fbdb00bf4..c78fc3d8d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
@@ -18,21 +18,40 @@
*/
package org.apache.iotdb.db.mpp.sql.analyze;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
+import org.apache.iotdb.db.metadata.LocalConfigNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class StandalonePartitionFetcher implements IPartitionFetcher {
- private StandalonePartitionFetcher() {}
+ private final LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
+ private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
+
+ private static final class StandalonePartitionFetcherHolder {
+ private static final StandalonePartitionFetcher INSTANCE = new StandalonePartitionFetcher();
+
+ private StandalonePartitionFetcherHolder() {}
+ }
- // TODO need to use safe singleton pattern
public static StandalonePartitionFetcher getInstance() {
- return new StandalonePartitionFetcher();
+ return StandalonePartitionFetcher.StandalonePartitionFetcherHolder.INSTANCE;
}
@Override
@@ -48,12 +67,47 @@ public class StandalonePartitionFetcher implements IPartitionFetcher {
@Override
public DataPartition getDataPartition(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
- return null;
+ try {
+ Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+ dataPartitionMap = new HashMap<>();
+ for (Map.Entry<String, List<DataPartitionQueryParam>> sgEntry :
+ sgNameToQueryParamsMap.entrySet()) {
+ // for each sg
+ String storageGroupName = sgEntry.getKey();
+ List<DataPartitionQueryParam> dataPartitionQueryParams = sgEntry.getValue();
+ Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>
+ deviceToRegionsMap = new HashMap<>();
+ for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) {
+ // for each device
+ String deviceId = dataPartitionQueryParam.getDevicePath();
+ DataRegionId dataRegionId =
+ localConfigNode.getBelongedDataRegionRegionId(new PartialPath(deviceId));
+ Map<TimePartitionSlot, List<RegionReplicaSet>> timePartitionToRegionsMap =
+ new HashMap<>();
+ for (TimePartitionSlot timePartitionSlot :
+ dataPartitionQueryParam.getTimePartitionSlotList()) {
+ // for each time partition
+ timePartitionToRegionsMap.put(
+ timePartitionSlot,
+ Collections.singletonList(
+ new RegionReplicaSet(dataRegionId, Collections.EMPTY_LIST)));
+ }
+ deviceToRegionsMap.put(new SeriesPartitionSlot(), timePartitionToRegionsMap);
+ }
+ dataPartitionMap.put(storageGroupName, deviceToRegionsMap);
+ }
+ return new DataPartition(
+ dataPartitionMap,
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+ } catch (MetadataException | DataRegionException e) {
+ throw new StatementAnalyzeException("An error occurred when executing getDataPartition()");
+ }
}
@Override
public DataPartition getOrCreateDataPartition(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
- return null;
+ return getDataPartition(sgNameToQueryParamsMap);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
index e1deb0f9b5..81d1702710 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
@@ -18,15 +18,29 @@
*/
package org.apache.iotdb.db.mpp.sql.analyze;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.LocalConfigNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
public class StandaloneSchemaFetcher implements ISchemaFetcher {
+ private final LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
+ private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
+
private StandaloneSchemaFetcher() {}
public static StandaloneSchemaFetcher getInstance() {
@@ -35,7 +49,22 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
@Override
public SchemaTree fetchSchema(PathPatternTree patternTree) {
- return null;
+ Set<String> storageGroupSet = new HashSet<>();
+ SchemaTree schemaTree = new SchemaTree();
+ List<PartialPath> partialPathList = patternTree.splitToPathList();
+ try {
+ for (PartialPath path : partialPathList) {
+ String storageGroup = localConfigNode.getBelongedStorageGroup(path).getFullPath();
+ storageGroupSet.add(storageGroup);
+ SchemaRegionId schemaRegionId = localConfigNode.getBelongedSchemaRegionId(path);
+ ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
+ schemaTree.appendMeasurementPaths(schemaRegion.getMeasurementPaths(path, false));
+ }
+ } catch (MetadataException e) {
+ throw new RuntimeException(e);
+ }
+ schemaTree.setStorageGroups(new ArrayList<>(storageGroupSet));
+ return schemaTree;
}
@Override
@@ -50,6 +79,11 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
List<String[]> measurements,
List<TSDataType[]> tsDataTypes,
List<Boolean> aligned) {
- return null;
+ Map<PartialPath, List<String>> deviceToMeasurementMap = new HashMap<>();
+ for (int i = 0; i < devicePath.size(); i++) {
+ deviceToMeasurementMap.put(devicePath.get(i), Arrays.asList(measurements.get(i)));
+ }
+ // todo implement auto create schema
+ return fetchSchema(new PathPatternTree(deviceToMeasurementMap));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index f2285abbc3..e06451f7a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -159,6 +159,7 @@ public class DataNode implements DataNodeMBean {
public void active() throws StartupException {
// set the mpp mode to true
IoTDBDescriptor.getInstance().getConfig().setMppMode(true);
+ IoTDB.getInstance().setClusterMode();
// start iotdb server first
IoTDB.getInstance().active();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index 3de0a28cc2..091548c8d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.db.mpp.sql.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.mpp.sql.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.StandalonePartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.StandaloneSchemaFetcher;
import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.db.mpp.sql.statement.crud.InsertMultiTabletsStatement;
@@ -39,6 +41,7 @@ import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowsOfOneDeviceStatement
import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
import org.apache.iotdb.db.service.metrics.MetricsService;
import org.apache.iotdb.db.service.metrics.Operation;
@@ -74,9 +77,19 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
- private static final IPartitionFetcher PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+ private final IPartitionFetcher PARTITION_FETCHER;
- private static final ISchemaFetcher SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+ private final ISchemaFetcher SCHEMA_FETCHER;
+
+ public DataNodeTSIServiceImpl() {
+ if (IoTDB.getInstance().isClusterMode()) {
+ PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+ SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+ } else {
+ PARTITION_FETCHER = StandalonePartitionFetcher.getInstance();
+ SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance();
+ }
+ }
@Override
public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {