You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/25 10:37:28 UTC

[iotdb] branch master updated: Support set storage group in standalone mpp mode (#5655)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a7b0f198af Support set storage group in standalone mpp mode (#5655)
a7b0f198af is described below

commit a7b0f198afd514cbd8128d0f9d2beda9fffe661a
Author: Haonan <hh...@outlook.com>
AuthorDate: Mon Apr 25 18:37:22 2022 +0800

    Support set storage group in standalone mpp mode (#5655)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  11 ++
 .../LocalConfigNode.java                           |  73 +++++++++---
 .../localconfignode/LocalDataPartitionTable.java   | 126 +++++++++++++++++++++
 .../LocalSchemaPartitionTable.java                 |   2 +-
 .../iotdb/db/metadata/LocalSchemaProcessor.java    |   1 +
 .../iotdb/db/mpp/execution/QueryExecution.java     |  26 +++--
 .../mpp/execution/config/SetStorageGroupTask.java  |  62 ++++++----
 .../execution/scheduler/StandaloneScheduler.java   |  57 +++++++++-
 .../sql/analyze/StandalonePartitionFetcher.java    |  67 ++++++++++-
 .../mpp/sql/analyze/StandaloneSchemaFetcher.java   |  38 ++++++-
 .../java/org/apache/iotdb/db/service/DataNode.java |   1 +
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   2 +-
 .../thrift/impl/DataNodeTSIServiceImpl.java        |  19 +++-
 .../operator/schema/SchemaScanOperatorTest.java    |   2 +-
 .../iotdb/db/service/InternalServiceImplTest.java  |   2 +-
 15 files changed, 430 insertions(+), 59 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1ee1d804fd..d1bdc6d3b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -496,6 +496,9 @@ public class IoTDBConfig {
   /** indicate whether current mode is mpp */
   private boolean mppMode = false;
 
+  /** indicate whether current mode is cluster */
+  private boolean isClusterMode = false;
+
   /** Replace implementation class of influxdb protocol service */
   private String influxdbImplClassName = InfluxDBServiceImpl.class.getName();
 
@@ -2804,4 +2807,12 @@ public class IoTDBConfig {
   public void setMppMode(boolean mppMode) {
     this.mppMode = mppMode;
   }
+
+  public boolean isClusterMode() {
+    return isClusterMode;
+  }
+
+  public void setClusterMode(boolean isClusterMode) {
+    this.isClusterMode = isClusterMode;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
similarity index 90%
rename from server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
rename to server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
index 8df5935f77..2ea72f4711 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
@@ -17,19 +17,24 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.metadata;
+package org.apache.iotdb.db.localconfignode;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.metadata.template.UndefinedTemplateException;
+import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.rescon.SchemaResourceManager;
@@ -87,7 +92,11 @@ public class LocalConfigNode {
       StorageGroupSchemaManager.getInstance();
   private TemplateManager templateManager = TemplateManager.getInstance();
   private SchemaEngine schemaEngine = SchemaEngine.getInstance();
-  private LocalSchemaPartitionTable partitionTable = LocalSchemaPartitionTable.getInstance();
+  private LocalSchemaPartitionTable schemaPartitionTable = LocalSchemaPartitionTable.getInstance();
+
+  private StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
+
+  private LocalDataPartitionTable dataPartitionTable = LocalDataPartitionTable.getInstance();
 
   private LocalConfigNode() {
     String schemaDir = config.getSchemaDir();
@@ -127,7 +136,8 @@ public class LocalConfigNode {
       storageGroupSchemaManager.init();
 
       Map<PartialPath, List<SchemaRegionId>> recoveredLocalSchemaRegionInfo = schemaEngine.init();
-      partitionTable.init(recoveredLocalSchemaRegionInfo);
+      schemaPartitionTable.init(recoveredLocalSchemaRegionInfo);
+      dataPartitionTable.init(null);
 
       if (config.getSyncMlogPeriodInMs() != 0) {
         timedForceMLogThread =
@@ -160,7 +170,7 @@ public class LocalConfigNode {
         timedForceMLogThread = null;
       }
 
-      partitionTable.clear();
+      schemaPartitionTable.clear();
       schemaEngine.clear();
       storageGroupSchemaManager.clear();
       templateManager.clear();
@@ -195,7 +205,7 @@ public class LocalConfigNode {
    */
   public void setStorageGroup(PartialPath storageGroup) throws MetadataException {
     storageGroupSchemaManager.setStorageGroup(storageGroup);
-    for (SchemaRegionId schemaRegionId : partitionTable.setStorageGroup(storageGroup)) {
+    for (SchemaRegionId schemaRegionId : schemaPartitionTable.setStorageGroup(storageGroup)) {
       schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
     }
 
@@ -206,11 +216,22 @@ public class LocalConfigNode {
     if (!config.isEnableMemControl()) {
       MemTableManager.getInstance().addOrDeleteStorageGroup(1);
     }
+
+    if (config.isMppMode() && !config.isClusterMode()) {
+      for (DataRegionId dataRegionId : dataPartitionTable.setStorageGroup(storageGroup)) {
+        try {
+          storageEngine.createDataRegion(dataRegionId, storageGroup.getFullPath(), Long.MAX_VALUE);
+        } catch (DataRegionException e) {
+          // TODO (Fix exception type)
+          throw new MetadataException(e);
+        }
+      }
+    }
   }
 
   public void deleteStorageGroup(PartialPath storageGroup) throws MetadataException {
     deleteSchemaRegionsInStorageGroup(
-        storageGroup, partitionTable.getSchemaRegionIdsByStorageGroup(storageGroup));
+        storageGroup, schemaPartitionTable.getSchemaRegionIdsByStorageGroup(storageGroup));
 
     for (Template template : templateManager.getTemplateMap().values()) {
       templateManager.unmarkStorageGroup(template, storageGroup.getFullPath());
@@ -224,7 +245,7 @@ public class LocalConfigNode {
       MemTableManager.getInstance().addOrDeleteStorageGroup(-1);
     }
 
-    partitionTable.deleteStorageGroup(storageGroup);
+    schemaPartitionTable.deleteStorageGroup(storageGroup);
 
     // delete storage group after all related resources have been cleared
     storageGroupSchemaManager.deleteStorageGroup(storageGroup);
@@ -510,17 +531,17 @@ public class LocalConfigNode {
    */
   public SchemaRegionId getBelongedSchemaRegionId(PartialPath path) throws MetadataException {
     PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
-    return partitionTable.getSchemaRegionId(storageGroup, path);
+    return schemaPartitionTable.getSchemaRegionId(storageGroup, path);
   }
 
   // This interface involves storage group and schema region auto creation
   public SchemaRegionId getBelongedSchemaRegionIdWithAutoCreate(PartialPath path)
       throws MetadataException {
     PartialPath storageGroup = ensureStorageGroup(path);
-    SchemaRegionId schemaRegionId = partitionTable.getSchemaRegionId(storageGroup, path);
+    SchemaRegionId schemaRegionId = schemaPartitionTable.getSchemaRegionId(storageGroup, path);
     if (schemaRegionId == null) {
-      partitionTable.setStorageGroup(storageGroup);
-      schemaRegionId = partitionTable.getSchemaRegionId(storageGroup, path);
+      schemaPartitionTable.setStorageGroup(storageGroup);
+      schemaRegionId = schemaPartitionTable.getSchemaRegionId(storageGroup, path);
     }
     ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
     if (schemaRegion == null) {
@@ -541,14 +562,15 @@ public class LocalConfigNode {
     for (PartialPath storageGroup :
         storageGroupSchemaManager.getInvolvedStorageGroups(pathPattern, isPrefixMatch)) {
       result.addAll(
-          partitionTable.getInvolvedSchemaRegionIds(storageGroup, pathPattern, isPrefixMatch));
+          schemaPartitionTable.getInvolvedSchemaRegionIds(
+              storageGroup, pathPattern, isPrefixMatch));
     }
     return result;
   }
 
   public List<SchemaRegionId> getSchemaRegionIdsByStorageGroup(PartialPath storageGroup)
       throws MetadataException {
-    return partitionTable.getSchemaRegionIdsByStorageGroup(storageGroup);
+    return schemaPartitionTable.getSchemaRegionIdsByStorageGroup(storageGroup);
   }
 
   // endregion
@@ -734,4 +756,29 @@ public class LocalConfigNode {
   }
 
   // endregion
+
+  // region Interfaces for DataRegionId Management
+  /**
+   * Get the target DataRegionIds, which the given path belongs to. The path must be a fullPath
+   * without wildcards, * or **. This method is the first step when there's a task on one certain
+   * path, e.g., root.sg1 is a storage group and path = root.sg1.d1, return DataRegionId of
+   * root.sg1. If there's no storage group on the given path, StorageGroupNotSetException will be
+   * thrown.
+   */
+  public DataRegionId getBelongedDataRegionRegionId(PartialPath path)
+      throws MetadataException, DataRegionException {
+    PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
+    DataRegionId dataRegionId = dataPartitionTable.getDataRegionId(storageGroup, path);
+    DataRegion dataRegion = storageEngine.getDataRegion(dataRegionId);
+    if (dataRegion == null) {
+      storageEngine.createDataRegion(dataRegionId, storageGroup.getFullPath(), Long.MAX_VALUE);
+    }
+    return dataPartitionTable.getDataRegionId(storageGroup, path);
+  }
+
+  public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath storageGroup) {
+    return dataPartitionTable.getDataRegionIdsByStorageGroup(storageGroup);
+  }
+
+  // endregion
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java
new file mode 100644
index 0000000000..0cc9e81d6d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.localconfignode;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+// This class is used for data partition maintaining the map between storage group and
+// dataRegionIds.
+public class LocalDataPartitionTable {
+
+  private AtomicInteger dataRegionIdGenerator;
+
+  private Map<PartialPath, List<DataRegionId>> table;
+
+  private static class LocalDataPartitionTableHolder {
+    private static final LocalDataPartitionTable INSTANCE = new LocalDataPartitionTable();
+
+    private LocalDataPartitionTableHolder() {};
+  }
+
+  private LocalDataPartitionTable() {}
+
+  public static LocalDataPartitionTable getInstance() {
+    return LocalDataPartitionTableHolder.INSTANCE;
+  }
+
+  public synchronized void init(Map<PartialPath, List<DataRegionId>> recoveredLocalDataRegionInfo) {
+    table = new ConcurrentHashMap<>();
+    dataRegionIdGenerator = new AtomicInteger(0);
+    // TODO:(recovery)
+  }
+
+  public synchronized void clear() {
+    if (table != null) {
+      table.clear();
+      table = null;
+    }
+
+    if (dataRegionIdGenerator != null) {
+      dataRegionIdGenerator = null;
+    }
+  }
+
+  public synchronized DataRegionId allocateDataRegionId(PartialPath storageGroup) {
+    DataRegionId dataRegionId = new DataRegionId(dataRegionIdGenerator.getAndIncrement());
+    table.get(storageGroup).add(dataRegionId);
+    return dataRegionId;
+  }
+
+  public synchronized void putDataRegionId(PartialPath storageGroup, DataRegionId dataRegionId) {
+    table.get(storageGroup).add(dataRegionId);
+
+    if (dataRegionId.getId() >= dataRegionIdGenerator.get()) {
+      dataRegionIdGenerator.set(dataRegionId.getId() + 1);
+    }
+  }
+
+  public synchronized void removeDataRegionId(PartialPath storageGroup, DataRegionId dataRegionId) {
+    table.get(storageGroup).remove(dataRegionId);
+  }
+
+  public DataRegionId getDataRegionId(PartialPath storageGroup, PartialPath path) {
+    return calculateDataRegionId(storageGroup, path);
+  }
+
+  public List<DataRegionId> getInvolvedDataRegionIds(
+      PartialPath storageGroup, PartialPath pathPattern, boolean isPrefixMatch) {
+    List<DataRegionId> result = new ArrayList<>();
+    if (table.containsKey(storageGroup)) {
+      result.addAll(table.get(storageGroup));
+    }
+    return result;
+  }
+
+  public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath storageGroup) {
+    return new ArrayList<>(table.get(storageGroup));
+  }
+
+  public synchronized List<DataRegionId> setStorageGroup(PartialPath storageGroup) {
+    if (table.containsKey(storageGroup)) {
+      return table.get(storageGroup);
+    }
+    List<DataRegionId> dataRegionIdList = new CopyOnWriteArrayList<>();
+    dataRegionIdList.add(new DataRegionId(dataRegionIdGenerator.getAndIncrement()));
+    table.put(storageGroup, dataRegionIdList);
+    return dataRegionIdList;
+  }
+
+  public synchronized List<DataRegionId> deleteStorageGroup(PartialPath storageGroup) {
+    return table.remove(storageGroup);
+  }
+
+  // This method may be extended to implement multi dataRegion for one storageGroup
+  // todo keep consistent with the partition method of config node in new cluster
+  private DataRegionId calculateDataRegionId(PartialPath storageGroup, PartialPath path) {
+    if (!table.containsKey(storageGroup)) {
+      setStorageGroup(storageGroup);
+    }
+    return table.get(storageGroup).iterator().next();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaPartitionTable.java b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalSchemaPartitionTable.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaPartitionTable.java
rename to server/src/main/java/org/apache/iotdb/db/localconfignode/LocalSchemaPartitionTable.java
index c0c65c2ebe..df61980759 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaPartitionTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalSchemaPartitionTable.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.metadata;
+package org.apache.iotdb.db.localconfignode;
 
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
index 7d82d3167b..5a3622b1d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.metadata.template.UndefinedTemplateException;
+import org.apache.iotdb.db.localconfignode.LocalConfigNode;
 import org.apache.iotdb.db.metadata.lastCache.LastCacheManager;
 import org.apache.iotdb.db.metadata.mnode.IMNode;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 0df9e2de4d..67b4be045c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.buffer.DataBlockService;
 import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
@@ -26,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
+import org.apache.iotdb.db.mpp.execution.scheduler.StandaloneScheduler;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
@@ -65,6 +67,8 @@ import static com.google.common.base.Throwables.throwIfUnchecked;
 public class QueryExecution implements IQueryExecution {
   private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
 
+  private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
   private final MPPQueryContext context;
   private IScheduler scheduler;
   private final QueryStateMachine stateMachine;
@@ -142,13 +146,21 @@ public class QueryExecution implements IQueryExecution {
   private void schedule() {
     // TODO: (xingtanzjr) initialize the query scheduler according to configuration
     this.scheduler =
-        new ClusterScheduler(
-            context,
-            stateMachine,
-            distributedPlan.getInstances(),
-            context.getQueryType(),
-            executor,
-            scheduledExecutor);
+        config.isClusterMode()
+            ? new ClusterScheduler(
+                context,
+                stateMachine,
+                distributedPlan.getInstances(),
+                context.getQueryType(),
+                executor,
+                scheduledExecutor)
+            : new StandaloneScheduler(
+                context,
+                stateMachine,
+                distributedPlan.getInstances(),
+                context.getQueryType(),
+                executor,
+                scheduledExecutor);
     this.scheduler.start();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java
index a28c7924c4..81eae020fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java
@@ -24,6 +24,10 @@ import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.localconfignode.LocalConfigNode;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
@@ -36,6 +40,9 @@ import org.slf4j.LoggerFactory;
 
 public class SetStorageGroupTask implements IConfigTask {
   private static final Logger LOGGER = LoggerFactory.getLogger(SetStorageGroupTask.class);
+
+  private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
   private final SetStorageGroupStatement setStorageGroupStatement;
 
   public SetStorageGroupTask(SetStorageGroupStatement setStorageGroupStatement) {
@@ -45,31 +52,40 @@ public class SetStorageGroupTask implements IConfigTask {
   @Override
   public ListenableFuture<ConfigTaskResult> execute() {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-    // Construct request using statement
-    TStorageGroupSchema storageGroupSchema = new TStorageGroupSchema();
-    storageGroupSchema.setName(setStorageGroupStatement.getStorageGroupPath().getFullPath());
-    TSetStorageGroupReq req = new TSetStorageGroupReq(storageGroupSchema);
-
-    ConfigNodeClient configNodeClient = null;
-    try {
-      configNodeClient = new ConfigNodeClient();
-      // Send request to some API server
-      TSStatus tsStatus = configNodeClient.setStorageGroup(req);
-      // Get response or throw exception
-      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
-        LOGGER.error(
-            "Failed to execute set storage group {} in config node, status is {}.",
-            setStorageGroupStatement.getStorageGroupPath(),
-            tsStatus);
-        future.setException(new StatementExecutionException(tsStatus));
-      } else {
-        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+    // TODO:(this judgement needs to be integrated in a high level framework)
+    if (config.isClusterMode()) {
+      // Construct request using statement
+      TStorageGroupSchema storageGroupSchema = new TStorageGroupSchema();
+      storageGroupSchema.setName(setStorageGroupStatement.getStorageGroupPath().getFullPath());
+      TSetStorageGroupReq req = new TSetStorageGroupReq(storageGroupSchema);
+      ConfigNodeClient configNodeClient = null;
+      try {
+        configNodeClient = new ConfigNodeClient();
+        // Send request to some API server
+        TSStatus tsStatus = configNodeClient.setStorageGroup(req);
+        // Get response or throw exception
+        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
+          LOGGER.error(
+              "Failed to execute set storage group {} in config node, status is {}.",
+              setStorageGroupStatement.getStorageGroupPath(),
+              tsStatus);
+          future.setException(new StatementExecutionException(tsStatus));
+        } else {
+          future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+        }
+      } catch (IoTDBConnectionException | BadNodeUrlException e) {
+        LOGGER.error("Failed to connect to config node.");
+        future.setException(e);
       }
-    } catch (IoTDBConnectionException | BadNodeUrlException e) {
-      LOGGER.error("Failed to connect to config node.");
-      future.setException(e);
+    } else {
+      try {
+        LocalConfigNode.getInstance()
+            .setStorageGroup(setStorageGroupStatement.getStorageGroupPath());
+      } catch (MetadataException e) {
+        future.setException(e);
+      }
+      future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
     }
-
     // If the action is executed successfully, return the Future.
     // If your operation is async, you can return the corresponding future directly.
     return future;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
index 1081b7359c..a1c31c99ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
@@ -18,22 +18,73 @@
  */
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
-import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.execution.FragmentInfo;
+import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 
 import io.airlift.units.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
 public class StandaloneScheduler implements IScheduler {
 
-  private static final StorageEngine STORAGE_ENGINE = StorageEngine.getInstance();
+  private static final StorageEngineV2 STORAGE_ENGINE = StorageEngineV2.getInstance();
 
   private static final LocalSchemaProcessor SCHEMA_ENGINE = LocalSchemaProcessor.getInstance();
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterScheduler.class);
+
+  private MPPQueryContext queryContext;
+  // The stateMachine of the QueryExecution owned by this QueryScheduler
+  private QueryStateMachine stateMachine;
+  private QueryType queryType;
+  // The fragment instances which should be sent to corresponding Nodes.
+  private List<FragmentInstance> instances;
+
+  private ExecutorService executor;
+  private ScheduledExecutorService scheduledExecutor;
+
+  private IFragInstanceDispatcher dispatcher;
+  private IFragInstanceStateTracker stateTracker;
+  private IQueryTerminator queryTerminator;
+
+  public StandaloneScheduler(
+      MPPQueryContext queryContext,
+      QueryStateMachine stateMachine,
+      List<FragmentInstance> instances,
+      QueryType queryType,
+      ExecutorService executor,
+      ScheduledExecutorService scheduledExecutor) {
+    this.queryContext = queryContext;
+    this.instances = instances;
+    this.queryType = queryType;
+    this.executor = executor;
+    this.scheduledExecutor = scheduledExecutor;
+    this.stateTracker =
+        new FixedRateFragInsStateTracker(stateMachine, executor, scheduledExecutor, instances);
+    this.queryTerminator =
+        new SimpleQueryTerminator(executor, queryContext.getQueryId(), instances);
+  }
+
   @Override
-  public void start() {}
+  public void start() {
+    // For the FragmentInstance of WRITE, it will be executed directly when dispatching.
+    // TODO: Other QueryTypes
+    if (queryType == QueryType.WRITE) {
+
+      return;
+    }
+  }
 
   @Override
   public void stop() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
index 7fbdb00bf4..ceacc51b6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
@@ -18,21 +18,41 @@
  */
 package org.apache.iotdb.db.mpp.sql.analyze;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
 import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
+import org.apache.iotdb.db.localconfignode.LocalConfigNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class StandalonePartitionFetcher implements IPartitionFetcher {
 
-  private StandalonePartitionFetcher() {}
+  private final LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
+  private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
+
+  private static final class StandalonePartitionFetcherHolder {
+    private static final StandalonePartitionFetcher INSTANCE = new StandalonePartitionFetcher();
+
+    private StandalonePartitionFetcherHolder() {}
+  }
 
-  // TODO need to use safe singleton pattern
   public static StandalonePartitionFetcher getInstance() {
-    return new StandalonePartitionFetcher();
+    return StandalonePartitionFetcher.StandalonePartitionFetcherHolder.INSTANCE;
   }
 
   @Override
@@ -48,12 +68,49 @@ public class StandalonePartitionFetcher implements IPartitionFetcher {
   @Override
   public DataPartition getDataPartition(
       Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
-    return null;
+    try {
+      Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
+          dataPartitionMap = new HashMap<>();
+      for (Map.Entry<String, List<DataPartitionQueryParam>> sgEntry :
+          sgNameToQueryParamsMap.entrySet()) {
+        // for each sg
+        String storageGroupName = sgEntry.getKey();
+        List<DataPartitionQueryParam> dataPartitionQueryParams = sgEntry.getValue();
+        Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
+            deviceToRegionsMap = new HashMap<>();
+        for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) {
+          // for each device
+          String deviceId = dataPartitionQueryParam.getDevicePath();
+          DataRegionId dataRegionId =
+              localConfigNode.getBelongedDataRegionRegionId(new PartialPath(deviceId));
+          Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionToRegionsMap =
+              new HashMap<>();
+          for (TTimePartitionSlot timePartitionSlot :
+              dataPartitionQueryParam.getTimePartitionSlotList()) {
+            // for each time partition
+            timePartitionToRegionsMap.put(
+                timePartitionSlot,
+                Collections.singletonList(
+                    new TRegionReplicaSet(
+                        new TConsensusGroupId(dataRegionId.getType(), dataRegionId.getId()),
+                        Collections.EMPTY_LIST)));
+          }
+          deviceToRegionsMap.put(new TSeriesPartitionSlot(), timePartitionToRegionsMap);
+        }
+        dataPartitionMap.put(storageGroupName, deviceToRegionsMap);
+      }
+      return new DataPartition(
+          dataPartitionMap,
+          IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+          IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+    } catch (MetadataException | DataRegionException e) {
+      throw new StatementAnalyzeException("An error occurred when executing getDataPartition()");
+    }
   }
 
   @Override
   public DataPartition getOrCreateDataPartition(
       Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
-    return null;
+    return getDataPartition(sgNameToQueryParamsMap);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
index e1deb0f9b5..1a43583964 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
@@ -18,15 +18,29 @@
  */
 package org.apache.iotdb.db.mpp.sql.analyze;
 
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.localconfignode.LocalConfigNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class StandaloneSchemaFetcher implements ISchemaFetcher {
 
+  private final LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
+  private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
+
   private StandaloneSchemaFetcher() {}
 
   public static StandaloneSchemaFetcher getInstance() {
@@ -35,7 +49,22 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
 
   @Override
   public SchemaTree fetchSchema(PathPatternTree patternTree) {
-    return null;
+    Set<String> storageGroupSet = new HashSet<>();
+    SchemaTree schemaTree = new SchemaTree();
+    List<PartialPath> partialPathList = patternTree.splitToPathList();
+    try {
+      for (PartialPath path : partialPathList) {
+        String storageGroup = localConfigNode.getBelongedStorageGroup(path).getFullPath();
+        storageGroupSet.add(storageGroup);
+        SchemaRegionId schemaRegionId = localConfigNode.getBelongedSchemaRegionId(path);
+        ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
+        schemaTree.appendMeasurementPaths(schemaRegion.getMeasurementPaths(path, false));
+      }
+    } catch (MetadataException e) {
+      throw new RuntimeException(e);
+    }
+    schemaTree.setStorageGroups(new ArrayList<>(storageGroupSet));
+    return schemaTree;
   }
 
   @Override
@@ -50,6 +79,11 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
       List<String[]> measurements,
       List<TSDataType[]> tsDataTypes,
       List<Boolean> aligned) {
-    return null;
+    Map<PartialPath, List<String>> deviceToMeasurementMap = new HashMap<>();
+    for (int i = 0; i < devicePath.size(); i++) {
+      deviceToMeasurementMap.put(devicePath.get(i), Arrays.asList(measurements.get(i)));
+    }
+    // todo implement auto create schema
+    return fetchSchema(new PathPatternTree(deviceToMeasurementMap));
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 7774b43826..bedd697089 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -170,6 +170,7 @@ public class DataNode implements DataNodeMBean {
   public void active() throws StartupException {
     // set the mpp mode to true
     IoTDBDescriptor.getInstance().getConfig().setMppMode(true);
+    IoTDBDescriptor.getInstance().getConfig().setClusterMode(true);
     // start iotdb server first
     IoTDB.getInstance().active();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index f281b1f4b5..b0324c3a1a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.LocalConfigNode;
+import org.apache.iotdb.db.localconfignode.LocalConfigNode;
 import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
 import org.apache.iotdb.db.mpp.buffer.DataBlockService;
 import org.apache.iotdb.db.mpp.schedule.FragmentInstanceScheduler;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index d54841ab0b..78a5d986ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.service.thrift.impl;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.OperationType;
 import org.apache.iotdb.db.mpp.common.QueryId;
@@ -31,6 +32,8 @@ import org.apache.iotdb.db.mpp.sql.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.mpp.sql.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.StandalonePartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.StandaloneSchemaFetcher;
 import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertMultiTabletsStatement;
@@ -107,13 +110,25 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeTSIServiceImpl.class);
 
+  private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
   private static final Coordinator COORDINATOR = Coordinator.getInstance();
 
   private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
 
-  private static final IPartitionFetcher PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+  private final IPartitionFetcher PARTITION_FETCHER;
+
+  private final ISchemaFetcher SCHEMA_FETCHER;
 
-  private static final ISchemaFetcher SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+  public DataNodeTSIServiceImpl() {
+    if (config.isClusterMode()) {
+      PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+      SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+    } else {
+      PARTITION_FETCHER = StandalonePartitionFetcher.getInstance();
+      SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance();
+    }
+  }
 
   @Override
   public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
index d4532ff167..6b3874b11d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.mpp.operator.schema;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.metadata.LocalConfigNode;
+import org.apache.iotdb.db.localconfignode.LocalConfigNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index 255b73f5a7..20e2ab626b 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.ConsensusImpl;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.metadata.LocalConfigNode;
+import org.apache.iotdb.db.localconfignode.LocalConfigNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;