Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/04/26 08:56:14 UTC

[iotdb] branch rel/1.1 updated: [IOTDB-5721][To rel/1.1] Streaming query DataPartition and Schema while loading TsFile (#9697)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 1096fe4590 [IOTDB-5721][To rel/1.1] Streaming query DataPartition and Schema while loading TsFile (#9697)
1096fe4590 is described below

commit 1096fe459084f0d918b264ef989d87228ef9ee74
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Wed Apr 26 16:56:07 2023 +0800

    [IOTDB-5721][To rel/1.1] Streaming query DataPartition and Schema while loading TsFile (#9697)
---
 .../resources/conf/iotdb-common.properties         |   5 +
 .../apache/iotdb/commons/conf/CommonConfig.java    |  10 +
 .../iotdb/commons/conf/CommonDescriptor.java       |   8 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   6 +
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  68 +++----
 .../db/mpp/plan/execution/QueryExecution.java      |   6 +-
 .../plan/node/load/LoadSingleTsFileNode.java       |  59 +++---
 .../planner/plan/node/load/LoadTsFileNode.java     |  16 +-
 .../scheduler/load/LoadTsFileDispatcherImpl.java   |  20 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 201 ++++++++++++++++-----
 10 files changed, 244 insertions(+), 155 deletions(-)
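
In short: before this change, visitLoadFile in AnalyzeVisitor walked every TsFile up front, built per-device min/max time maps, expanded them into TTimePartitionSlot lists, and fetched the complete DataPartition during analysis. After it, analysis only handles schema (flushed in bounded chunks), and partition information is queried lazily and in batches by LoadTsFileScheduler while each file is split and dispatched. Two knobs bound the batching: the new time_partition_slot_transmit_limit property and the maxLoadingDeviceNumber constant in IoTDBConfig.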

diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index e8efbc0771..df384af89c 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -84,6 +84,11 @@ cluster_name=defaultCluster
 # Datatype: String
 # series_partition_executor_class=org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor
 
+# The limit of the TTimePartitionSlot allowed to be transmitted between DataNode and ConfigNode
+# Mainly used to balance communication efficiency when loading very large TsFile
+# Datatype: Integer
+# time_partition_slot_transmit_limit=1000
+
 
 # The policy of extension SchemaRegionGroup for each Database.
 # These policies are currently supported:
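
The new property caps how many (device, TTimePartitionSlot) pairs travel in a single partition request. A minimal sketch of the batching loop it controls, mirroring the DataPartitionBatchFetcher added at the bottom of this commit (toQueryParam is that class's private helper; names outside the diff are illustrative):

    // Sketch: bound each DataNode -> ConfigNode partition request to
    // TRANSMIT_LIMIT slots; TRANSMIT_LIMIT is read from
    // time_partition_slot_transmit_limit (default 1000).
    for (int i = 0; i < slotList.size(); i += TRANSMIT_LIMIT) {
      List<Pair<String, TTimePartitionSlot>> batch =
          slotList.subList(i, Math.min(slotList.size(), i + TRANSMIT_LIMIT));
      DataPartition partition = fetcher.getOrCreateDataPartition(toQueryParam(batch));
      // ... resolve each pair in batch against partition
    }
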
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index bc6e07c36b..91c3653cb3 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -120,6 +120,8 @@ public class CommonConfig {
 
   private volatile String statusReason = null;
 
+  private int TTimePartitionSlotTransmitLimit = 1000;
+
   /** Disk Monitor */
   private double diskSpaceWarningThreshold = 0.05;
 
@@ -363,6 +365,14 @@ public class CommonConfig {
     this.targetMLNodeEndPoint = targetMLNodeEndPoint;
   }
 
+  public int getTTimePartitionSlotTransmitLimit() {
+    return TTimePartitionSlotTransmitLimit;
+  }
+
+  public void setTTimePartitionSlotTransmitLimit(int TTimePartitionSlotTransmitLimit) {
+    this.TTimePartitionSlotTransmitLimit = TTimePartitionSlotTransmitLimit;
+  }
+
   public boolean isStopping() {
     return isStopping;
   }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 5aa42bbe5f..d00a1e03df 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -188,6 +188,14 @@ public class CommonDescriptor {
                     String.valueOf(config.getDiskSpaceWarningThreshold()))
                 .trim()));
 
+    config.setTTimePartitionSlotTransmitLimit(
+        Integer.parseInt(
+            properties
+                .getProperty(
+                    "time_partition_slot_transmit_limit",
+                    String.valueOf(config.getTTimePartitionSlotTransmitLimit()))
+                .trim()));
+
     String endPointUrl =
         properties.getProperty(
             "target_ml_node_endpoint",
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 33c1cc3bfa..db3a6268d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -159,6 +159,8 @@ public class IoTDBConfig {
   /** The proportion of write memory for loading TsFile */
   private double loadTsFileProportion = 0.125;
 
+  private final int maxLoadingDeviceNumber = 10000;
+
   /**
    * If memory cost of data region increased more than proportion of {@linkplain
    * IoTDBConfig#getAllocateMemoryForStorageEngine()}*{@linkplain
@@ -3277,6 +3279,10 @@ public class IoTDBConfig {
     return loadTsFileProportion;
   }
 
+  public int getMaxLoadingDeviceNumber() {
+    return maxLoadingDeviceNumber;
+  }
+
   public static String getEnvironmentVariables() {
     return "\n\t"
         + IoTDBConstant.IOTDB_HOME
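
Note that maxLoadingDeviceNumber, unlike time_partition_slot_transmit_limit, is declared final and is not read from iotdb-common.properties (IoTDBDescriptor is not touched by this commit), so it is fixed at 10000 in this revision.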
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 2b394113d4..facc6b8ff9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -2003,8 +2003,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
   public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) {
     context.setQueryType(QueryType.WRITE);
 
-    Map<String, Long> device2MinTime = new HashMap<>();
-    Map<String, Long> device2MaxTime = new HashMap<>();
     Map<String, Map<MeasurementSchema, File>> device2Schemas = new HashMap<>();
     Map<String, Pair<Boolean, File>> device2IsAligned = new HashMap<>();
 
@@ -2019,13 +2017,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       }
       try {
         TsFileResource resource =
-            analyzeTsFile(
-                loadTsFileStatement,
-                tsFile,
-                device2MinTime,
-                device2MaxTime,
-                device2Schemas,
-                device2IsAligned);
+            analyzeTsFile(loadTsFileStatement, tsFile, device2Schemas, device2IsAligned);
         loadTsFileStatement.addTsFileResource(resource);
       } catch (IllegalArgumentException e) {
         logger.warn(
@@ -2039,9 +2031,27 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
         throw new SemanticException(
             String.format("Parse file %s to resource error", tsFile.getPath()));
       }
+      if (device2Schemas.size() > CONFIG.getMaxLoadingDeviceNumber()) {
+        autoCreateAndVerifySchema(loadTsFileStatement, device2Schemas, device2IsAligned);
+      }
     }
 
-    // auto create and verify schema
+    autoCreateAndVerifySchema(loadTsFileStatement, device2Schemas, device2IsAligned);
+
+    // load function will query data partition in scheduler
+    Analysis analysis = new Analysis();
+    analysis.setStatement(loadTsFileStatement);
+    return analysis;
+  }
+
+  private void autoCreateAndVerifySchema(
+      LoadTsFileStatement loadTsFileStatement,
+      Map<String, Map<MeasurementSchema, File>> device2Schemas,
+      Map<String, Pair<Boolean, File>> device2IsAligned)
+      throws SemanticException {
+    if (device2Schemas.isEmpty()) {
+      return;
+    }
     try {
       if (loadTsFileStatement.isVerifySchema()) {
         verifyLoadingMeasurements(device2Schemas);
@@ -2063,27 +2073,10 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
           String.format(
               "Auto create or verify schema error when executing statement %s.",
               loadTsFileStatement));
+    } finally {
+      device2Schemas.clear();
+      device2IsAligned.clear();
     }
-
-    // construct partition info
-    List<DataPartitionQueryParam> params = new ArrayList<>();
-    for (Map.Entry<String, Long> entry : device2MinTime.entrySet()) {
-      List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
-      String device = entry.getKey();
-      long endTime = device2MaxTime.get(device);
-      long interval = TimePartitionUtils.timePartitionInterval;
-      long time = (entry.getValue() / interval) * interval;
-      for (; time <= endTime; time += interval) {
-        timePartitionSlots.add(TimePartitionUtils.getTimePartition(time));
-      }
-
-      DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
-      dataPartitionQueryParam.setDevicePath(device);
-      dataPartitionQueryParam.setTimePartitionSlotList(timePartitionSlots);
-      params.add(dataPartitionQueryParam);
-    }
-
-    return getAnalysisForWriting(loadTsFileStatement, params);
   }
 
   /** get analysis according to statement and params */
@@ -2106,8 +2099,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
   private TsFileResource analyzeTsFile(
       LoadTsFileStatement statement,
       File tsFile,
-      Map<String, Long> device2MinTime,
-      Map<String, Long> device2MaxTime,
       Map<String, Map<MeasurementSchema, File>> device2Schemas,
       Map<String, Pair<Boolean, File>> device2IsAligned)
       throws IOException, VerifyMetadataException {
@@ -2163,19 +2154,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
         resource.deserialize();
       }
 
-      // construct device time range
-      for (String device : resource.getDevices()) {
-        device2MinTime.put(
-            device,
-            Math.min(
-                device2MinTime.getOrDefault(device, Long.MAX_VALUE),
-                resource.getStartTime(device)));
-        device2MaxTime.put(
-            device,
-            Math.max(
-                device2MaxTime.getOrDefault(device, Long.MIN_VALUE), resource.getEndTime(device)));
-      }
-
       resource.setStatus(TsFileResourceStatus.CLOSED);
       return resource;
     }
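
Net effect on the analysis path: device schemas no longer accumulate across the whole load statement. They are flushed (auto-created/verified, then cleared in the finally block) whenever the map outgrows maxLoadingDeviceNumber, with one final flush after the loop. A condensed sketch of the resulting control flow:

    Map<String, Map<MeasurementSchema, File>> device2Schemas = new HashMap<>();
    Map<String, Pair<Boolean, File>> device2IsAligned = new HashMap<>();
    for (File tsFile : tsFiles) {
      // analyzeTsFile(...) fills device2Schemas / device2IsAligned
      if (device2Schemas.size() > CONFIG.getMaxLoadingDeviceNumber()) {
        autoCreateAndVerifySchema(loadTsFileStatement, device2Schemas, device2IsAligned);
      }
    }
    autoCreateAndVerifySchema(loadTsFileStatement, device2Schemas, device2IsAligned);
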
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 94fb69067e..b0f6861b55 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -297,7 +297,11 @@ public class QueryExecution implements IQueryExecution {
     if (rawStatement instanceof LoadTsFileStatement) {
       this.scheduler =
           new LoadTsFileScheduler(
-              distributedPlan, context, stateMachine, syncInternalServiceClientManager);
+              distributedPlan,
+              context,
+              stateMachine,
+              syncInternalServiceClientManager,
+              partitionFetcher);
       this.scheduler.start();
       return;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index 1c965e5e69..fdc15a0320 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -33,6 +32,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.utils.TimePartitionUtils;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,9 +42,11 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Function;
 
 public class LoadSingleTsFileNode extends WritePlanNode {
   private static final Logger logger = LoggerFactory.getLogger(LoadSingleTsFileNode.class);
@@ -55,47 +57,36 @@ public class LoadSingleTsFileNode extends WritePlanNode {
   private boolean deleteAfterLoad;
 
   private TRegionReplicaSet localRegionReplicaSet;
-  private DataPartition dataPartition;
 
   public LoadSingleTsFileNode(PlanNodeId id) {
     super(id);
   }
 
-  public LoadSingleTsFileNode(
-      PlanNodeId id,
-      TsFileResource resource,
-      boolean deleteAfterLoad,
-      DataPartition dataPartition) {
+  public LoadSingleTsFileNode(PlanNodeId id, TsFileResource resource, boolean deleteAfterLoad) {
     super(id);
     this.tsFile = resource.getTsFile();
     this.resource = resource;
     this.deleteAfterLoad = deleteAfterLoad;
-    this.dataPartition = dataPartition;
-  }
-
-  public void checkIfNeedDecodeTsFile() throws IOException {
-    Set<TRegionReplicaSet> allRegionReplicaSet = new HashSet<>();
-    TTimePartitionSlot timePartitionSlot = null;
-    needDecodeTsFile = false;
-    for (String device : resource.getDevices()) {
-      TTimePartitionSlot startSlot =
-          TimePartitionUtils.getTimePartition(resource.getStartTime(device));
-      if (timePartitionSlot == null) {
-        timePartitionSlot = startSlot;
-      }
-      if (!startSlot.equals(timePartitionSlot)
-          || !TimePartitionUtils.getTimePartition(resource.getEndTime(device))
-              .equals(timePartitionSlot)) {
-        needDecodeTsFile = true;
-        return;
-      }
-      allRegionReplicaSet.add(
-          dataPartition.getDataRegionReplicaSetForWriting(device, timePartitionSlot));
-    }
-    needDecodeTsFile = !isDispatchedToLocal(allRegionReplicaSet);
+  }
+
+  public boolean needDecodeTsFile(
+      Function<List<Pair<String, TTimePartitionSlot>>, List<TRegionReplicaSet>> partitionFetcher)
+      throws IOException {
+    List<Pair<String, TTimePartitionSlot>> slotList = new ArrayList<>();
+    resource
+        .getDevices()
+        .forEach(
+            o -> {
+              slotList.add(
+                  new Pair<>(o, TimePartitionUtils.getTimePartition(resource.getStartTime(o))));
+              slotList.add(
+                  new Pair<>(o, TimePartitionUtils.getTimePartition(resource.getEndTime(o))));
+            });
+    needDecodeTsFile = !isDispatchedToLocal(new HashSet<>(partitionFetcher.apply(slotList)));
     if (!needDecodeTsFile && !resource.resourceFileExists()) {
       resource.serialize();
     }
+    return needDecodeTsFile;
   }
 
   private boolean isDispatchedToLocal(Set<TRegionReplicaSet> replicaSets) {
@@ -118,10 +109,6 @@ public class LoadSingleTsFileNode extends WritePlanNode {
         && IoTDBDescriptor.getInstance().getConfig().getInternalPort() == endPoint.port;
   }
 
-  public boolean needDecodeTsFile() {
-    return needDecodeTsFile;
-  }
-
   public boolean isDeleteAfterLoad() {
     return deleteAfterLoad;
   }
@@ -135,10 +122,6 @@ public class LoadSingleTsFileNode extends WritePlanNode {
     return localRegionReplicaSet;
   }
 
-  public DataPartition getDataPartition() {
-    return dataPartition;
-  }
-
   public TsFileResource getTsFileResource() {
     return resource;
   }
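
With the DataPartition field gone, the decode check is driven by a fetcher the caller supplies. The scheduler invokes it as shown in LoadTsFileScheduler.start further down:

    // If every device's start and end slot resolves to the local replica
    // set, the file can be loaded as-is without decoding.
    if (!node.needDecodeTsFile(partitionFetcher::queryDataPartition)) {
      isLoadSingleTsFileSuccess = loadLocally(node);
    }
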
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
index db2a1db1ab..4ab89f5e77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -90,21 +90,7 @@ public class LoadTsFileNode extends WritePlanNode {
     List<WritePlanNode> res = new ArrayList<>();
     LoadTsFileStatement statement = (LoadTsFileStatement) analysis.getStatement();
     for (TsFileResource resource : resources) {
-      try {
-        LoadSingleTsFileNode singleTsFileNode =
-            new LoadSingleTsFileNode(
-                getPlanNodeId(),
-                resource,
-                statement.isDeleteAfterLoad(),
-                analysis.getDataPartitionInfo());
-        singleTsFileNode.checkIfNeedDecodeTsFile();
-        res.add(singleTsFileNode);
-      } catch (Exception e) {
-        logger.error(
-            String.format(
-                "Check whether TsFile %s need decode or not error", resource.getTsFile().getPath()),
-            e);
-      }
+      res.add(new LoadSingleTsFileNode(getPlanNodeId(), resource, statement.isDeleteAfterLoad()));
     }
     return res;
   }
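
The try/catch disappears here because the decode check (and the IO it may throw) moves out of plan splitting and into LoadTsFileScheduler.start, where a failure on one TsFile is now logged and recorded without aborting the remaining files.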
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 418bd8aa99..f527232e48 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -106,13 +106,22 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
 
   private void dispatchOneInstance(FragmentInstance instance)
       throws FragmentInstanceDispatchException {
+    TTsFilePieceReq loadTsFileReq = null;
+
     for (TDataNodeLocation dataNodeLocation :
         instance.getRegionReplicaSet().getDataNodeLocations()) {
       TEndPoint endPoint = dataNodeLocation.getInternalEndPoint();
       if (isDispatchedToLocal(endPoint)) {
         dispatchLocally(instance);
       } else {
-        dispatchRemote(instance, endPoint);
+        if (loadTsFileReq == null) {
+          loadTsFileReq =
+              new TTsFilePieceReq(
+                  instance.getFragment().getPlanNodeTree().serializeToByteBuffer(),
+                  uuid,
+                  instance.getRegionReplicaSet().getRegionId());
+        }
+        dispatchRemote(loadTsFileReq, endPoint);
       }
     }
   }
@@ -121,15 +130,10 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
     return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port;
   }
 
-  private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
+  private void dispatchRemote(TTsFilePieceReq loadTsFileReq, TEndPoint endPoint)
       throws FragmentInstanceDispatchException {
     try (SyncDataNodeInternalServiceClient client =
         internalServiceClientManager.borrowClient(endPoint)) {
-      TTsFilePieceReq loadTsFileReq =
-          new TTsFilePieceReq(
-              instance.getFragment().getPlanNodeTree().serializeToByteBuffer(),
-              uuid,
-              instance.getRegionReplicaSet().getRegionId());
       TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq);
       if (!loadResp.isAccepted()) {
         logger.warn(loadResp.message);
@@ -165,7 +169,7 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
       if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
         throw new FragmentInstanceDispatchException(resultStatus);
       }
-    } else if (planNode instanceof LoadSingleTsFileNode) { // do not need split
+    } else if (planNode instanceof LoadSingleTsFileNode) { // do not need to split
       try {
         StorageEngine.getInstance()
             .getDataRegion((DataRegionId) groupId)
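
Also worth noting in dispatchOneInstance: the TTsFilePieceReq, including the serialized plan-node tree inside it, is now built once per fragment instance and reused for every remote replica, instead of being re-serialized inside dispatchRemote for each DataNode.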
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
index e37449055b..435768c1d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
@@ -22,8 +22,12 @@ package org.apache.iotdb.db.mpp.plan.scheduler.load;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
 import org.apache.iotdb.commons.partition.StorageExecutor;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -36,6 +40,7 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
@@ -45,6 +50,7 @@ import org.apache.iotdb.db.mpp.plan.scheduler.FragInstanceDispatchResult;
 import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import io.airlift.units.Duration;
 import org.slf4j.Logger;
@@ -66,6 +72,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * {@link LoadTsFileScheduler} is used for scheduling {@link LoadSingleTsFileNode} and {@link
@@ -76,16 +83,24 @@ import java.util.stream.Collectors;
  */
 public class LoadTsFileScheduler implements IScheduler {
   private static final Logger logger = LoggerFactory.getLogger(LoadTsFileScheduler.class);
-  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
  public static final long LOAD_TASK_MAX_TIME_IN_SECOND = 5184000L; // 60 days
-  private static final long MAX_MEMORY_SIZE =
-      Math.min(
-          config.getThriftMaxFrameSize() / 2,
-          (long) (config.getAllocateMemoryForStorageEngine() * config.getLoadTsFileProportion()));
+  private static final long MAX_MEMORY_SIZE;
+  private static final int TRANSMIT_LIMIT;
+
+  static {
+    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    MAX_MEMORY_SIZE =
+        Math.min(
+            config.getThriftMaxFrameSize() >> 2,
+            (long) (config.getAllocateMemoryForStorageEngine() * config.getLoadTsFileProportion()));
+    TRANSMIT_LIMIT =
+        CommonDescriptor.getInstance().getConfig().getTTimePartitionSlotTransmitLimit();
+  }
 
   private final MPPQueryContext queryContext;
   private final QueryStateMachine stateMachine;
   private final LoadTsFileDispatcherImpl dispatcher;
+  private final DataPartitionBatchFetcher partitionFetcher;
   private final List<LoadSingleTsFileNode> tsFileNodeList;
   private final PlanFragmentId fragmentId;
 
@@ -95,12 +110,14 @@ public class LoadTsFileScheduler implements IScheduler {
       DistributedQueryPlan distributedQueryPlan,
       MPPQueryContext queryContext,
       QueryStateMachine stateMachine,
-      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager,
+      IPartitionFetcher partitionFetcher) {
     this.queryContext = queryContext;
     this.stateMachine = stateMachine;
     this.tsFileNodeList = new ArrayList<>();
     this.fragmentId = distributedQueryPlan.getRootSubPlan().getPlanFragment().getId();
     this.dispatcher = new LoadTsFileDispatcherImpl(internalServiceClientManager);
+    this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher);
     this.allReplicaSets = new HashSet<>();
 
     for (FragmentInstance fragmentInstance : distributedQueryPlan.getInstances()) {
@@ -111,31 +128,57 @@ public class LoadTsFileScheduler implements IScheduler {
   @Override
   public void start() {
     stateMachine.transitionToRunning();
-    for (LoadSingleTsFileNode node : tsFileNodeList) {
-      if (!node.needDecodeTsFile()) {
-        boolean isLoadLocallySuccess = loadLocally(node);
-
-        node.clean();
-        if (!isLoadLocallySuccess) {
-          return;
+    int tsFileNodeListSize = tsFileNodeList.size();
+    boolean isLoadSuccess = true;
+
+    for (int i = 0; i < tsFileNodeListSize; ++i) {
+      LoadSingleTsFileNode node = tsFileNodeList.get(i);
+      boolean isLoadSingleTsFileSuccess = true;
+      try {
+        if (!node.needDecodeTsFile(
+            partitionFetcher::queryDataPartition)) { // do not decode, load locally
+          isLoadSingleTsFileSuccess = loadLocally(node);
+          node.clean();
+
+        } else { // need decode, load locally or remotely, use two phases method
+          String uuid = UUID.randomUUID().toString();
+          dispatcher.setUuid(uuid);
+          allReplicaSets.clear();
+
+          boolean isFirstPhaseSuccess = firstPhase(node);
+          boolean isSecondPhaseSuccess =
+              secondPhase(isFirstPhaseSuccess, uuid, node.getTsFileResource().getTsFile());
+
+          node.clean();
+          if (!isFirstPhaseSuccess || !isSecondPhaseSuccess) {
+            isLoadSingleTsFileSuccess = false;
+          }
         }
-        continue;
-      }
-
-      String uuid = UUID.randomUUID().toString();
-      dispatcher.setUuid(uuid);
-      allReplicaSets.clear();
-
-      boolean isFirstPhaseSuccess = firstPhase(node);
-      boolean isSecondPhaseSuccess =
-          secondPhase(isFirstPhaseSuccess, uuid, node.getTsFileResource().getTsFile());
-
-      node.clean();
-      if (!isFirstPhaseSuccess || !isSecondPhaseSuccess) {
-        return;
+        if (isLoadSingleTsFileSuccess) {
+          logger.info(
+              String.format(
+                  "Load TsFile %s Successfully, load process [%s/%s]",
+                  node.getTsFileResource().getTsFilePath(), i + 1, tsFileNodeListSize));
+        } else {
+          isLoadSuccess = false;
+          logger.warn(
+              String.format(
+                  "Can not Load TsFile %s, load process [%s/%s]",
+                  node.getTsFileResource().getTsFilePath(), i + 1, tsFileNodeListSize));
+        }
+      } catch (Exception e) {
+        isLoadSuccess = false;
+        stateMachine.transitionToFailed(e);
+        logger.warn(
+            String.format(
+                "LoadTsFileScheduler loads TsFile %s error",
+                node.getTsFileResource().getTsFilePath()),
+            e);
       }
     }
-    stateMachine.transitionToFinished();
+    if (isLoadSuccess) {
+      stateMachine.transitionToFinished();
+    }
   }
 
   private boolean firstPhase(LoadSingleTsFileNode node) {
@@ -145,9 +188,11 @@ public class LoadTsFileScheduler implements IScheduler {
               node.getTsFileResource().getTsFile(), tsFileDataManager::addOrSendTsFileData)
           .splitTsFileByDataPartition();
       if (!tsFileDataManager.sendAllTsFileData()) {
+        stateMachine.transitionToFailed(new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()));
         return false;
       }
     } catch (IllegalStateException e) {
+      stateMachine.transitionToFailed(e);
       logger.warn(
           String.format(
               "Dispatch TsFileData error when parsing TsFile %s.",
@@ -310,18 +355,20 @@ public class LoadTsFileScheduler implements IScheduler {
     ROLLBACK
   }
 
-  private class TsFileDataManager {
+  private static class TsFileDataManager {
     private final LoadTsFileScheduler scheduler;
     private final LoadSingleTsFileNode singleTsFileNode;
 
     private long dataSize;
-    private Map<TRegionReplicaSet, LoadTsFilePieceNode> replicaSet2Piece;
+    private final Map<TRegionReplicaSet, LoadTsFilePieceNode> replicaSet2Piece;
+    private final List<ChunkData> nonDirectionalChunkData;
 
     public TsFileDataManager(LoadTsFileScheduler scheduler, LoadSingleTsFileNode singleTsFileNode) {
       this.scheduler = scheduler;
       this.singleTsFileNode = singleTsFileNode;
       this.dataSize = 0;
       this.replicaSet2Piece = new HashMap<>();
+      this.nonDirectionalChunkData = new ArrayList<>();
     }
 
     private boolean addOrSendTsFileData(TsFileData tsFileData) {
@@ -331,16 +378,13 @@ public class LoadTsFileScheduler implements IScheduler {
     }
 
     private boolean addOrSendChunkData(ChunkData chunkData) {
-      TRegionReplicaSet replicaSet =
-          singleTsFileNode
-              .getDataPartition()
-              .getDataRegionReplicaSetForWriting(
-                  chunkData.getDevice(), chunkData.getTimePartitionSlot());
-      dataSize +=
-          (1 + replicaSet.getDataNodeLocationsSize())
-              * chunkData.getDataSize(); // should multiply datanode factor
+      nonDirectionalChunkData.add(chunkData);
+      dataSize += chunkData.getDataSize();
 
       if (dataSize > MAX_MEMORY_SIZE) {
+        routeChunkData();
+
+        // start to dispatch from the biggest TsFilePieceNode
         List<TRegionReplicaSet> sortedReplicaSets =
             replicaSet2Piece.keySet().stream()
                 .sorted(
@@ -356,30 +400,50 @@ public class LoadTsFileScheduler implements IScheduler {
             return false;
           }
 
-          dataSize -= (1 + sortedReplicaSet.getDataNodeLocationsSize()) * pieceNode.getDataSize();
+          dataSize -= pieceNode.getDataSize();
           replicaSet2Piece.put(
               sortedReplicaSet,
               new LoadTsFilePieceNode(
                   singleTsFileNode.getPlanNodeId(),
-                  singleTsFileNode.getTsFileResource().getTsFile()));
+                  singleTsFileNode
+                      .getTsFileResource()
+                      .getTsFile())); // can not just remove, because of deletion
           if (dataSize <= MAX_MEMORY_SIZE) {
             break;
           }
         }
       }
 
-      replicaSet2Piece
-          .computeIfAbsent(
-              replicaSet,
-              o ->
-                  new LoadTsFilePieceNode(
-                      singleTsFileNode.getPlanNodeId(),
-                      singleTsFileNode.getTsFileResource().getTsFile()))
-          .addTsFileData(chunkData);
       return true;
     }
 
+    private void routeChunkData() {
+      if (nonDirectionalChunkData.isEmpty()) {
+        return;
+      }
+
+      List<TRegionReplicaSet> replicaSets =
+          scheduler.partitionFetcher.queryDataPartition(
+              nonDirectionalChunkData.stream()
+                  .map(data -> new Pair<>(data.getDevice(), data.getTimePartitionSlot()))
+                  .collect(Collectors.toList()));
+      IntStream.range(0, nonDirectionalChunkData.size())
+          .forEach(
+              i ->
+                  replicaSet2Piece
+                      .computeIfAbsent(
+                          replicaSets.get(i),
+                          o ->
+                              new LoadTsFilePieceNode(
+                                  singleTsFileNode.getPlanNodeId(),
+                                  singleTsFileNode.getTsFileResource().getTsFile()))
+                      .addTsFileData(nonDirectionalChunkData.get(i)));
+      nonDirectionalChunkData.clear();
+    }
+
     private boolean addOrSendDeletionData(TsFileData deletionData) {
+      routeChunkData(); // ensure chunk data will be added before deletion
+
       for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : replicaSet2Piece.entrySet()) {
         dataSize += deletionData.getDataSize();
         entry.getValue().addTsFileData(deletionData);
@@ -388,6 +452,8 @@ public class LoadTsFileScheduler implements IScheduler {
     }
 
     private boolean sendAllTsFileData() {
+      routeChunkData();
+
       for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : replicaSet2Piece.entrySet()) {
         if (!scheduler.dispatchOnePieceNode(entry.getValue(), entry.getKey())) {
           logger.warn(
@@ -400,4 +466,43 @@ public class LoadTsFileScheduler implements IScheduler {
       return true;
     }
   }
+
+  private static class DataPartitionBatchFetcher {
+    private final IPartitionFetcher fetcher;
+
+    public DataPartitionBatchFetcher(IPartitionFetcher fetcher) {
+      this.fetcher = fetcher;
+    }
+
+    public List<TRegionReplicaSet> queryDataPartition(
+        List<Pair<String, TTimePartitionSlot>> slotList) {
+      List<TRegionReplicaSet> replicaSets = new ArrayList<>();
+      int size = slotList.size();
+
+      for (int i = 0; i < size; i += TRANSMIT_LIMIT) {
+        List<Pair<String, TTimePartitionSlot>> subSlotList =
+            slotList.subList(i, Math.min(size, i + TRANSMIT_LIMIT));
+        DataPartition dataPartition = fetcher.getOrCreateDataPartition(toQueryParam(subSlotList));
+        replicaSets.addAll(
+            subSlotList.stream()
+                .map(pair -> dataPartition.getDataRegionReplicaSetForWriting(pair.left, pair.right))
+                .collect(Collectors.toList()));
+      }
+      return replicaSets;
+    }
+
+    private List<DataPartitionQueryParam> toQueryParam(
+        List<Pair<String, TTimePartitionSlot>> slots) {
+      return slots.stream()
+          .collect(
+              Collectors.groupingBy(
+                  Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet())))
+          .entrySet()
+          .stream()
+          .map(
+              entry ->
+                  new DataPartitionQueryParam(entry.getKey(), new ArrayList<>(entry.getValue())))
+          .collect(Collectors.toList());
+    }
+  }
 }
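
Finally, a hedged usage sketch of the new batch fetcher to illustrate its contract (DataPartitionBatchFetcher is private to LoadTsFileScheduler, so the standalone names here are illustrative; fetcher is assumed to be some IPartitionFetcher implementation):

    DataPartitionBatchFetcher batchFetcher = new DataPartitionBatchFetcher(fetcher);
    List<Pair<String, TTimePartitionSlot>> slots = new ArrayList<>();
    slots.add(new Pair<>("root.sg.d1", TimePartitionUtils.getTimePartition(0L)));
    slots.add(new Pair<>("root.sg.d2", TimePartitionUtils.getTimePartition(0L)));
    List<TRegionReplicaSet> replicaSets = batchFetcher.queryDataPartition(slots);
    // Index-aligned result: replicaSets.get(i) is the writing replica set for
    // slots.get(i), preserved across TRANSMIT_LIMIT-sized sub-requests.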