You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/25 10:37:28 UTC
[iotdb] branch master updated: Support set storage group in standalone mpp mode (#5655)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a7b0f198af Support set storage group in standalone mpp mode (#5655)
a7b0f198af is described below
commit a7b0f198afd514cbd8128d0f9d2beda9fffe661a
Author: Haonan <hh...@outlook.com>
AuthorDate: Mon Apr 25 18:37:22 2022 +0800
Support set storage group in standalone mpp mode (#5655)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ++
.../LocalConfigNode.java | 73 +++++++++---
.../localconfignode/LocalDataPartitionTable.java | 126 +++++++++++++++++++++
.../LocalSchemaPartitionTable.java | 2 +-
.../iotdb/db/metadata/LocalSchemaProcessor.java | 1 +
.../iotdb/db/mpp/execution/QueryExecution.java | 26 +++--
.../mpp/execution/config/SetStorageGroupTask.java | 62 ++++++----
.../execution/scheduler/StandaloneScheduler.java | 57 +++++++++-
.../sql/analyze/StandalonePartitionFetcher.java | 67 ++++++++++-
.../mpp/sql/analyze/StandaloneSchemaFetcher.java | 38 ++++++-
.../java/org/apache/iotdb/db/service/DataNode.java | 1 +
.../java/org/apache/iotdb/db/service/IoTDB.java | 2 +-
.../thrift/impl/DataNodeTSIServiceImpl.java | 19 +++-
.../operator/schema/SchemaScanOperatorTest.java | 2 +-
.../iotdb/db/service/InternalServiceImplTest.java | 2 +-
15 files changed, 430 insertions(+), 59 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1ee1d804fd..d1bdc6d3b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -496,6 +496,9 @@ public class IoTDBConfig {
/** indicate whether current mode is mpp */
private boolean mppMode = false;
+ /** indicate whether current mode is cluster */
+ private boolean isClusterMode = false;
+
/** Replace implementation class of influxdb protocol service */
private String influxdbImplClassName = InfluxDBServiceImpl.class.getName();
@@ -2804,4 +2807,12 @@ public class IoTDBConfig {
public void setMppMode(boolean mppMode) {
this.mppMode = mppMode;
}
+
+ public boolean isClusterMode() {
+ return isClusterMode;
+ }
+
+ public void setClusterMode(boolean isClusterMode) {
+ this.isClusterMode = isClusterMode;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
similarity index 90%
rename from server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
rename to server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
index 8df5935f77..2ea72f4711 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
@@ -17,19 +17,24 @@
* under the License.
*/
-package org.apache.iotdb.db.metadata;
+package org.apache.iotdb.db.localconfignode;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.metadata.template.UndefinedTemplateException;
+import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.rescon.SchemaResourceManager;
@@ -87,7 +92,11 @@ public class LocalConfigNode {
StorageGroupSchemaManager.getInstance();
private TemplateManager templateManager = TemplateManager.getInstance();
private SchemaEngine schemaEngine = SchemaEngine.getInstance();
- private LocalSchemaPartitionTable partitionTable = LocalSchemaPartitionTable.getInstance();
+ private LocalSchemaPartitionTable schemaPartitionTable = LocalSchemaPartitionTable.getInstance();
+
+ private StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
+
+ private LocalDataPartitionTable dataPartitionTable = LocalDataPartitionTable.getInstance();
private LocalConfigNode() {
String schemaDir = config.getSchemaDir();
@@ -127,7 +136,8 @@ public class LocalConfigNode {
storageGroupSchemaManager.init();
Map<PartialPath, List<SchemaRegionId>> recoveredLocalSchemaRegionInfo = schemaEngine.init();
- partitionTable.init(recoveredLocalSchemaRegionInfo);
+ schemaPartitionTable.init(recoveredLocalSchemaRegionInfo);
+ dataPartitionTable.init(null);
if (config.getSyncMlogPeriodInMs() != 0) {
timedForceMLogThread =
@@ -160,7 +170,7 @@ public class LocalConfigNode {
timedForceMLogThread = null;
}
- partitionTable.clear();
+ schemaPartitionTable.clear();
schemaEngine.clear();
storageGroupSchemaManager.clear();
templateManager.clear();
@@ -195,7 +205,7 @@ public class LocalConfigNode {
*/
public void setStorageGroup(PartialPath storageGroup) throws MetadataException {
storageGroupSchemaManager.setStorageGroup(storageGroup);
- for (SchemaRegionId schemaRegionId : partitionTable.setStorageGroup(storageGroup)) {
+ for (SchemaRegionId schemaRegionId : schemaPartitionTable.setStorageGroup(storageGroup)) {
schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
}
@@ -206,11 +216,22 @@ public class LocalConfigNode {
if (!config.isEnableMemControl()) {
MemTableManager.getInstance().addOrDeleteStorageGroup(1);
}
+
+ if (config.isMppMode() && !config.isClusterMode()) {
+ for (DataRegionId dataRegionId : dataPartitionTable.setStorageGroup(storageGroup)) {
+ try {
+ storageEngine.createDataRegion(dataRegionId, storageGroup.getFullPath(), Long.MAX_VALUE);
+ } catch (DataRegionException e) {
+ // TODO (Fix exception type)
+ throw new MetadataException(e);
+ }
+ }
+ }
}
public void deleteStorageGroup(PartialPath storageGroup) throws MetadataException {
deleteSchemaRegionsInStorageGroup(
- storageGroup, partitionTable.getSchemaRegionIdsByStorageGroup(storageGroup));
+ storageGroup, schemaPartitionTable.getSchemaRegionIdsByStorageGroup(storageGroup));
for (Template template : templateManager.getTemplateMap().values()) {
templateManager.unmarkStorageGroup(template, storageGroup.getFullPath());
@@ -224,7 +245,7 @@ public class LocalConfigNode {
MemTableManager.getInstance().addOrDeleteStorageGroup(-1);
}
- partitionTable.deleteStorageGroup(storageGroup);
+ schemaPartitionTable.deleteStorageGroup(storageGroup);
// delete storage group after all related resources have been cleared
storageGroupSchemaManager.deleteStorageGroup(storageGroup);
@@ -510,17 +531,17 @@ public class LocalConfigNode {
*/
public SchemaRegionId getBelongedSchemaRegionId(PartialPath path) throws MetadataException {
PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
- return partitionTable.getSchemaRegionId(storageGroup, path);
+ return schemaPartitionTable.getSchemaRegionId(storageGroup, path);
}
// This interface involves storage group and schema region auto creation
public SchemaRegionId getBelongedSchemaRegionIdWithAutoCreate(PartialPath path)
throws MetadataException {
PartialPath storageGroup = ensureStorageGroup(path);
- SchemaRegionId schemaRegionId = partitionTable.getSchemaRegionId(storageGroup, path);
+ SchemaRegionId schemaRegionId = schemaPartitionTable.getSchemaRegionId(storageGroup, path);
if (schemaRegionId == null) {
- partitionTable.setStorageGroup(storageGroup);
- schemaRegionId = partitionTable.getSchemaRegionId(storageGroup, path);
+ schemaPartitionTable.setStorageGroup(storageGroup);
+ schemaRegionId = schemaPartitionTable.getSchemaRegionId(storageGroup, path);
}
ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
if (schemaRegion == null) {
@@ -541,14 +562,15 @@ public class LocalConfigNode {
for (PartialPath storageGroup :
storageGroupSchemaManager.getInvolvedStorageGroups(pathPattern, isPrefixMatch)) {
result.addAll(
- partitionTable.getInvolvedSchemaRegionIds(storageGroup, pathPattern, isPrefixMatch));
+ schemaPartitionTable.getInvolvedSchemaRegionIds(
+ storageGroup, pathPattern, isPrefixMatch));
}
return result;
}
public List<SchemaRegionId> getSchemaRegionIdsByStorageGroup(PartialPath storageGroup)
throws MetadataException {
- return partitionTable.getSchemaRegionIdsByStorageGroup(storageGroup);
+ return schemaPartitionTable.getSchemaRegionIdsByStorageGroup(storageGroup);
}
// endregion
@@ -734,4 +756,29 @@ public class LocalConfigNode {
}
// endregion
+
+ // region Interfaces for DataRegionId Management
+ /**
+ * Get the DataRegionId which the given path belongs to. The path must be a fullPath without
+ * wildcards, * or **. This method is the first step when there's a task on one certain path,
+ * e.g., root.sg1 is a storage group and path = root.sg1.d1, return DataRegionId of root.sg1.
+ * The DataRegion is created in the storage engine if it does not exist there yet. If there's
+ * no storage group on the given path, StorageGroupNotSetException will be thrown.
+ */
+ public DataRegionId getBelongedDataRegionRegionId(PartialPath path)
+ throws MetadataException, DataRegionException {
+ PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
+ DataRegionId dataRegionId = dataPartitionTable.getDataRegionId(storageGroup, path);
+ DataRegion dataRegion = storageEngine.getDataRegion(dataRegionId);
+ if (dataRegion == null) {
+ storageEngine.createDataRegion(dataRegionId, storageGroup.getFullPath(), Long.MAX_VALUE);
+ }
+ return dataRegionId;
+ }
+
+ public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath storageGroup) {
+ return dataPartitionTable.getDataRegionIdsByStorageGroup(storageGroup);
+ }
+
+ // endregion
}
diff --git a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java
new file mode 100644
index 0000000000..0cc9e81d6d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.localconfignode;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+// This class is used for data partition maintaining the map between storage group and
+// dataRegionIds.
+public class LocalDataPartitionTable {
+
+ private AtomicInteger dataRegionIdGenerator;
+
+ private Map<PartialPath, List<DataRegionId>> table;
+
+ private static class LocalDataPartitionTableHolder {
+ private static final LocalDataPartitionTable INSTANCE = new LocalDataPartitionTable();
+
+ private LocalDataPartitionTableHolder() {}
+ }
+
+ private LocalDataPartitionTable() {}
+
+ public static LocalDataPartitionTable getInstance() {
+ return LocalDataPartitionTableHolder.INSTANCE;
+ }
+
+ public synchronized void init(Map<PartialPath, List<DataRegionId>> recoveredLocalDataRegionInfo) {
+ table = new ConcurrentHashMap<>();
+ dataRegionIdGenerator = new AtomicInteger(0);
+ // TODO:(recovery)
+ }
+
+ public synchronized void clear() {
+ if (table != null) {
+ table.clear();
+ table = null;
+ }
+
+ if (dataRegionIdGenerator != null) {
+ dataRegionIdGenerator = null;
+ }
+ }
+
+ public synchronized DataRegionId allocateDataRegionId(PartialPath storageGroup) {
+ DataRegionId dataRegionId = new DataRegionId(dataRegionIdGenerator.getAndIncrement());
+ table.get(storageGroup).add(dataRegionId);
+ return dataRegionId;
+ }
+
+ public synchronized void putDataRegionId(PartialPath storageGroup, DataRegionId dataRegionId) {
+ table.get(storageGroup).add(dataRegionId);
+
+ if (dataRegionId.getId() >= dataRegionIdGenerator.get()) {
+ dataRegionIdGenerator.set(dataRegionId.getId() + 1);
+ }
+ }
+
+ public synchronized void removeDataRegionId(PartialPath storageGroup, DataRegionId dataRegionId) {
+ table.get(storageGroup).remove(dataRegionId);
+ }
+
+ public DataRegionId getDataRegionId(PartialPath storageGroup, PartialPath path) {
+ return calculateDataRegionId(storageGroup, path);
+ }
+
+ public List<DataRegionId> getInvolvedDataRegionIds(
+ PartialPath storageGroup, PartialPath pathPattern, boolean isPrefixMatch) {
+ List<DataRegionId> result = new ArrayList<>();
+ if (table.containsKey(storageGroup)) {
+ result.addAll(table.get(storageGroup));
+ }
+ return result;
+ }
+
+ public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath storageGroup) {
+ // An unknown storage group yields an empty list, consistent with getInvolvedDataRegionIds.
+ return new ArrayList<>(table.getOrDefault(storageGroup, new ArrayList<>()));
+ }
+
+ public synchronized List<DataRegionId> setStorageGroup(PartialPath storageGroup) {
+ if (table.containsKey(storageGroup)) {
+ return table.get(storageGroup);
+ }
+ List<DataRegionId> dataRegionIdList = new CopyOnWriteArrayList<>();
+ dataRegionIdList.add(new DataRegionId(dataRegionIdGenerator.getAndIncrement()));
+ table.put(storageGroup, dataRegionIdList);
+ return dataRegionIdList;
+ }
+
+ public synchronized List<DataRegionId> deleteStorageGroup(PartialPath storageGroup) {
+ return table.remove(storageGroup);
+ }
+
+ // This method may be extended to implement multi dataRegion for one storageGroup
+ // todo keep consistent with the partition method of config node in new cluster
+ private DataRegionId calculateDataRegionId(PartialPath storageGroup, PartialPath path) {
+ if (!table.containsKey(storageGroup)) {
+ setStorageGroup(storageGroup);
+ }
+ return table.get(storageGroup).iterator().next();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaPartitionTable.java b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalSchemaPartitionTable.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaPartitionTable.java
rename to server/src/main/java/org/apache/iotdb/db/localconfignode/LocalSchemaPartitionTable.java
index c0c65c2ebe..df61980759 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaPartitionTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalSchemaPartitionTable.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.metadata;
+package org.apache.iotdb.db.localconfignode;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.db.exception.metadata.MetadataException;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
index 7d82d3167b..5a3622b1d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.metadata.template.UndefinedTemplateException;
+import org.apache.iotdb.db.localconfignode.LocalConfigNode;
import org.apache.iotdb.db.metadata.lastCache.LastCacheManager;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 0df9e2de4d..67b4be045c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.buffer.DataBlockService;
import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
@@ -26,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
+import org.apache.iotdb.db.mpp.execution.scheduler.StandaloneScheduler;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
@@ -65,6 +67,8 @@ import static com.google.common.base.Throwables.throwIfUnchecked;
public class QueryExecution implements IQueryExecution {
private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
+ private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
private final MPPQueryContext context;
private IScheduler scheduler;
private final QueryStateMachine stateMachine;
@@ -142,13 +146,21 @@ public class QueryExecution implements IQueryExecution {
private void schedule() {
// TODO: (xingtanzjr) initialize the query scheduler according to configuration
this.scheduler =
- new ClusterScheduler(
- context,
- stateMachine,
- distributedPlan.getInstances(),
- context.getQueryType(),
- executor,
- scheduledExecutor);
+ config.isClusterMode()
+ ? new ClusterScheduler(
+ context,
+ stateMachine,
+ distributedPlan.getInstances(),
+ context.getQueryType(),
+ executor,
+ scheduledExecutor)
+ : new StandaloneScheduler(
+ context,
+ stateMachine,
+ distributedPlan.getInstances(),
+ context.getQueryType(),
+ executor,
+ scheduledExecutor);
this.scheduler.start();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java
index a28c7924c4..81eae020fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java
@@ -24,6 +24,10 @@ import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.localconfignode.LocalConfigNode;
import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
@@ -36,6 +40,9 @@ import org.slf4j.LoggerFactory;
public class SetStorageGroupTask implements IConfigTask {
private static final Logger LOGGER = LoggerFactory.getLogger(SetStorageGroupTask.class);
+
+ private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
private final SetStorageGroupStatement setStorageGroupStatement;
public SetStorageGroupTask(SetStorageGroupStatement setStorageGroupStatement) {
@@ -45,31 +52,40 @@ public class SetStorageGroupTask implements IConfigTask {
@Override
public ListenableFuture<ConfigTaskResult> execute() {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- // Construct request using statement
- TStorageGroupSchema storageGroupSchema = new TStorageGroupSchema();
- storageGroupSchema.setName(setStorageGroupStatement.getStorageGroupPath().getFullPath());
- TSetStorageGroupReq req = new TSetStorageGroupReq(storageGroupSchema);
-
- ConfigNodeClient configNodeClient = null;
- try {
- configNodeClient = new ConfigNodeClient();
- // Send request to some API server
- TSStatus tsStatus = configNodeClient.setStorageGroup(req);
- // Get response or throw exception
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
- LOGGER.error(
- "Failed to execute set storage group {} in config node, status is {}.",
- setStorageGroupStatement.getStorageGroupPath(),
- tsStatus);
- future.setException(new StatementExecutionException(tsStatus));
- } else {
- future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ // TODO:(this judgement needs to be integrated in a high level framework)
+ if (config.isClusterMode()) {
+ // Construct request using statement
+ TStorageGroupSchema storageGroupSchema = new TStorageGroupSchema();
+ storageGroupSchema.setName(setStorageGroupStatement.getStorageGroupPath().getFullPath());
+ TSetStorageGroupReq req = new TSetStorageGroupReq(storageGroupSchema);
+ ConfigNodeClient configNodeClient = null;
+ try {
+ configNodeClient = new ConfigNodeClient();
+ // Send request to some API server
+ TSStatus tsStatus = configNodeClient.setStorageGroup(req);
+ // Get response or throw exception
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
+ LOGGER.error(
+ "Failed to execute set storage group {} in config node, status is {}.",
+ setStorageGroupStatement.getStorageGroupPath(),
+ tsStatus);
+ future.setException(new StatementExecutionException(tsStatus));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
+ } catch (IoTDBConnectionException | BadNodeUrlException e) {
+ LOGGER.error("Failed to connect to config node.");
+ future.setException(e);
}
- } catch (IoTDBConnectionException | BadNodeUrlException e) {
- LOGGER.error("Failed to connect to config node.");
- future.setException(e);
+ } else {
+ try {
+ LocalConfigNode.getInstance()
+ .setStorageGroup(setStorageGroupStatement.getStorageGroupPath());
+ // Complete with success only when setStorageGroup did not throw.
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } catch (MetadataException e) {
+ future.setException(e);
+ }
}
-
// If the action is executed successfully, return the Future.
// If your operation is async, you can return the corresponding future directly.
return future;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
index 1081b7359c..a1c31c99ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
@@ -18,22 +18,73 @@
*/
package org.apache.iotdb.db.mpp.execution.scheduler;
-import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.execution.FragmentInfo;
+import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import io.airlift.units.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
public class StandaloneScheduler implements IScheduler {
- private static final StorageEngine STORAGE_ENGINE = StorageEngine.getInstance();
+ private static final StorageEngineV2 STORAGE_ENGINE = StorageEngineV2.getInstance();
private static final LocalSchemaProcessor SCHEMA_ENGINE = LocalSchemaProcessor.getInstance();
+ private static final Logger LOGGER = LoggerFactory.getLogger(StandaloneScheduler.class);
+
+ private MPPQueryContext queryContext;
+ // The stateMachine of the QueryExecution owned by this QueryScheduler
+ private QueryStateMachine stateMachine;
+ private QueryType queryType;
+ // The fragment instances which should be sent to corresponding Nodes.
+ private List<FragmentInstance> instances;
+
+ private ExecutorService executor;
+ private ScheduledExecutorService scheduledExecutor;
+
+ private IFragInstanceDispatcher dispatcher;
+ private IFragInstanceStateTracker stateTracker;
+ private IQueryTerminator queryTerminator;
+
+ public StandaloneScheduler(
+ MPPQueryContext queryContext,
+ QueryStateMachine stateMachine,
+ List<FragmentInstance> instances,
+ QueryType queryType,
+ ExecutorService executor,
+ ScheduledExecutorService scheduledExecutor) {
+ this.queryContext = queryContext;
+ this.stateMachine = stateMachine;
+ this.instances = instances;
+ this.queryType = queryType;
+ this.executor = executor;
+ this.scheduledExecutor = scheduledExecutor;
+ this.stateTracker =
+ new FixedRateFragInsStateTracker(stateMachine, executor, scheduledExecutor, instances);
+ this.queryTerminator =
+ new SimpleQueryTerminator(executor, queryContext.getQueryId(), instances);
+ }
+
@Override
- public void start() {}
+ public void start() {
+ // For the FragmentInstance of WRITE, it will be executed directly when dispatching.
+ // TODO: Other QueryTypes
+ if (queryType == QueryType.WRITE) {
+
+ return;
+ }
+ }
@Override
public void stop() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
index 7fbdb00bf4..ceacc51b6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
@@ -18,21 +18,41 @@
*/
package org.apache.iotdb.db.mpp.sql.analyze;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
+import org.apache.iotdb.db.localconfignode.LocalConfigNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class StandalonePartitionFetcher implements IPartitionFetcher {
- private StandalonePartitionFetcher() {}
+ private final LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
+ private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
+
+ private static final class StandalonePartitionFetcherHolder {
+ private static final StandalonePartitionFetcher INSTANCE = new StandalonePartitionFetcher();
+
+ private StandalonePartitionFetcherHolder() {}
+ }
- // TODO need to use safe singleton pattern
public static StandalonePartitionFetcher getInstance() {
- return new StandalonePartitionFetcher();
+ return StandalonePartitionFetcher.StandalonePartitionFetcherHolder.INSTANCE;
}
@Override
@@ -48,12 +68,49 @@ public class StandalonePartitionFetcher implements IPartitionFetcher {
@Override
public DataPartition getDataPartition(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
- return null;
+ try {
+ Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
+ dataPartitionMap = new HashMap<>();
+ for (Map.Entry<String, List<DataPartitionQueryParam>> sgEntry :
+ sgNameToQueryParamsMap.entrySet()) {
+ // for each sg
+ String storageGroupName = sgEntry.getKey();
+ List<DataPartitionQueryParam> dataPartitionQueryParams = sgEntry.getValue();
+ Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
+ deviceToRegionsMap = new HashMap<>();
+ for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) {
+ // for each device
+ String deviceId = dataPartitionQueryParam.getDevicePath();
+ DataRegionId dataRegionId =
+ localConfigNode.getBelongedDataRegionRegionId(new PartialPath(deviceId));
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionToRegionsMap =
+ new HashMap<>();
+ for (TTimePartitionSlot timePartitionSlot :
+ dataPartitionQueryParam.getTimePartitionSlotList()) {
+ // for each time partition
+ timePartitionToRegionsMap.put(
+ timePartitionSlot,
+ Collections.singletonList(
+ new TRegionReplicaSet(
+ new TConsensusGroupId(dataRegionId.getType(), dataRegionId.getId()),
+ Collections.emptyList())));
+ }
+ deviceToRegionsMap.put(new TSeriesPartitionSlot(), timePartitionToRegionsMap);
+ }
+ dataPartitionMap.put(storageGroupName, deviceToRegionsMap);
+ }
+ return new DataPartition(
+ dataPartitionMap,
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+ } catch (MetadataException | DataRegionException e) {
+ throw new StatementAnalyzeException("An error occurred when executing getDataPartition()");
+ }
}
@Override
public DataPartition getOrCreateDataPartition(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
- return null;
+ return getDataPartition(sgNameToQueryParamsMap);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
index e1deb0f9b5..1a43583964 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
@@ -18,15 +18,29 @@
*/
package org.apache.iotdb.db.mpp.sql.analyze;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.localconfignode.LocalConfigNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
public class StandaloneSchemaFetcher implements ISchemaFetcher {
+ private final LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
+ private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
+
private StandaloneSchemaFetcher() {}
public static StandaloneSchemaFetcher getInstance() {
@@ -35,7 +49,22 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
@Override
public SchemaTree fetchSchema(PathPatternTree patternTree) {
- return null;
+ Set<String> seenStorageGroups = new HashSet<>();
+ SchemaTree result = new SchemaTree();
+ List<PartialPath> patterns = patternTree.splitToPathList();
+ try {
+ // Each pattern is resolved in the schema region that localConfigNode maps it to.
+ for (PartialPath pattern : patterns) {
+ seenStorageGroups.add(localConfigNode.getBelongedStorageGroup(pattern).getFullPath());
+ SchemaRegionId regionId = localConfigNode.getBelongedSchemaRegionId(pattern);
+ ISchemaRegion region = schemaEngine.getSchemaRegion(regionId);
+ result.appendMeasurementPaths(region.getMeasurementPaths(pattern, false));
+ }
+ } catch (MetadataException e) {
+ throw new RuntimeException(e);
+ }
+ result.setStorageGroups(new ArrayList<>(seenStorageGroups));
+ return result;
}
@Override
@@ -50,6 +79,11 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
List<String[]> measurements,
List<TSDataType[]> tsDataTypes,
List<Boolean> aligned) {
- return null;
+ Map<PartialPath, List<String>> deviceToMeasurementMap = new HashMap<>();
+ for (int i = 0; i < devicePath.size(); i++) {
+ deviceToMeasurementMap.put(devicePath.get(i), Arrays.asList(measurements.get(i)));
+ }
+ // todo implement auto create schema
+ return fetchSchema(new PathPatternTree(deviceToMeasurementMap));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 7774b43826..bedd697089 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -170,6 +170,7 @@ public class DataNode implements DataNodeMBean {
public void active() throws StartupException {
// set the mpp mode to true
IoTDBDescriptor.getInstance().getConfig().setMppMode(true);
+ IoTDBDescriptor.getInstance().getConfig().setClusterMode(true);
// start iotdb server first
IoTDB.getInstance().active();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index f281b1f4b5..b0324c3a1a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.LocalConfigNode;
+import org.apache.iotdb.db.localconfignode.LocalConfigNode;
import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
import org.apache.iotdb.db.mpp.buffer.DataBlockService;
import org.apache.iotdb.db.mpp.schedule.FragmentInstanceScheduler;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index d54841ab0b..78a5d986ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.service.thrift.impl;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.OperationType;
import org.apache.iotdb.db.mpp.common.QueryId;
@@ -31,6 +32,8 @@ import org.apache.iotdb.db.mpp.sql.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.mpp.sql.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.StandalonePartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.StandaloneSchemaFetcher;
import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.db.mpp.sql.statement.crud.InsertMultiTabletsStatement;
@@ -107,13 +110,25 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeTSIServiceImpl.class);
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
private static final Coordinator COORDINATOR = Coordinator.getInstance();
private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
- private static final IPartitionFetcher PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+ private final IPartitionFetcher PARTITION_FETCHER;
+
+ private final ISchemaFetcher SCHEMA_FETCHER;
- private static final ISchemaFetcher SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+ public DataNodeTSIServiceImpl() {
+ boolean clustered = config.isClusterMode(); // read once; selects both fetchers
+ PARTITION_FETCHER =
+ clustered
+ ? ClusterPartitionFetcher.getInstance()
+ : StandalonePartitionFetcher.getInstance();
+ SCHEMA_FETCHER =
+ clustered ? ClusterSchemaFetcher.getInstance() : StandaloneSchemaFetcher.getInstance();
+ }
@Override
public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
index d4532ff167..6b3874b11d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.mpp.operator.schema;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.metadata.LocalConfigNode;
+import org.apache.iotdb.db.localconfignode.LocalConfigNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index 255b73f5a7..20e2ab626b 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.ConsensusImpl;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.metadata.LocalConfigNode;
+import org.apache.iotdb.db.localconfignode.LocalConfigNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;