You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/21 10:05:28 UTC

[iotdb] 01/02: init

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch standaloneMPPWrite
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 92a1e2669c071b92d028fc1464aec6718ab52df1
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Apr 21 17:09:26 2022 +0800

    init
---
 .../apache/iotdb/db/metadata/LocalConfigNode.java  |  60 +++++++---
 .../iotdb/db/metadata/LocalDataPartitionTable.java | 124 +++++++++++++++++++++
 .../iotdb/db/mpp/execution/QueryExecution.java     |  24 ++--
 .../execution/scheduler/StandaloneScheduler.java   |  56 +++++++++-
 .../sql/analyze/StandalonePartitionFetcher.java    |  64 ++++++++++-
 .../mpp/sql/analyze/StandaloneSchemaFetcher.java   |  38 ++++++-
 .../java/org/apache/iotdb/db/service/DataNode.java |   1 +
 .../thrift/impl/DataNodeTSIServiceImpl.java        |  17 ++-
 8 files changed, 352 insertions(+), 32 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
index 2b2f20433a..3500192207 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
@@ -21,10 +21,14 @@ package org.apache.iotdb.db.metadata;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
@@ -87,7 +91,11 @@ public class LocalConfigNode {
       StorageGroupSchemaManager.getInstance();
   private TemplateManager templateManager = TemplateManager.getInstance();
   private SchemaEngine schemaEngine = SchemaEngine.getInstance();
-  private LocalSchemaPartitionTable partitionTable = LocalSchemaPartitionTable.getInstance();
+  private LocalSchemaPartitionTable schemaPartitionTable = LocalSchemaPartitionTable.getInstance();
+
+  private StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
+
+  private LocalDataPartitionTable dataPartitionTable = LocalDataPartitionTable.getInstance();
 
   private LocalConfigNode() {
     String schemaDir = config.getSchemaDir();
@@ -125,7 +133,7 @@ public class LocalConfigNode {
 
       templateManager.init();
       storageGroupSchemaManager.init();
-      partitionTable.init();
+      schemaPartitionTable.init();
       schemaEngine.init();
 
       initSchemaRegion();
@@ -150,7 +158,7 @@ public class LocalConfigNode {
 
   private void initSchemaRegion() throws MetadataException {
     for (PartialPath storageGroup : storageGroupSchemaManager.getAllStorageGroupPaths()) {
-      partitionTable.setStorageGroup(storageGroup);
+      schemaPartitionTable.setStorageGroup(storageGroup);
 
       File sgDir = new File(config.getSchemaDir(), storageGroup.getFullPath());
 
@@ -167,7 +175,7 @@ public class LocalConfigNode {
         SchemaRegionId schemaRegionId =
             new SchemaRegionId(Integer.parseInt(schemaRegionDir.getName()));
         schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
-        partitionTable.putSchemaRegionId(storageGroup, schemaRegionId);
+        schemaPartitionTable.putSchemaRegionId(storageGroup, schemaRegionId);
       }
     }
   }
@@ -185,7 +193,7 @@ public class LocalConfigNode {
         timedForceMLogThread = null;
       }
 
-      partitionTable.clear();
+      schemaPartitionTable.clear();
       schemaEngine.clear();
       storageGroupSchemaManager.clear();
       templateManager.clear();
@@ -220,10 +228,10 @@ public class LocalConfigNode {
    */
   public void setStorageGroup(PartialPath storageGroup) throws MetadataException {
     storageGroupSchemaManager.setStorageGroup(storageGroup);
-    partitionTable.setStorageGroup(storageGroup);
+    schemaPartitionTable.setStorageGroup(storageGroup);
 
     schemaEngine.createSchemaRegion(
-        storageGroup, partitionTable.allocateSchemaRegionId(storageGroup));
+        storageGroup, schemaPartitionTable.allocateSchemaRegionId(storageGroup));
     if (SchemaSyncManager.getInstance().isEnableSync()) {
       SchemaSyncManager.getInstance().syncMetadataPlan(new SetStorageGroupPlan(storageGroup));
     }
@@ -235,7 +243,7 @@ public class LocalConfigNode {
 
   public void deleteStorageGroup(PartialPath storageGroup) throws MetadataException {
     deleteSchemaRegionsInStorageGroup(
-        storageGroup, partitionTable.getSchemaRegionIdsByStorageGroup(storageGroup));
+        storageGroup, schemaPartitionTable.getSchemaRegionIdsByStorageGroup(storageGroup));
 
     for (Template template : templateManager.getTemplateMap().values()) {
       templateManager.unmarkStorageGroup(template, storageGroup.getFullPath());
@@ -249,7 +257,7 @@ public class LocalConfigNode {
       MemTableManager.getInstance().addOrDeleteStorageGroup(-1);
     }
 
-    partitionTable.deleteStorageGroup(storageGroup);
+    schemaPartitionTable.deleteStorageGroup(storageGroup);
 
     // delete storage group after all related resources have been cleared
     storageGroupSchemaManager.deleteStorageGroup(storageGroup);
@@ -534,12 +542,12 @@ public class LocalConfigNode {
    */
   public SchemaRegionId getBelongedSchemaRegionId(PartialPath path) throws MetadataException {
     PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
-    SchemaRegionId schemaRegionId = partitionTable.getSchemaRegionId(storageGroup, path);
+    SchemaRegionId schemaRegionId = schemaPartitionTable.getSchemaRegionId(storageGroup, path);
     ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
     if (schemaRegion == null) {
       schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
     }
-    return partitionTable.getSchemaRegionId(storageGroup, path);
+    return schemaPartitionTable.getSchemaRegionId(storageGroup, path);
   }
 
   // This interface involves storage group auto creation
@@ -561,14 +569,15 @@ public class LocalConfigNode {
     for (PartialPath storageGroup :
         storageGroupSchemaManager.getInvolvedStorageGroups(pathPattern, isPrefixMatch)) {
       result.addAll(
-          partitionTable.getInvolvedSchemaRegionIds(storageGroup, pathPattern, isPrefixMatch));
+          schemaPartitionTable.getInvolvedSchemaRegionIds(
+              storageGroup, pathPattern, isPrefixMatch));
     }
     return result;
   }
 
   public List<SchemaRegionId> getSchemaRegionIdsByStorageGroup(PartialPath storageGroup)
       throws MetadataException {
-    return partitionTable.getSchemaRegionIdsByStorageGroup(storageGroup);
+    return schemaPartitionTable.getSchemaRegionIdsByStorageGroup(storageGroup);
   }
 
   // endregion
@@ -754,4 +763,29 @@ public class LocalConfigNode {
   }
 
   // endregion
+
+  // region Interfaces for DataRegionId Management
+  /**
+   * Get the target DataRegionId, which the given path belongs to. The path must be a fullPath
+   * without wildcards, * or **. This method is the first step when there's a task on one certain
+   * path, e.g., root.sg1 is a storage group and path = root.sg1.d1, return DataRegionId of
+   * root.sg1. If there's no storage group on the given path, StorageGroupNotSetException will be
+   * thrown. A {@link DataRegion} missing from the storage engine is created before returning.
+   */
+  public DataRegionId getBelongedDataRegionRegionId(PartialPath path)
+      throws MetadataException, DataRegionException {
+    PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
+    DataRegionId dataRegionId = dataPartitionTable.getDataRegionId(storageGroup, path);
+    if (storageEngine.getDataRegion(dataRegionId) == null) {
+      storageEngine.createDataRegion(dataRegionId, storageGroup.getFullPath(), Long.MAX_VALUE);
+    }
+    return dataRegionId;
+  }
+
+  /** Get all DataRegionIds that the data partition table holds for the given storage group. */
+  public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath storageGroup) {
+    return dataPartitionTable.getDataRegionIdsByStorageGroup(storageGroup);
+  }
+
+  // endregion
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalDataPartitionTable.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalDataPartitionTable.java
new file mode 100644
index 0000000000..52fc102437
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalDataPartitionTable.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+// This class is used for data partition maintaining the map between storage group and
+// dataRegionIds. Its state exists from construction on, so it is usable before init() is called.
+public class LocalDataPartitionTable {
+
+  private final AtomicInteger dataRegionIdGenerator = new AtomicInteger(0);
+
+  private final Map<PartialPath, Set<DataRegionId>> table = new ConcurrentHashMap<>();
+
+  private static class LocalDataPartitionTableHolder {
+    private static final LocalDataPartitionTable INSTANCE = new LocalDataPartitionTable();
+
+    private LocalDataPartitionTableHolder() {}
+  }
+
+  private LocalDataPartitionTable() {}
+
+  public static LocalDataPartitionTable getInstance() {
+    return LocalDataPartitionTableHolder.INSTANCE;
+  }
+
+  /** Resets the table to its empty initial state; has the same effect as {@link #clear()}. */
+  public synchronized void init() {
+    clear();
+  }
+
+  /** Drops every storage group entry and restarts DataRegionId generation from 0. */
+  public synchronized void clear() {
+    table.clear();
+    dataRegionIdGenerator.set(0);
+  }
+
+  /** Generates the next DataRegionId and records it under the given storage group. */
+  public synchronized DataRegionId allocateDataRegionId(PartialPath storageGroup) {
+    DataRegionId dataRegionId = new DataRegionId(dataRegionIdGenerator.getAndIncrement());
+    regionIdsOf(storageGroup).add(dataRegionId);
+    return dataRegionId;
+  }
+
+  /** Records an existing DataRegionId and keeps the generator ahead of the largest known id. */
+  public synchronized void putDataRegionId(PartialPath storageGroup, DataRegionId dataRegionId) {
+    regionIdsOf(storageGroup).add(dataRegionId);
+    if (dataRegionId.getId() >= dataRegionIdGenerator.get()) {
+      dataRegionIdGenerator.set(dataRegionId.getId() + 1);
+    }
+  }
+
+  /** Removes the DataRegionId from the storage group; unknown storage groups are ignored. */
+  public synchronized void removeDataRegionId(PartialPath storageGroup, DataRegionId dataRegionId) {
+    Set<DataRegionId> ids = table.get(storageGroup);
+    if (ids != null) {
+      ids.remove(dataRegionId);
+    }
+  }
+
+  public synchronized DataRegionId getDataRegionId(PartialPath storageGroup, PartialPath path) {
+    return calculateDataRegionId(storageGroup, path);
+  }
+
+  /** Returns every DataRegionId of the storage group; pathPattern is not used to filter yet. */
+  public List<DataRegionId> getInvolvedDataRegionIds(
+      PartialPath storageGroup, PartialPath pathPattern, boolean isPrefixMatch) {
+    return getDataRegionIdsByStorageGroup(storageGroup);
+  }
+
+  /** Returns a snapshot of the storage group's DataRegionIds, empty if the group is unknown. */
+  public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath storageGroup) {
+    Set<DataRegionId> ids = table.get(storageGroup);
+    return ids == null ? new ArrayList<>() : new ArrayList<>(ids);
+  }
+
+  public synchronized void setStorageGroup(PartialPath storageGroup) {
+    regionIdsOf(storageGroup);
+  }
+
+  public synchronized Set<DataRegionId> deleteStorageGroup(PartialPath storageGroup) {
+    return table.remove(storageGroup);
+  }
+
+  // This method may be extended to implement multi dataRegion for one storageGroup
+  // todo keep consistent with the partition method of config node in new cluster
+  private DataRegionId calculateDataRegionId(PartialPath storageGroup, PartialPath path) {
+    Set<DataRegionId> ids = regionIdsOf(storageGroup);
+    // a storage group without any region gets its first one here instead of failing on next()
+    return ids.isEmpty() ? allocateDataRegionId(storageGroup) : ids.iterator().next();
+  }
+
+  // Returns the id set of the storage group, registering the storage group when it is unknown.
+  private Set<DataRegionId> regionIdsOf(PartialPath storageGroup) {
+    return table.computeIfAbsent(storageGroup, k -> Collections.synchronizedSet(new HashSet<>()));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index a9bd605823..6eaf77d939 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
+import org.apache.iotdb.db.mpp.execution.scheduler.StandaloneScheduler;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
@@ -37,6 +38,7 @@ import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -142,13 +144,21 @@ public class QueryExecution implements IQueryExecution {
   private void schedule() {
     // TODO: (xingtanzjr) initialize the query scheduler according to configuration
     this.scheduler =
-        new ClusterScheduler(
-            context,
-            stateMachine,
-            distributedPlan.getInstances(),
-            context.getQueryType(),
-            executor,
-            scheduledExecutor);
+        IoTDB.getInstance().isClusterMode()
+            ? new ClusterScheduler(
+                context,
+                stateMachine,
+                distributedPlan.getInstances(),
+                context.getQueryType(),
+                executor,
+                scheduledExecutor)
+            : new StandaloneScheduler(
+                context,
+                stateMachine,
+                distributedPlan.getInstances(),
+                context.getQueryType(),
+                executor,
+                scheduledExecutor);
     this.scheduler.start();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
index 1081b7359c..5abf0490e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
@@ -18,22 +18,72 @@
  */
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
-import org.apache.iotdb.db.engine.StorageEngine;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.execution.FragmentInfo;
 
 import io.airlift.units.Duration;
+import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class StandaloneScheduler implements IScheduler {
 
-  private static final StorageEngine STORAGE_ENGINE = StorageEngine.getInstance();
+  private static final StorageEngineV2 STORAGE_ENGINE = StorageEngineV2.getInstance();
 
   private static final LocalSchemaProcessor SCHEMA_ENGINE = LocalSchemaProcessor.getInstance();
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(StandaloneScheduler.class);
+
+  private MPPQueryContext queryContext;
+  // The stateMachine of the QueryExecution owned by this QueryScheduler
+  private QueryStateMachine stateMachine;
+  private QueryType queryType;
+  // The fragment instances which should be sent to corresponding Nodes.
+  private List<FragmentInstance> instances;
+  private ExecutorService executor;
+  private ScheduledExecutorService scheduledExecutor;
+  private IFragInstanceDispatcher dispatcher;
+  private IFragInstanceStateTracker stateTracker;
+  private IQueryTerminator queryTerminator;
+
+  /** Stores the query state and creates the instance state tracker and the query terminator. */
+  public StandaloneScheduler(
+      MPPQueryContext queryContext,
+      QueryStateMachine stateMachine,
+      List<FragmentInstance> instances,
+      QueryType queryType,
+      ExecutorService executor,
+      ScheduledExecutorService scheduledExecutor) {
+    this.queryContext = queryContext;
+    this.stateMachine = stateMachine;
+    this.instances = instances;
+    this.queryType = queryType;
+    this.executor = executor;
+    this.scheduledExecutor = scheduledExecutor;
+    this.stateTracker =
+        new FixedRateFragInsStateTracker(stateMachine, executor, scheduledExecutor, instances);
+    this.queryTerminator =
+        new SimpleQueryTerminator(executor, queryContext.getQueryId(), instances);
+  }
+
   @Override
-  public void start() {}
+  public void start() {
+    // For the FragmentInstance of WRITE, it will be executed directly when dispatching.
+    // TODO: Other QueryTypes
+    if (queryType == QueryType.WRITE) {
+
+      return;
+    }
+  }
 
   @Override
   public void stop() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
index 7fbdb00bf4..c78fc3d8d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
@@ -18,21 +18,40 @@
  */
 package org.apache.iotdb.db.mpp.sql.analyze;
 
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
+import org.apache.iotdb.db.metadata.LocalConfigNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class StandalonePartitionFetcher implements IPartitionFetcher {
 
-  private StandalonePartitionFetcher() {}
+  private final LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
+  private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
+
+  private static final class StandalonePartitionFetcherHolder {
+    private static final StandalonePartitionFetcher INSTANCE = new StandalonePartitionFetcher();
+
+    private StandalonePartitionFetcherHolder() {}
+  }
 
-  // TODO need to use safe singleton pattern
   public static StandalonePartitionFetcher getInstance() {
-    return new StandalonePartitionFetcher();
+    return StandalonePartitionFetcher.StandalonePartitionFetcherHolder.INSTANCE;
   }
 
   @Override
@@ -48,12 +67,47 @@ public class StandalonePartitionFetcher implements IPartitionFetcher {
   @Override
   public DataPartition getDataPartition(
       Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
-    return null;
+    try {
+      Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+          dataPartitionMap = new HashMap<>();
+      for (Map.Entry<String, List<DataPartitionQueryParam>> sgEntry :
+          sgNameToQueryParamsMap.entrySet()) {
+        // for each sg: resolve the local data region of every queried device
+        Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>
+            deviceToRegionsMap = new HashMap<>();
+        for (DataPartitionQueryParam dataPartitionQueryParam : sgEntry.getValue()) {
+          // for each device
+          DataRegionId dataRegionId =
+              localConfigNode.getBelongedDataRegionRegionId(
+                  new PartialPath(dataPartitionQueryParam.getDevicePath()));
+          Map<TimePartitionSlot, List<RegionReplicaSet>> timeSlotToRegions = new HashMap<>();
+          for (TimePartitionSlot timeSlot : dataPartitionQueryParam.getTimePartitionSlotList()) {
+            // for each time partition
+            timeSlotToRegions.put(
+                timeSlot,
+                Collections.singletonList(
+                    new RegionReplicaSet(dataRegionId, Collections.emptyList())));
+          }
+          // NOTE(review): all devices share a default-constructed slot key; confirm this is intended
+          deviceToRegionsMap.put(new SeriesPartitionSlot(), timeSlotToRegions);
+        }
+        dataPartitionMap.put(sgEntry.getKey(), deviceToRegionsMap);
+      }
+      return new DataPartition(
+          dataPartitionMap,
+          IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+          IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+    } catch (MetadataException | DataRegionException e) {
+      StatementAnalyzeException failure =
+          new StatementAnalyzeException("An error occurred when executing getDataPartition()");
+      failure.initCause(e); // keep the root cause attached for callers and logs
+      throw failure;
+    }
   }
 
   @Override
   public DataPartition getOrCreateDataPartition(
       Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
-    return null;
+    return getDataPartition(sgNameToQueryParamsMap);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
index e1deb0f9b5..81d1702710 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
@@ -18,15 +18,29 @@
  */
 package org.apache.iotdb.db.mpp.sql.analyze;
 
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.LocalConfigNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class StandaloneSchemaFetcher implements ISchemaFetcher {
 
+  private final LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
+  private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
+
   private StandaloneSchemaFetcher() {}
 
   public static StandaloneSchemaFetcher getInstance() {
@@ -35,7 +49,22 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
 
   @Override
   public SchemaTree fetchSchema(PathPatternTree patternTree) {
-    return null;
+    // collects the measurement paths of every pattern plus the storage groups they belong to
+    SchemaTree schemaTree = new SchemaTree();
+    Set<String> storageGroupSet = new HashSet<>();
+    try {
+      for (PartialPath path : patternTree.splitToPathList()) {
+        storageGroupSet.add(localConfigNode.getBelongedStorageGroup(path).getFullPath());
+        // look up the schema region the path belongs to and append what it returns for the path
+        SchemaRegionId schemaRegionId = localConfigNode.getBelongedSchemaRegionId(path);
+        ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
+        schemaTree.appendMeasurementPaths(schemaRegion.getMeasurementPaths(path, false));
+      }
+    } catch (MetadataException e) {
+      throw new RuntimeException(e);
+    }
+    schemaTree.setStorageGroups(new ArrayList<>(storageGroupSet));
+    return schemaTree;
   }
 
   @Override
@@ -50,6 +79,11 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
       List<String[]> measurements,
       List<TSDataType[]> tsDataTypes,
       List<Boolean> aligned) {
-    return null;
+    Map<PartialPath, List<String>> deviceToMeasurementMap = new HashMap<>();
+    for (int i = 0; i < devicePath.size(); i++) {
+      deviceToMeasurementMap.put(devicePath.get(i), Arrays.asList(measurements.get(i)));
+    }
+    // todo implement auto create schema
+    return fetchSchema(new PathPatternTree(deviceToMeasurementMap));
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index f2285abbc3..e06451f7a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -159,6 +159,7 @@ public class DataNode implements DataNodeMBean {
   public void active() throws StartupException {
     // set the mpp mode to true
     IoTDBDescriptor.getInstance().getConfig().setMppMode(true);
+    IoTDB.getInstance().setClusterMode();
     // start iotdb server first
     IoTDB.getInstance().active();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index 3de0a28cc2..091548c8d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.db.mpp.sql.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.mpp.sql.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.StandalonePartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.StandaloneSchemaFetcher;
 import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertMultiTabletsStatement;
@@ -39,6 +41,7 @@ import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowsOfOneDeviceStatement
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
 import org.apache.iotdb.db.service.metrics.MetricsService;
 import org.apache.iotdb.db.service.metrics.Operation;
@@ -74,9 +77,19 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
 
   private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
 
-  private static final IPartitionFetcher PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+  private final IPartitionFetcher PARTITION_FETCHER;
 
-  private static final ISchemaFetcher SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+  private final ISchemaFetcher SCHEMA_FETCHER;
+
+  public DataNodeTSIServiceImpl() {
+    boolean clusterMode = IoTDB.getInstance().isClusterMode();
+    PARTITION_FETCHER =
+        clusterMode
+            ? ClusterPartitionFetcher.getInstance()
+            : StandalonePartitionFetcher.getInstance();
+    SCHEMA_FETCHER =
+        clusterMode ? ClusterSchemaFetcher.getInstance() : StandaloneSchemaFetcher.getInstance();
+  }
 
   @Override
   public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {