You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/04/25 01:57:03 UTC
[iotdb] branch master updated: [IoTDB-5721] Streaming query DataPartition and Schema while loading TsFile (#9684)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4020214423 [IoTDB-5721] Streaming query DataPartition and Schema while loading TsFile (#9684)
4020214423 is described below
commit 4020214423520d8913f17979ade4e7f5b196e26f
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Tue Apr 25 09:56:57 2023 +0800
[IoTDB-5721] Streaming query DataPartition and Schema while loading TsFile (#9684)
* streaming query data partition
* only serialize once in dispatcher
* streaming auto create and verify schema
* fix IT: add routeChunkData in addOrSendDeletionData and sendAllTsFileData
---
.../resources/conf/iotdb-common.properties | 5 +
.../apache/iotdb/commons/conf/CommonConfig.java | 10 +
.../iotdb/commons/conf/CommonDescriptor.java | 8 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 6 +
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 68 +++----
.../db/mpp/plan/execution/QueryExecution.java | 6 +-
.../plan/node/load/LoadSingleTsFileNode.java | 59 +++---
.../planner/plan/node/load/LoadTsFileNode.java | 16 +-
.../scheduler/load/LoadTsFileDispatcherImpl.java | 20 +-
.../plan/scheduler/load/LoadTsFileScheduler.java | 201 ++++++++++++++++-----
10 files changed, 244 insertions(+), 155 deletions(-)
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 61b0bbc4a7..e5cc146ad4 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -84,6 +84,11 @@ cluster_name=defaultCluster
# Datatype: String
# series_partition_executor_class=org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor
+# The maximum number of TTimePartitionSlots allowed to be transmitted between DataNode and ConfigNode in a single request.
+# Mainly used to balance communication efficiency when loading very large TsFiles.
+# Datatype: Integer
+# time_partition_slot_transmit_limit=1000
+
# The policy of extension SchemaRegionGroup for each Database.
# These policies are currently supported:
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index bc6e07c36b..91c3653cb3 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -120,6 +120,8 @@ public class CommonConfig {
private volatile String statusReason = null;
+ private int TTimePartitionSlotTransmitLimit = 1000;
+
/** Disk Monitor */
private double diskSpaceWarningThreshold = 0.05;
@@ -363,6 +365,14 @@ public class CommonConfig {
this.targetMLNodeEndPoint = targetMLNodeEndPoint;
}
+ public int getTTimePartitionSlotTransmitLimit() {
+ return TTimePartitionSlotTransmitLimit;
+ }
+
+ public void setTTimePartitionSlotTransmitLimit(int TTimePartitionSlotTransmitLimit) {
+ this.TTimePartitionSlotTransmitLimit = TTimePartitionSlotTransmitLimit;
+ }
+
public boolean isStopping() {
return isStopping;
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 5aa42bbe5f..d00a1e03df 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -188,6 +188,14 @@ public class CommonDescriptor {
String.valueOf(config.getDiskSpaceWarningThreshold()))
.trim()));
+ config.setTTimePartitionSlotTransmitLimit(
+ Integer.parseInt(
+ properties
+ .getProperty(
+ "time_partition_slot_transmit_limit",
+ String.valueOf(config.getTTimePartitionSlotTransmitLimit()))
+ .trim()));
+
String endPointUrl =
properties.getProperty(
"target_ml_node_endpoint",
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 982447e91b..6953644adf 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -159,6 +159,8 @@ public class IoTDBConfig {
/** The proportion of write memory for loading TsFile */
private double loadTsFileProportion = 0.125;
+ private final int maxLoadingDeviceNumber = 10000;
+
/**
* If memory cost of data region increased more than proportion of {@linkplain
* IoTDBConfig#getAllocateMemoryForStorageEngine()}*{@linkplain
@@ -3334,6 +3336,10 @@ public class IoTDBConfig {
return loadTsFileProportion;
}
+ public int getMaxLoadingDeviceNumber() {
+ return maxLoadingDeviceNumber;
+ }
+
public static String getEnvironmentVariables() {
return "\n\t"
+ IoTDBConstant.IOTDB_HOME
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 4733c9e08e..6ed340aff7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1998,8 +1998,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
- Map<String, Long> device2MinTime = new HashMap<>();
- Map<String, Long> device2MaxTime = new HashMap<>();
Map<String, Map<MeasurementSchema, File>> device2Schemas = new HashMap<>();
Map<String, Pair<Boolean, File>> device2IsAligned = new HashMap<>();
@@ -2014,13 +2012,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
}
try {
TsFileResource resource =
- analyzeTsFile(
- loadTsFileStatement,
- tsFile,
- device2MinTime,
- device2MaxTime,
- device2Schemas,
- device2IsAligned);
+ analyzeTsFile(loadTsFileStatement, tsFile, device2Schemas, device2IsAligned);
loadTsFileStatement.addTsFileResource(resource);
} catch (IllegalArgumentException e) {
logger.warn(
@@ -2034,9 +2026,27 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
throw new SemanticException(
String.format("Parse file %s to resource error", tsFile.getPath()));
}
+ if (device2Schemas.size() > CONFIG.getMaxLoadingDeviceNumber()) {
+ autoCreateAndVerifySchema(loadTsFileStatement, device2Schemas, device2IsAligned);
+ }
}
- // auto create and verify schema
+ autoCreateAndVerifySchema(loadTsFileStatement, device2Schemas, device2IsAligned);
+
+ // load function will query data partition in scheduler
+ Analysis analysis = new Analysis();
+ analysis.setStatement(loadTsFileStatement);
+ return analysis;
+ }
+
+ private void autoCreateAndVerifySchema(
+ LoadTsFileStatement loadTsFileStatement,
+ Map<String, Map<MeasurementSchema, File>> device2Schemas,
+ Map<String, Pair<Boolean, File>> device2IsAligned)
+ throws SemanticException {
+ if (device2Schemas.isEmpty()) {
+ return;
+ }
try {
if (loadTsFileStatement.isVerifySchema()) {
verifyLoadingMeasurements(device2Schemas);
@@ -2058,27 +2068,10 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
String.format(
"Auto create or verify schema error when executing statement %s.",
loadTsFileStatement));
+ } finally {
+ device2Schemas.clear();
+ device2IsAligned.clear();
}
-
- // construct partition info
- List<DataPartitionQueryParam> params = new ArrayList<>();
- for (Map.Entry<String, Long> entry : device2MinTime.entrySet()) {
- List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
- String device = entry.getKey();
- long endTime = device2MaxTime.get(device);
- long interval = TimePartitionUtils.timePartitionInterval;
- long time = (entry.getValue() / interval) * interval;
- for (; time <= endTime; time += interval) {
- timePartitionSlots.add(TimePartitionUtils.getTimePartition(time));
- }
-
- DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
- dataPartitionQueryParam.setDevicePath(device);
- dataPartitionQueryParam.setTimePartitionSlotList(timePartitionSlots);
- params.add(dataPartitionQueryParam);
- }
-
- return getAnalysisForWriting(loadTsFileStatement, params);
}
/** get analysis according to statement and params */
@@ -2101,8 +2094,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
private TsFileResource analyzeTsFile(
LoadTsFileStatement statement,
File tsFile,
- Map<String, Long> device2MinTime,
- Map<String, Long> device2MaxTime,
Map<String, Map<MeasurementSchema, File>> device2Schemas,
Map<String, Pair<Boolean, File>> device2IsAligned)
throws IOException, VerifyMetadataException {
@@ -2158,19 +2149,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
resource.deserialize();
}
- // construct device time range
- for (String device : resource.getDevices()) {
- device2MinTime.put(
- device,
- Math.min(
- device2MinTime.getOrDefault(device, Long.MAX_VALUE),
- resource.getStartTime(device)));
- device2MaxTime.put(
- device,
- Math.max(
- device2MaxTime.getOrDefault(device, Long.MIN_VALUE), resource.getEndTime(device)));
- }
-
resource.setStatus(TsFileResourceStatus.CLOSED);
return resource;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 08b0674b1e..7c4515ae5a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -300,7 +300,11 @@ public class QueryExecution implements IQueryExecution {
if (rawStatement instanceof LoadTsFileStatement) {
this.scheduler =
new LoadTsFileScheduler(
- distributedPlan, context, stateMachine, syncInternalServiceClientManager);
+ distributedPlan,
+ context,
+ stateMachine,
+ syncInternalServiceClientManager,
+ partitionFetcher);
this.scheduler.start();
return;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index 1c965e5e69..fdc15a0320 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -33,6 +32,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.utils.TimePartitionUtils;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,9 +42,11 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.function.Function;
public class LoadSingleTsFileNode extends WritePlanNode {
private static final Logger logger = LoggerFactory.getLogger(LoadSingleTsFileNode.class);
@@ -55,47 +57,36 @@ public class LoadSingleTsFileNode extends WritePlanNode {
private boolean deleteAfterLoad;
private TRegionReplicaSet localRegionReplicaSet;
- private DataPartition dataPartition;
public LoadSingleTsFileNode(PlanNodeId id) {
super(id);
}
- public LoadSingleTsFileNode(
- PlanNodeId id,
- TsFileResource resource,
- boolean deleteAfterLoad,
- DataPartition dataPartition) {
+ public LoadSingleTsFileNode(PlanNodeId id, TsFileResource resource, boolean deleteAfterLoad) {
super(id);
this.tsFile = resource.getTsFile();
this.resource = resource;
this.deleteAfterLoad = deleteAfterLoad;
- this.dataPartition = dataPartition;
- }
-
- public void checkIfNeedDecodeTsFile() throws IOException {
- Set<TRegionReplicaSet> allRegionReplicaSet = new HashSet<>();
- TTimePartitionSlot timePartitionSlot = null;
- needDecodeTsFile = false;
- for (String device : resource.getDevices()) {
- TTimePartitionSlot startSlot =
- TimePartitionUtils.getTimePartition(resource.getStartTime(device));
- if (timePartitionSlot == null) {
- timePartitionSlot = startSlot;
- }
- if (!startSlot.equals(timePartitionSlot)
- || !TimePartitionUtils.getTimePartition(resource.getEndTime(device))
- .equals(timePartitionSlot)) {
- needDecodeTsFile = true;
- return;
- }
- allRegionReplicaSet.add(
- dataPartition.getDataRegionReplicaSetForWriting(device, timePartitionSlot));
- }
- needDecodeTsFile = !isDispatchedToLocal(allRegionReplicaSet);
+ }
+
+ public boolean needDecodeTsFile(
+ Function<List<Pair<String, TTimePartitionSlot>>, List<TRegionReplicaSet>> partitionFetcher)
+ throws IOException {
+ List<Pair<String, TTimePartitionSlot>> slotList = new ArrayList<>();
+ resource
+ .getDevices()
+ .forEach(
+ o -> {
+ slotList.add(
+ new Pair<>(o, TimePartitionUtils.getTimePartition(resource.getStartTime(o))));
+ slotList.add(
+ new Pair<>(o, TimePartitionUtils.getTimePartition(resource.getEndTime(o))));
+ });
+ needDecodeTsFile = !isDispatchedToLocal(new HashSet<>(partitionFetcher.apply(slotList)));
if (!needDecodeTsFile && !resource.resourceFileExists()) {
resource.serialize();
}
+ return needDecodeTsFile;
}
private boolean isDispatchedToLocal(Set<TRegionReplicaSet> replicaSets) {
@@ -118,10 +109,6 @@ public class LoadSingleTsFileNode extends WritePlanNode {
&& IoTDBDescriptor.getInstance().getConfig().getInternalPort() == endPoint.port;
}
- public boolean needDecodeTsFile() {
- return needDecodeTsFile;
- }
-
public boolean isDeleteAfterLoad() {
return deleteAfterLoad;
}
@@ -135,10 +122,6 @@ public class LoadSingleTsFileNode extends WritePlanNode {
return localRegionReplicaSet;
}
- public DataPartition getDataPartition() {
- return dataPartition;
- }
-
public TsFileResource getTsFileResource() {
return resource;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
index db2a1db1ab..4ab89f5e77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -90,21 +90,7 @@ public class LoadTsFileNode extends WritePlanNode {
List<WritePlanNode> res = new ArrayList<>();
LoadTsFileStatement statement = (LoadTsFileStatement) analysis.getStatement();
for (TsFileResource resource : resources) {
- try {
- LoadSingleTsFileNode singleTsFileNode =
- new LoadSingleTsFileNode(
- getPlanNodeId(),
- resource,
- statement.isDeleteAfterLoad(),
- analysis.getDataPartitionInfo());
- singleTsFileNode.checkIfNeedDecodeTsFile();
- res.add(singleTsFileNode);
- } catch (Exception e) {
- logger.error(
- String.format(
- "Check whether TsFile %s need decode or not error", resource.getTsFile().getPath()),
- e);
- }
+ res.add(new LoadSingleTsFileNode(getPlanNodeId(), resource, statement.isDeleteAfterLoad()));
}
return res;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 418bd8aa99..f527232e48 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -106,13 +106,22 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
private void dispatchOneInstance(FragmentInstance instance)
throws FragmentInstanceDispatchException {
+ TTsFilePieceReq loadTsFileReq = null;
+
for (TDataNodeLocation dataNodeLocation :
instance.getRegionReplicaSet().getDataNodeLocations()) {
TEndPoint endPoint = dataNodeLocation.getInternalEndPoint();
if (isDispatchedToLocal(endPoint)) {
dispatchLocally(instance);
} else {
- dispatchRemote(instance, endPoint);
+ if (loadTsFileReq == null) {
+ loadTsFileReq =
+ new TTsFilePieceReq(
+ instance.getFragment().getPlanNodeTree().serializeToByteBuffer(),
+ uuid,
+ instance.getRegionReplicaSet().getRegionId());
+ }
+ dispatchRemote(loadTsFileReq, endPoint);
}
}
}
@@ -121,15 +130,10 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port;
}
- private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
+ private void dispatchRemote(TTsFilePieceReq loadTsFileReq, TEndPoint endPoint)
throws FragmentInstanceDispatchException {
try (SyncDataNodeInternalServiceClient client =
internalServiceClientManager.borrowClient(endPoint)) {
- TTsFilePieceReq loadTsFileReq =
- new TTsFilePieceReq(
- instance.getFragment().getPlanNodeTree().serializeToByteBuffer(),
- uuid,
- instance.getRegionReplicaSet().getRegionId());
TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq);
if (!loadResp.isAccepted()) {
logger.warn(loadResp.message);
@@ -165,7 +169,7 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
throw new FragmentInstanceDispatchException(resultStatus);
}
- } else if (planNode instanceof LoadSingleTsFileNode) { // do not need split
+ } else if (planNode instanceof LoadSingleTsFileNode) { // do not need to split
try {
StorageEngine.getInstance()
.getDataRegion((DataRegionId) groupId)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
index e37449055b..435768c1d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
@@ -22,8 +22,12 @@ package org.apache.iotdb.db.mpp.plan.scheduler.load;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.StorageExecutor;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -36,6 +40,7 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
@@ -45,6 +50,7 @@ import org.apache.iotdb.db.mpp.plan.scheduler.FragInstanceDispatchResult;
import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler;
import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Pair;
import io.airlift.units.Duration;
import org.slf4j.Logger;
@@ -66,6 +72,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* {@link LoadTsFileScheduler} is used for scheduling {@link LoadSingleTsFileNode} and {@link
@@ -76,16 +83,24 @@ import java.util.stream.Collectors;
*/
public class LoadTsFileScheduler implements IScheduler {
private static final Logger logger = LoggerFactory.getLogger(LoadTsFileScheduler.class);
- private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
public static final long LOAD_TASK_MAX_TIME_IN_SECOND = 5184000L; // 60 days
- private static final long MAX_MEMORY_SIZE =
- Math.min(
- config.getThriftMaxFrameSize() / 2,
- (long) (config.getAllocateMemoryForStorageEngine() * config.getLoadTsFileProportion()));
+ private static final long MAX_MEMORY_SIZE;
+ private static final int TRANSMIT_LIMIT;
+
+ static {
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ MAX_MEMORY_SIZE =
+ Math.min(
+ config.getThriftMaxFrameSize() >> 2,
+ (long) (config.getAllocateMemoryForStorageEngine() * config.getLoadTsFileProportion()));
+ TRANSMIT_LIMIT =
+ CommonDescriptor.getInstance().getConfig().getTTimePartitionSlotTransmitLimit();
+ }
private final MPPQueryContext queryContext;
private final QueryStateMachine stateMachine;
private final LoadTsFileDispatcherImpl dispatcher;
+ private final DataPartitionBatchFetcher partitionFetcher;
private final List<LoadSingleTsFileNode> tsFileNodeList;
private final PlanFragmentId fragmentId;
@@ -95,12 +110,14 @@ public class LoadTsFileScheduler implements IScheduler {
DistributedQueryPlan distributedQueryPlan,
MPPQueryContext queryContext,
QueryStateMachine stateMachine,
- IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager,
+ IPartitionFetcher partitionFetcher) {
this.queryContext = queryContext;
this.stateMachine = stateMachine;
this.tsFileNodeList = new ArrayList<>();
this.fragmentId = distributedQueryPlan.getRootSubPlan().getPlanFragment().getId();
this.dispatcher = new LoadTsFileDispatcherImpl(internalServiceClientManager);
+ this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher);
this.allReplicaSets = new HashSet<>();
for (FragmentInstance fragmentInstance : distributedQueryPlan.getInstances()) {
@@ -111,31 +128,57 @@ public class LoadTsFileScheduler implements IScheduler {
@Override
public void start() {
stateMachine.transitionToRunning();
- for (LoadSingleTsFileNode node : tsFileNodeList) {
- if (!node.needDecodeTsFile()) {
- boolean isLoadLocallySuccess = loadLocally(node);
-
- node.clean();
- if (!isLoadLocallySuccess) {
- return;
+ int tsFileNodeListSize = tsFileNodeList.size();
+ boolean isLoadSuccess = true;
+
+ for (int i = 0; i < tsFileNodeListSize; ++i) {
+ LoadSingleTsFileNode node = tsFileNodeList.get(i);
+ boolean isLoadSingleTsFileSuccess = true;
+ try {
+ if (!node.needDecodeTsFile(
+ partitionFetcher::queryDataPartition)) { // do not decode, load locally
+ isLoadSingleTsFileSuccess = loadLocally(node);
+ node.clean();
+
+ } else { // need decode, load locally or remotely, use two phases method
+ String uuid = UUID.randomUUID().toString();
+ dispatcher.setUuid(uuid);
+ allReplicaSets.clear();
+
+ boolean isFirstPhaseSuccess = firstPhase(node);
+ boolean isSecondPhaseSuccess =
+ secondPhase(isFirstPhaseSuccess, uuid, node.getTsFileResource().getTsFile());
+
+ node.clean();
+ if (!isFirstPhaseSuccess || !isSecondPhaseSuccess) {
+ isLoadSingleTsFileSuccess = false;
+ }
}
- continue;
- }
-
- String uuid = UUID.randomUUID().toString();
- dispatcher.setUuid(uuid);
- allReplicaSets.clear();
-
- boolean isFirstPhaseSuccess = firstPhase(node);
- boolean isSecondPhaseSuccess =
- secondPhase(isFirstPhaseSuccess, uuid, node.getTsFileResource().getTsFile());
-
- node.clean();
- if (!isFirstPhaseSuccess || !isSecondPhaseSuccess) {
- return;
+ if (isLoadSingleTsFileSuccess) {
+ logger.info(
+ String.format(
+ "Load TsFile %s Successfully, load process [%s/%s]",
+ node.getTsFileResource().getTsFilePath(), i + 1, tsFileNodeListSize));
+ } else {
+ isLoadSuccess = false;
+ logger.warn(
+ String.format(
+ "Can not Load TsFile %s, load process [%s/%s]",
+ node.getTsFileResource().getTsFilePath(), i + 1, tsFileNodeListSize));
+ }
+ } catch (Exception e) {
+ isLoadSuccess = false;
+ stateMachine.transitionToFailed(e);
+ logger.warn(
+ String.format(
+ "LoadTsFileScheduler loads TsFile %s error",
+ node.getTsFileResource().getTsFilePath()),
+ e);
}
}
- stateMachine.transitionToFinished();
+ if (isLoadSuccess) {
+ stateMachine.transitionToFinished();
+ }
}
private boolean firstPhase(LoadSingleTsFileNode node) {
@@ -145,9 +188,11 @@ public class LoadTsFileScheduler implements IScheduler {
node.getTsFileResource().getTsFile(), tsFileDataManager::addOrSendTsFileData)
.splitTsFileByDataPartition();
if (!tsFileDataManager.sendAllTsFileData()) {
+ stateMachine.transitionToFailed(new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()));
return false;
}
} catch (IllegalStateException e) {
+ stateMachine.transitionToFailed(e);
logger.warn(
String.format(
"Dispatch TsFileData error when parsing TsFile %s.",
@@ -310,18 +355,20 @@ public class LoadTsFileScheduler implements IScheduler {
ROLLBACK
}
- private class TsFileDataManager {
+ private static class TsFileDataManager {
private final LoadTsFileScheduler scheduler;
private final LoadSingleTsFileNode singleTsFileNode;
private long dataSize;
- private Map<TRegionReplicaSet, LoadTsFilePieceNode> replicaSet2Piece;
+ private final Map<TRegionReplicaSet, LoadTsFilePieceNode> replicaSet2Piece;
+ private final List<ChunkData> nonDirectionalChunkData;
public TsFileDataManager(LoadTsFileScheduler scheduler, LoadSingleTsFileNode singleTsFileNode) {
this.scheduler = scheduler;
this.singleTsFileNode = singleTsFileNode;
this.dataSize = 0;
this.replicaSet2Piece = new HashMap<>();
+ this.nonDirectionalChunkData = new ArrayList<>();
}
private boolean addOrSendTsFileData(TsFileData tsFileData) {
@@ -331,16 +378,13 @@ public class LoadTsFileScheduler implements IScheduler {
}
private boolean addOrSendChunkData(ChunkData chunkData) {
- TRegionReplicaSet replicaSet =
- singleTsFileNode
- .getDataPartition()
- .getDataRegionReplicaSetForWriting(
- chunkData.getDevice(), chunkData.getTimePartitionSlot());
- dataSize +=
- (1 + replicaSet.getDataNodeLocationsSize())
- * chunkData.getDataSize(); // should multiply datanode factor
+ nonDirectionalChunkData.add(chunkData);
+ dataSize += chunkData.getDataSize();
if (dataSize > MAX_MEMORY_SIZE) {
+ routeChunkData();
+
+ // start to dispatch from the biggest TsFilePieceNode
List<TRegionReplicaSet> sortedReplicaSets =
replicaSet2Piece.keySet().stream()
.sorted(
@@ -356,30 +400,50 @@ public class LoadTsFileScheduler implements IScheduler {
return false;
}
- dataSize -= (1 + sortedReplicaSet.getDataNodeLocationsSize()) * pieceNode.getDataSize();
+ dataSize -= pieceNode.getDataSize();
replicaSet2Piece.put(
sortedReplicaSet,
new LoadTsFilePieceNode(
singleTsFileNode.getPlanNodeId(),
- singleTsFileNode.getTsFileResource().getTsFile()));
+ singleTsFileNode
+ .getTsFileResource()
+ .getTsFile())); // can not just remove, because of deletion
if (dataSize <= MAX_MEMORY_SIZE) {
break;
}
}
}
- replicaSet2Piece
- .computeIfAbsent(
- replicaSet,
- o ->
- new LoadTsFilePieceNode(
- singleTsFileNode.getPlanNodeId(),
- singleTsFileNode.getTsFileResource().getTsFile()))
- .addTsFileData(chunkData);
return true;
}
+ private void routeChunkData() {
+ if (nonDirectionalChunkData.isEmpty()) {
+ return;
+ }
+
+ List<TRegionReplicaSet> replicaSets =
+ scheduler.partitionFetcher.queryDataPartition(
+ nonDirectionalChunkData.stream()
+ .map(data -> new Pair<>(data.getDevice(), data.getTimePartitionSlot()))
+ .collect(Collectors.toList()));
+ IntStream.range(0, nonDirectionalChunkData.size())
+ .forEach(
+ i ->
+ replicaSet2Piece
+ .computeIfAbsent(
+ replicaSets.get(i),
+ o ->
+ new LoadTsFilePieceNode(
+ singleTsFileNode.getPlanNodeId(),
+ singleTsFileNode.getTsFileResource().getTsFile()))
+ .addTsFileData(nonDirectionalChunkData.get(i)));
+ nonDirectionalChunkData.clear();
+ }
+
private boolean addOrSendDeletionData(TsFileData deletionData) {
+ routeChunkData(); // ensure chunk data will be added before deletion
+
for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : replicaSet2Piece.entrySet()) {
dataSize += deletionData.getDataSize();
entry.getValue().addTsFileData(deletionData);
@@ -388,6 +452,8 @@ public class LoadTsFileScheduler implements IScheduler {
}
private boolean sendAllTsFileData() {
+ routeChunkData();
+
for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : replicaSet2Piece.entrySet()) {
if (!scheduler.dispatchOnePieceNode(entry.getValue(), entry.getKey())) {
logger.warn(
@@ -400,4 +466,43 @@ public class LoadTsFileScheduler implements IScheduler {
return true;
}
}
+
+ private static class DataPartitionBatchFetcher {
+ private final IPartitionFetcher fetcher;
+
+ public DataPartitionBatchFetcher(IPartitionFetcher fetcher) {
+ this.fetcher = fetcher;
+ }
+
+ public List<TRegionReplicaSet> queryDataPartition(
+ List<Pair<String, TTimePartitionSlot>> slotList) {
+ List<TRegionReplicaSet> replicaSets = new ArrayList<>();
+ int size = slotList.size();
+
+ for (int i = 0; i < size; i += TRANSMIT_LIMIT) {
+ List<Pair<String, TTimePartitionSlot>> subSlotList =
+ slotList.subList(i, Math.min(size, i + TRANSMIT_LIMIT));
+ DataPartition dataPartition = fetcher.getOrCreateDataPartition(toQueryParam(subSlotList));
+ replicaSets.addAll(
+ subSlotList.stream()
+ .map(pair -> dataPartition.getDataRegionReplicaSetForWriting(pair.left, pair.right))
+ .collect(Collectors.toList()));
+ }
+ return replicaSets;
+ }
+
+ private List<DataPartitionQueryParam> toQueryParam(
+ List<Pair<String, TTimePartitionSlot>> slots) {
+ return slots.stream()
+ .collect(
+ Collectors.groupingBy(
+ Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet())))
+ .entrySet()
+ .stream()
+ .map(
+ entry ->
+ new DataPartitionQueryParam(entry.getKey(), new ArrayList<>(entry.getValue())))
+ .collect(Collectors.toList());
+ }
+ }
}