You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/11/29 01:09:34 UTC
[iotdb] branch rel/1.0 updated: [To rel/1.0] [IOTDB-4940] Optimize query fetch data partition process while containing unclosed time filter (#8230)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.0 by this push:
new eeaaef0071 [To rel/1.0] [IOTDB-4940] Optimize query fetch data partition process while containing unclosed time filter (#8230)
eeaaef0071 is described below
commit eeaaef00712b211d3baeb30104ad6d919baa787d
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Tue Nov 29 09:09:28 2022 +0800
[To rel/1.0] [IOTDB-4940] Optimize query fetch data partition process while containing unclosed time filter (#8230)
---
.../read/partition/GetDataPartitionPlan.java | 34 ++----
.../partition/GetOrCreateDataPartitionPlan.java | 5 +-
.../iotdb/confignode/manager/ConfigManager.java | 13 ++-
.../iotdb/confignode/manager/load/LoadManager.java | 5 +-
.../manager/load/balancer/PartitionBalancer.java | 5 +-
.../partition/GreedyPartitionAllocator.java | 13 ++-
.../balancer/partition/IPartitionAllocator.java | 5 +-
.../manager/partition/PartitionManager.java | 8 +-
.../persistence/partition/PartitionInfo.java | 16 ++-
.../partition/StorageGroupPartitionTable.java | 8 +-
.../request/ConfigPhysicalPlanSerDeTest.java | 27 +++--
.../it/IoTDBClusterRegionLeaderBalancingIT.java | 20 ++--
.../confignode/it/IoTDBConfigNodeSnapshotIT.java | 8 +-
.../it/IoTDBConfigNodeSwitchLeaderIT.java | 18 +--
.../it/partition/IoTDBPartitionDurableIT.java | 7 +-
.../it/partition/IoTDBPartitionGetterIT.java | 6 +-
.../partition/IoTDBPartitionInheritPolicyIT.java | 5 +-
.../it/partition/IoTDBRegionGroupExtensionIT.java | 5 +-
.../confignode/it/utils/ConfigNodeTestUtils.java | 28 +++--
.../commons/partition/DataPartitionQueryParam.java | 35 ++++++
.../commons/partition/DataPartitionTable.java | 23 ++--
.../commons/partition/SeriesPartitionTable.java | 34 +++++-
.../commons/utils/ThriftCommonsSerDeUtils.java | 20 ++++
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 125 ++++++++++++++++-----
.../mpp/plan/analyze/ClusterPartitionFetcher.java | 91 +++++++++------
.../db/mpp/plan/analyze/IPartitionFetcher.java | 9 ++
.../plan/analyze/StandalonePartitionFetcher.java | 7 ++
.../mpp/plan/analyze/FakePartitionFetcherImpl.java | 6 +
.../mpp/plan/analyze/QueryTimePartitionTest.java | 73 ++++++++----
.../iotdb/db/mpp/plan/plan/distribution/Util.java | 6 +
.../src/main/thrift/confignode.thrift | 10 +-
31 files changed, 472 insertions(+), 203 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/partition/GetDataPartitionPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/partition/GetDataPartitionPlan.java
index b90419143f..7bd91f74a4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/partition/GetDataPartitionPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/partition/GetDataPartitionPlan.java
@@ -19,19 +19,17 @@
package org.apache.iotdb.confignode.consensus.request.read.partition;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
@@ -41,7 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class GetDataPartitionPlan extends ConfigPhysicalPlan {
// Map<StorageGroup, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
- protected Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap;
+ protected Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap;
public GetDataPartitionPlan() {
super(ConfigPhysicalPlanType.GetDataPartition);
@@ -52,12 +50,12 @@ public class GetDataPartitionPlan extends ConfigPhysicalPlan {
}
public GetDataPartitionPlan(
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap) {
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap) {
this();
this.partitionSlotsMap = partitionSlotsMap;
}
- public Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> getPartitionSlotsMap() {
+ public Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> getPartitionSlotsMap() {
return partitionSlotsMap;
}
@@ -76,22 +74,18 @@ public class GetDataPartitionPlan extends ConfigPhysicalPlan {
stream.writeShort(getType().getPlanType());
stream.writeInt(partitionSlotsMap.size());
- for (Entry<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> entry :
+ for (Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> entry :
partitionSlotsMap.entrySet()) {
String storageGroup = entry.getKey();
- Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> seriesPartitionTimePartitionSlots =
- entry.getValue();
+ Map<TSeriesPartitionSlot, TTimeSlotList> seriesPartitionTimePartitionSlots = entry.getValue();
BasicStructureSerDeUtil.write(storageGroup, stream);
stream.writeInt(seriesPartitionTimePartitionSlots.size());
- for (Entry<TSeriesPartitionSlot, List<TTimePartitionSlot>> e :
+ for (Entry<TSeriesPartitionSlot, TTimeSlotList> e :
seriesPartitionTimePartitionSlots.entrySet()) {
TSeriesPartitionSlot seriesPartitionSlot = e.getKey();
- List<TTimePartitionSlot> timePartitionSlots = e.getValue();
+ TTimeSlotList timePartitionSlotList = e.getValue();
ThriftCommonsSerDeUtils.serializeTSeriesPartitionSlot(seriesPartitionSlot, stream);
- stream.writeInt(timePartitionSlots.size());
- timePartitionSlots.forEach(
- timePartitionSlot ->
- ThriftCommonsSerDeUtils.serializeTTimePartitionSlot(timePartitionSlot, stream));
+ ThriftCommonsSerDeUtils.serializeTTimePartitionSlotList(timePartitionSlotList, stream);
}
}
}
@@ -107,13 +101,9 @@ public class GetDataPartitionPlan extends ConfigPhysicalPlan {
for (int j = 0; j < seriesPartitionSlotNum; j++) {
TSeriesPartitionSlot seriesPartitionSlot =
ThriftCommonsSerDeUtils.deserializeTSeriesPartitionSlot(buffer);
- partitionSlotsMap.get(storageGroup).put(seriesPartitionSlot, new ArrayList<>());
- int timePartitionSlotNum = buffer.getInt();
- for (int k = 0; k < timePartitionSlotNum; k++) {
- TTimePartitionSlot timePartitionSlot =
- ThriftCommonsSerDeUtils.deserializeTTimePartitionSlot(buffer);
- partitionSlotsMap.get(storageGroup).get(seriesPartitionSlot).add(timePartitionSlot);
- }
+ TTimeSlotList timePartitionSlotList =
+ ThriftCommonsSerDeUtils.deserializeTTimePartitionSlotList(buffer);
+ partitionSlotsMap.get(storageGroup).put(seriesPartitionSlot, timePartitionSlotList);
}
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/partition/GetOrCreateDataPartitionPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/partition/GetOrCreateDataPartitionPlan.java
index 07f6a11588..279d4e3fed 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/partition/GetOrCreateDataPartitionPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/partition/GetOrCreateDataPartitionPlan.java
@@ -19,11 +19,10 @@
package org.apache.iotdb.confignode.consensus.request.read.partition;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -34,7 +33,7 @@ public class GetOrCreateDataPartitionPlan extends GetDataPartitionPlan {
}
public GetOrCreateDataPartitionPlan(
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap) {
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap) {
this();
this.partitionSlotsMap = partitionSlotsMap;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 4cc6bcf9f4..7926195897 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -133,6 +133,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.db.metadata.template.Template;
@@ -1375,12 +1376,16 @@ public class ConfigManager implements IManager {
getSchemaPartition(patternTree).getSchemaPartitionTable();
// Construct request for getting data partition
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
- new HashMap<>();
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>();
schemaPartitionTable.forEach(
(key, value) -> {
- Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> slotListMap = new HashMap<>();
- value.keySet().forEach(slot -> slotListMap.put(slot, Collections.emptyList()));
+ Map<TSeriesPartitionSlot, TTimeSlotList> slotListMap = new HashMap<>();
+ value
+ .keySet()
+ .forEach(
+ slot ->
+ slotListMap.put(
+ slot, new TTimeSlotList(Collections.emptyList(), true, true)));
partitionSlotsMap.put(key, slotListMap);
});
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index f37bf20d56..328e7f9021 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
@@ -52,6 +51,7 @@ import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupStatistics;
import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.slf4j.Logger;
@@ -134,8 +134,7 @@ public class LoadManager {
* @return Map<StorageGroupName, DataPartitionTable>, the allocating result
*/
public Map<String, DataPartitionTable> allocateDataPartition(
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
- unassignedDataPartitionSlotsMap)
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap)
throws NoAvailableRegionGroupException {
return partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
index 72522e099b..89d9235911 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
@@ -19,13 +19,13 @@
package org.apache.iotdb.confignode.manager.load.balancer;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.balancer.partition.GreedyPartitionAllocator;
import org.apache.iotdb.confignode.manager.load.balancer.partition.IPartitionAllocator;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import java.util.List;
import java.util.Map;
@@ -61,8 +61,7 @@ public class PartitionBalancer {
* @return Map<StorageGroupName, DataPartitionTable>, the allocating result
*/
public Map<String, DataPartitionTable> allocateDataPartition(
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
- unassignedDataPartitionSlotsMap)
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap)
throws NoAvailableRegionGroupException {
return genPartitionAllocator().allocateDataPartition(unassignedDataPartitionSlotsMap);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
index 4f92c325dc..996cf57116 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.tsfile.utils.Pair;
import java.util.Collections;
@@ -84,15 +85,14 @@ public class GreedyPartitionAllocator implements IPartitionAllocator {
@Override
public Map<String, DataPartitionTable> allocateDataPartition(
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
- unassignedDataPartitionSlotsMap)
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap)
throws NoAvailableRegionGroupException {
Map<String, DataPartitionTable> result = new ConcurrentHashMap<>();
- for (Map.Entry<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> slotsMapEntry :
+ for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> slotsMapEntry :
unassignedDataPartitionSlotsMap.entrySet()) {
final String storageGroup = slotsMapEntry.getKey();
- final Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> unassignedPartitionSlotsMap =
+ final Map<TSeriesPartitionSlot, TTimeSlotList> unassignedPartitionSlotsMap =
slotsMapEntry.getValue();
// List<Pair<allocatedSlotsNum, TConsensusGroupId>>
@@ -103,12 +103,13 @@ public class GreedyPartitionAllocator implements IPartitionAllocator {
DataPartitionTable dataPartitionTable = new DataPartitionTable();
// Enumerate SeriesPartitionSlot
- for (Map.Entry<TSeriesPartitionSlot, List<TTimePartitionSlot>> seriesPartitionEntry :
+ for (Map.Entry<TSeriesPartitionSlot, TTimeSlotList> seriesPartitionEntry :
unassignedPartitionSlotsMap.entrySet()) {
SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
// Enumerate TimePartitionSlot in ascending order
- List<TTimePartitionSlot> timePartitionSlots = seriesPartitionEntry.getValue();
+ List<TTimePartitionSlot> timePartitionSlots =
+ seriesPartitionEntry.getValue().getTimePartitionSlots();
timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime));
for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java
index ec20638d3e..62ef53d965 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java
@@ -19,10 +19,10 @@
package org.apache.iotdb.confignode.manager.load.balancer.partition;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import java.util.List;
import java.util.Map;
@@ -50,7 +50,6 @@ public interface IPartitionAllocator {
* @return Map<StorageGroupName, DataPartitionTable>, the allocating result
*/
Map<String, DataPartitionTable> allocateDataPartition(
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
- unassignedDataPartitionSlotsMap)
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap)
throws NoAvailableRegionGroupException;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index df7f826807..d364d6b23c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -73,6 +73,7 @@ import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
@@ -157,7 +158,7 @@ public class PartitionManager {
* Thread-safely get DataPartition
*
* @param req DataPartitionPlan with Map<StorageGroupName, Map<SeriesPartitionSlot,
- * List<TimePartitionSlot>>>
+ * TTimeSlotList>>
* @return DataPartitionDataSet that contains only existing DataPartition
*/
public DataSet getDataPartition(GetDataPartitionPlan req) {
@@ -263,9 +264,8 @@ public class PartitionManager {
// the number of serialized CreateDataPartitionReqs is acceptable.
synchronized (this) {
// Filter unassigned DataPartitionSlots
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
- unassignedDataPartitionSlotsMap =
- partitionInfo.filterUnassignedDataPartitionSlots(req.getPartitionSlotsMap());
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap =
+ partitionInfo.filterUnassignedDataPartitionSlots(req.getPartitionSlotsMap());
// Here we ensure that each StorageGroup has at least one DataRegion.
// And if some StorageGroups own too many slots, extend DataRegion for them.
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 44a13ca463..4fed025aa4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -58,6 +58,7 @@ import org.apache.iotdb.confignode.persistence.metric.PartitionInfoMetrics;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask;
import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.rpc.RpcUtils;
@@ -536,16 +537,13 @@ public class PartitionInfo implements SnapshotProcessor {
/**
* Only Leader use this interface. Filter unassigned SchemaPartitionSlots
*
- * @param partitionSlotsMap Map<StorageGroupName, Map<TSeriesPartitionSlot,
- * List<TTimePartitionSlot>>>
- * @return Map<StorageGroupName, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>,
- * DataPartitionSlots that is not assigned in partitionSlotsMap
+ * @param partitionSlotsMap Map<StorageGroupName, Map<TSeriesPartitionSlot, TTimeSlotList>>
+ * @return Map<StorageGroupName, Map<TSeriesPartitionSlot, TTimeSlotList>>, DataPartitionSlots
+ * that is not assigned in partitionSlotsMap
*/
- public Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
- filterUnassignedDataPartitionSlots(
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap) {
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> result =
- new ConcurrentHashMap<>();
+ public Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> filterUnassignedDataPartitionSlots(
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap) {
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> result = new ConcurrentHashMap<>();
partitionSlotsMap.forEach(
(storageGroup, partitionSlots) -> {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
index 8ec04c7e4d..68ac9204be 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -194,8 +195,7 @@ public class StorageGroupPartitionTable {
* @return True if all the PartitionSlots are matched, false otherwise
*/
public boolean getDataPartition(
- Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> partitionSlots,
- DataPartitionTable dataPartition) {
+ Map<TSeriesPartitionSlot, TTimeSlotList> partitionSlots, DataPartitionTable dataPartition) {
return dataPartitionTable.getDataPartition(partitionSlots, dataPartition);
}
@@ -285,8 +285,8 @@ public class StorageGroupPartitionTable {
* @param partitionSlots List<TSeriesPartitionSlot>
* @return Unassigned PartitionSlots
*/
- public Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> filterUnassignedDataPartitionSlots(
- Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> partitionSlots) {
+ public Map<TSeriesPartitionSlot, TTimeSlotList> filterUnassignedDataPartitionSlots(
+ Map<TSeriesPartitionSlot, TTimeSlotList> partitionSlots) {
return dataPartitionTable.filterUnassignedDataPartitionSlots(partitionSlots);
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 3b6c0afd28..4672b402ce 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -112,6 +112,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement;
@@ -434,11 +435,16 @@ public class ConfigPhysicalPlanSerDeTest {
TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(10);
TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot(100);
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
- new HashMap<>();
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>();
partitionSlotsMap.put(storageGroup, new HashMap<>());
- partitionSlotsMap.get(storageGroup).put(seriesPartitionSlot, new ArrayList<>());
- partitionSlotsMap.get(storageGroup).get(seriesPartitionSlot).add(timePartitionSlot);
+ partitionSlotsMap
+ .get(storageGroup)
+ .put(seriesPartitionSlot, new TTimeSlotList().setTimePartitionSlots(new ArrayList<>()));
+ partitionSlotsMap
+ .get(storageGroup)
+ .get(seriesPartitionSlot)
+ .getTimePartitionSlots()
+ .add(timePartitionSlot);
GetDataPartitionPlan req0 = new GetDataPartitionPlan(partitionSlotsMap);
GetDataPartitionPlan req1 =
@@ -452,11 +458,16 @@ public class ConfigPhysicalPlanSerDeTest {
TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(10);
TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot(100);
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
- new HashMap<>();
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>();
partitionSlotsMap.put(storageGroup, new HashMap<>());
- partitionSlotsMap.get(storageGroup).put(seriesPartitionSlot, new ArrayList<>());
- partitionSlotsMap.get(storageGroup).get(seriesPartitionSlot).add(timePartitionSlot);
+ partitionSlotsMap
+ .get(storageGroup)
+ .put(seriesPartitionSlot, new TTimeSlotList().setTimePartitionSlots(new ArrayList<>()));
+ partitionSlotsMap
+ .get(storageGroup)
+ .get(seriesPartitionSlot)
+ .getTimePartitionSlots()
+ .add(timePartitionSlot);
GetOrCreateDataPartitionPlan req0 = new GetOrCreateDataPartitionPlan(partitionSlotsMap);
GetOrCreateDataPartitionPlan req1 =
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
index 8fe1e11bd8..caab757870 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.ConfigFactory;
import org.apache.iotdb.it.env.EnvFactory;
@@ -54,7 +55,6 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -135,11 +135,12 @@ public class IoTDBClusterRegionLeaderBalancingIT {
// Create a DataRegionGroup for each StorageGroup through getOrCreateDataPartition
for (int i = 0; i < storageGroupNum; i++) {
- Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> seriesSlotMap = new HashMap<>();
+ Map<TSeriesPartitionSlot, TTimeSlotList> seriesSlotMap = new HashMap<>();
seriesSlotMap.put(
- new TSeriesPartitionSlot(1), Collections.singletonList(new TTimePartitionSlot(100)));
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> sgSlotsMap =
- new HashMap<>();
+ new TSeriesPartitionSlot(1),
+ new TTimeSlotList()
+ .setTimePartitionSlots(Collections.singletonList(new TTimePartitionSlot(100))));
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> sgSlotsMap = new HashMap<>();
sgSlotsMap.put(sg + i, seriesSlotMap);
TDataPartitionTableResp dataPartitionTableResp =
client.getOrCreateDataPartitionTable(new TDataPartitionReq(sgSlotsMap));
@@ -197,11 +198,12 @@ public class IoTDBClusterRegionLeaderBalancingIT {
schemaPartitionTableResp.getStatus().getCode());
// Create a DataRegionGroup for each StorageGroup
- Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> seriesSlotMap = new HashMap<>();
+ Map<TSeriesPartitionSlot, TTimeSlotList> seriesSlotMap = new HashMap<>();
seriesSlotMap.put(
- new TSeriesPartitionSlot(1), Collections.singletonList(new TTimePartitionSlot(100)));
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> sgSlotsMap =
- new HashMap<>();
+ new TSeriesPartitionSlot(1),
+ new TTimeSlotList()
+ .setTimePartitionSlots(Collections.singletonList(new TTimePartitionSlot(100))));
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> sgSlotsMap = new HashMap<>();
sgSlotsMap.put(sg + i, seriesSlotMap);
TDataPartitionTableResp dataPartitionTableResp =
client.getOrCreateDataPartitionTable(new TDataPartitionReq(sgSlotsMap));
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
index 70d8c34c52..d8d4f5d071 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.ConfigFactory;
import org.apache.iotdb.it.env.EnvFactory;
@@ -163,12 +164,15 @@ public class IoTDBConfigNodeSnapshotIT {
new TTimePartitionSlot(testTimePartitionInterval * k);
// Create DataPartition
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
new HashMap<>();
partitionSlotsMap.put(storageGroup, new HashMap<>());
partitionSlotsMap
.get(storageGroup)
- .put(seriesPartitionSlot, Collections.singletonList(timePartitionSlot));
+ .put(
+ seriesPartitionSlot,
+ new TTimeSlotList()
+ .setTimePartitionSlots(Collections.singletonList(timePartitionSlot)));
TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
TDataPartitionTableResp dataPartitionTableResp =
client.getOrCreateDataPartitionTable(dataPartitionReq);
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSwitchLeaderIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSwitchLeaderIT.java
index 746fe7cc1c..47ef166dc3 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSwitchLeaderIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSwitchLeaderIT.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.ConfigFactory;
import org.apache.iotdb.it.env.EnvFactory;
@@ -49,7 +50,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -151,10 +151,12 @@ public class IoTDBConfigNodeSwitchLeaderIT {
schemaPartitionTableResp0.getStatus().getCode());
// Create DataRegionGroups through getOrCreateDataPartition and record DataPartitionTable
- Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> seriesSlotMap = new HashMap<>();
+ Map<TSeriesPartitionSlot, TTimeSlotList> seriesSlotMap = new HashMap<>();
seriesSlotMap.put(
- new TSeriesPartitionSlot(1), Collections.singletonList(new TTimePartitionSlot(100)));
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> sgSlotsMap = new HashMap<>();
+ new TSeriesPartitionSlot(1),
+ new TTimeSlotList()
+ .setTimePartitionSlots(Collections.singletonList(new TTimePartitionSlot(100))));
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> sgSlotsMap = new HashMap<>();
sgSlotsMap.put(sg0, seriesSlotMap);
sgSlotsMap.put(sg1, seriesSlotMap);
dataPartitionTableResp0 =
@@ -177,10 +179,12 @@ public class IoTDBConfigNodeSwitchLeaderIT {
client.getSchemaPartitionTable(new TSchemaPartitionReq().setPathPatternTree(buffer)));
// Check DataPartitionTable
- Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> seriesSlotMap = new HashMap<>();
+ Map<TSeriesPartitionSlot, TTimeSlotList> seriesSlotMap = new HashMap<>();
seriesSlotMap.put(
- new TSeriesPartitionSlot(1), Collections.singletonList(new TTimePartitionSlot(100)));
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> sgSlotsMap = new HashMap<>();
+ new TSeriesPartitionSlot(1),
+ new TTimeSlotList()
+ .setTimePartitionSlots(Collections.singletonList(new TTimePartitionSlot(100))));
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> sgSlotsMap = new HashMap<>();
sgSlotsMap.put(sg0, seriesSlotMap);
sgSlotsMap.put(sg1, seriesSlotMap);
Assert.assertEquals(
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
index 731477a326..53baa4e271 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionStatus;
@@ -42,6 +41,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.ConfigFactory;
import org.apache.iotdb.it.env.DataNodeWrapper;
@@ -62,7 +62,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -289,7 +288,7 @@ public class IoTDBPartitionDurableIT {
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
/* Test getOrCreateDataPartition, ConfigNode should create DataPartition and return */
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
ConfigNodeTestUtils.constructPartitionSlotsMap(
sg,
0,
@@ -442,7 +441,7 @@ public class IoTDBPartitionDurableIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
// Test getOrCreateDataPartition, ConfigNode should create DataPartition and return
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
ConfigNodeTestUtils.constructPartitionSlotsMap(
sg,
0,
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
index ff5eaf8eee..49f7b1ce60 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.ConfigFactory;
import org.apache.iotdb.it.env.EnvFactory;
@@ -67,7 +68,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -180,7 +180,7 @@ public class IoTDBPartitionGetterIT {
String storageGroup = sg + i;
for (int j = 0; j < testSeriesPartitionSlotNum; j += seriesPartitionBatchSize) {
for (long k = 0; k < testTimePartitionSlotsNum; k += timePartitionBatchSize) {
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
ConfigNodeTestUtils.constructPartitionSlotsMap(
storageGroup,
j,
@@ -316,7 +316,7 @@ public class IoTDBPartitionGetterIT {
TDataPartitionTableResp dataPartitionTableResp;
// Prepare partitionSlotsMap
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
ConfigNodeTestUtils.constructPartitionSlotsMap(
sg + 10, 0, 10, 0, 10, testTimePartitionInterval);
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
index 52deb6d02f..6e8ae39c83 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.confignode.it.partition;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
@@ -29,6 +28,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.ConfigFactory;
import org.apache.iotdb.it.env.EnvFactory;
@@ -47,7 +47,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -135,7 +134,7 @@ public class IoTDBPartitionInheritPolicyIT {
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
TDataPartitionReq dataPartitionReq = new TDataPartitionReq();
TDataPartitionTableResp dataPartitionTableResp;
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap;
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap;
for (int i = 0; i < storageGroupNum; i++) {
String storageGroup = sg + i;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBRegionGroupExtensionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBRegionGroupExtensionIT.java
index 9c592a1051..e61f8c5316 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBRegionGroupExtensionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBRegionGroupExtensionIT.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.confignode.it.partition;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
@@ -29,6 +28,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.ConfigFactory;
import org.apache.iotdb.it.env.EnvFactory;
@@ -46,7 +46,6 @@ import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import java.io.IOException;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -126,7 +125,7 @@ public class IoTDBRegionGroupExtensionIT {
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
/* Insert a DataPartition to create DataRegionGroups */
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
ConfigNodeTestUtils.constructPartitionSlotsMap(
curSg, 0, 10, 0, 10, originalTimePartitionInterval);
TDataPartitionTableResp dataPartitionTableResp =
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
index 6bd8ed4cfb..43a1889901 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.it.env.ConfigNodeWrapper;
import org.apache.iotdb.it.env.DataNodeWrapper;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -120,23 +121,28 @@ public class ConfigNodeTestUtils {
return ByteBuffer.wrap(baos.toByteArray());
}
- public static Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
- constructPartitionSlotsMap(
- String storageGroup,
- int seriesSlotStart,
- int seriesSlotEnd,
- long timeSlotStart,
- long timeSlotEnd,
- long timePartitionInterval) {
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> result = new HashMap<>();
+ public static Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> constructPartitionSlotsMap(
+ String storageGroup,
+ int seriesSlotStart,
+ int seriesSlotEnd,
+ long timeSlotStart,
+ long timeSlotEnd,
+ long timePartitionInterval) {
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> result = new HashMap<>();
result.put(storageGroup, new HashMap<>());
for (int i = seriesSlotStart; i < seriesSlotEnd; i++) {
TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
- result.get(storageGroup).put(seriesPartitionSlot, new ArrayList<>());
+ result
+ .get(storageGroup)
+ .put(seriesPartitionSlot, new TTimeSlotList().setTimePartitionSlots(new ArrayList<>()));
for (long j = timeSlotStart; j < timeSlotEnd; j++) {
TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot(j * timePartitionInterval);
- result.get(storageGroup).get(seriesPartitionSlot).add(timePartitionSlot);
+ result
+ .get(storageGroup)
+ .get(seriesPartitionSlot)
+ .getTimePartitionSlots()
+ .add(timePartitionSlot);
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
index 83a7cdeb2c..4ed3cd8dc4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
@@ -28,12 +28,31 @@ public class DataPartitionQueryParam {
private String devicePath;
private List<TTimePartitionSlot> timePartitionSlotList = new ArrayList<>();
+ // set to true by a query whose time filter is open on the left, e.g. time <= XXX;
+ // the open range is (-oo, timePartitionSlotList.get(0))
+ private boolean needLeftAll = false;
+
+ // set to true by a query whose time filter is open on the right, e.g. time >= XXX;
+ // the open range is (timePartitionSlotList.get(timePartitionSlotList.size() - 1), +oo)
+ private boolean needRightAll = false;
+
public DataPartitionQueryParam(
String devicePath, List<TTimePartitionSlot> timePartitionSlotList) {
this.devicePath = devicePath;
this.timePartitionSlotList = timePartitionSlotList;
}
+ public DataPartitionQueryParam(
+ String devicePath,
+ List<TTimePartitionSlot> timePartitionSlotList,
+ boolean needLeftAll,
+ boolean needRightAll) {
+ this.devicePath = devicePath;
+ this.timePartitionSlotList = timePartitionSlotList;
+ this.needLeftAll = needLeftAll;
+ this.needRightAll = needRightAll;
+ }
+
public DataPartitionQueryParam() {}
public String getDevicePath() {
@@ -51,4 +70,20 @@ public class DataPartitionQueryParam {
public void setTimePartitionSlotList(List<TTimePartitionSlot> timePartitionSlotList) {
this.timePartitionSlotList = timePartitionSlotList;
}
+
+ public boolean isNeedLeftAll() {
+ return needLeftAll;
+ }
+
+ public void setNeedLeftAll(boolean needLeftAll) {
+ this.needLeftAll = needLeftAll;
+ }
+
+ public boolean isNeedRightAll() {
+ return needRightAll;
+ }
+
+ public void setNeedRightAll(boolean needRightAll) {
+ this.needRightAll = needRightAll;
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
index 39c151a390..de570e21d6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;
@@ -63,7 +64,7 @@ public class DataPartitionTable {
* @return True if all the PartitionSlots are matched, false otherwise
*/
public boolean getDataPartition(
- Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> partitionSlots,
+ Map<TSeriesPartitionSlot, TTimeSlotList> partitionSlots,
DataPartitionTable dataPartitionTable) {
AtomicBoolean result = new AtomicBoolean(true);
if (partitionSlots.isEmpty()) {
@@ -72,12 +73,12 @@ public class DataPartitionTable {
} else {
// Return the DataPartition for each SeriesPartitionSlot
partitionSlots.forEach(
- (seriesPartitionSlot, timePartitionSlots) -> {
+ (seriesPartitionSlot, timePartitionSlotList) -> {
if (dataPartitionMap.containsKey(seriesPartitionSlot)) {
SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
if (!dataPartitionMap
.get(seriesPartitionSlot)
- .getDataPartition(timePartitionSlots, seriesPartitionTable)) {
+ .getDataPartition(timePartitionSlotList, seriesPartitionTable)) {
result.set(false);
}
@@ -146,17 +147,21 @@ public class DataPartitionTable {
* @param partitionSlots SeriesPartitionSlots and TimePartitionSlots
* @return Unassigned PartitionSlots
*/
- public Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> filterUnassignedDataPartitionSlots(
- Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> partitionSlots) {
- Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> result = new ConcurrentHashMap<>();
+ public Map<TSeriesPartitionSlot, TTimeSlotList> filterUnassignedDataPartitionSlots(
+ Map<TSeriesPartitionSlot, TTimeSlotList> partitionSlots) {
+ Map<TSeriesPartitionSlot, TTimeSlotList> result = new ConcurrentHashMap<>();
partitionSlots.forEach(
(seriesPartitionSlot, timePartitionSlots) ->
result.put(
seriesPartitionSlot,
- dataPartitionMap
- .computeIfAbsent(seriesPartitionSlot, empty -> new SeriesPartitionTable())
- .filterUnassignedDataPartitionSlots(timePartitionSlots)));
+ new TTimeSlotList(
+ dataPartitionMap
+ .computeIfAbsent(seriesPartitionSlot, empty -> new SeriesPartitionTable())
+ .filterUnassignedDataPartitionSlots(
+ timePartitionSlots.getTimePartitionSlots()),
+ false,
+ false)));
return result;
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
index 7db5486893..c5712849c2 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;
@@ -63,20 +64,47 @@ public class SeriesPartitionTable {
/**
* Thread-safely get DataPartition within the specific StorageGroup
*
- * @param partitionSlots TimePartitionSlots
+ * @param partitionSlotList TimePartitionSlotList
* @param seriesPartitionTable Store the matched SeriesPartitions
* @return True if all the SeriesPartitionSlots are matched, false otherwise
*/
public boolean getDataPartition(
- List<TTimePartitionSlot> partitionSlots, SeriesPartitionTable seriesPartitionTable) {
+ TTimeSlotList partitionSlotList, SeriesPartitionTable seriesPartitionTable) {
AtomicBoolean result = new AtomicBoolean(true);
+ List<TTimePartitionSlot> partitionSlots = partitionSlotList.getTimePartitionSlots();
if (partitionSlots.isEmpty()) {
// Return all DataPartitions in one SeriesPartitionSlot
// when the queried TimePartitionSlots are empty
seriesPartitionTable.getSeriesPartitionMap().putAll(seriesPartitionMap);
} else {
- // Return the DataPartition for each TimePartitionSlot
+ boolean isNeedLeftAll = partitionSlotList.isNeedLeftAll(),
+ isNeedRightAll = partitionSlotList.isNeedRightAll();
+ if (isNeedLeftAll || isNeedRightAll) {
+ // Compute leftMargin so that (-oo, leftMargin) covers every time partition on the open
+ // left side,
+ // and rightMargin so that (rightMargin, +oo) covers every time partition on the open
+ // right side.
+ // The remaining closed time ranges, which lie within [leftMargin, rightMargin], are
+ // handled after this if block.
+ long leftMargin = isNeedLeftAll ? partitionSlots.get(0).getStartTime() : Long.MIN_VALUE,
+ rightMargin =
+ isNeedRightAll
+ ? partitionSlots.get(partitionSlots.size() - 1).getStartTime()
+ : Long.MAX_VALUE;
+ seriesPartitionTable
+ .getSeriesPartitionMap()
+ .putAll(
+ seriesPartitionMap.entrySet().stream()
+ .filter(
+ entry -> {
+ long startTime = entry.getKey().getStartTime();
+ return startTime < leftMargin || startTime > rightMargin;
+ })
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+ }
+
+ // Return the DataPartition for each matched TimePartitionSlot
partitionSlots.forEach(
timePartitionSlot -> {
if (seriesPartitionMap.containsKey(timePartitionSlot)) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
index e91e1ab18d..ffa4b5cd01 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -157,6 +158,25 @@ public class ThriftCommonsSerDeUtils {
return seriesPartitionSlot;
}
+ public static void serializeTTimePartitionSlotList(
+ TTimeSlotList timePartitionSlotList, DataOutputStream stream) {
+ try {
+ timePartitionSlotList.write(generateWriteProtocol(stream));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Write TTimeSlotList failed: ", e);
+ }
+ }
+
+ public static TTimeSlotList deserializeTTimePartitionSlotList(ByteBuffer buffer) {
+ TTimeSlotList timePartitionSlotList = new TTimeSlotList();
+ try {
+ timePartitionSlotList.read(generateReadProtocol(buffer)); // reading: use the read protocol
+ } catch (TException e) {
+ throw new ThriftSerDeException("Read TTimeSlotList failed: ", e);
+ }
+ return timePartitionSlotList;
+ }
+
public static void serializeTTimePartitionSlot(
TTimePartitionSlot timePartitionSlot, DataOutputStream stream) {
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 0a8966c95c..7bc61ec849 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
@@ -161,6 +162,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
private static final Logger logger = LoggerFactory.getLogger(Analyzer.class);
+ private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
private final IPartitionFetcher partitionFetcher;
private final ISchemaFetcher schemaFetcher;
@@ -358,18 +361,36 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
Set<String> deviceSet =
allSelectedPath.stream().map(MeasurementPath::getDevice).collect(Collectors.toSet());
- List<TTimePartitionSlot> timePartitionSlotList =
+ Pair<List<TTimePartitionSlot>, Pair<Boolean, Boolean>> res =
getTimePartitionSlotList(analysis.getGlobalTimeFilter());
- Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
- for (String devicePath : deviceSet) {
- DataPartitionQueryParam queryParam =
- new DataPartitionQueryParam(devicePath, timePartitionSlotList);
- sgNameToQueryParamsMap
- .computeIfAbsent(schemaTree.getBelongedDatabase(devicePath), key -> new ArrayList<>())
- .add(queryParam);
+ DataPartition dataPartition;
+
+ // there is no satisfied time range
+ if (res.left.isEmpty() && !res.right.left) {
+ dataPartition =
+ new DataPartition(
+ Collections.emptyMap(),
+ CONFIG.getSeriesPartitionExecutorClass(),
+ CONFIG.getSeriesPartitionSlotNum());
+ } else {
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
+ for (String devicePath : deviceSet) {
+ DataPartitionQueryParam queryParam =
+ new DataPartitionQueryParam(devicePath, res.left, res.right.left, res.right.right);
+ sgNameToQueryParamsMap
+ .computeIfAbsent(schemaTree.getBelongedDatabase(devicePath), key -> new ArrayList<>())
+ .add(queryParam);
+ }
+
+ if (res.right.left || res.right.right) {
+ dataPartition =
+ partitionFetcher.getDataPartitionWithUnclosedTimeRange(sgNameToQueryParamsMap);
+ } else {
+ dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+ }
}
- DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+
analysis.setDataPartitionInfo(dataPartition);
return analysis;
@@ -1122,39 +1143,83 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
private DataPartition fetchDataPartitionByDevices(
Set<String> deviceSet, ISchemaTree schemaTree, Filter globalTimeFilter) {
- List<TTimePartitionSlot> timePartitionSlotList = getTimePartitionSlotList(globalTimeFilter);
+ Pair<List<TTimePartitionSlot>, Pair<Boolean, Boolean>> res =
+ getTimePartitionSlotList(globalTimeFilter);
+ // there is no satisfied time range
+ if (res.left.isEmpty() && !res.right.left) {
+ return new DataPartition(
+ Collections.emptyMap(),
+ CONFIG.getSeriesPartitionExecutorClass(),
+ CONFIG.getSeriesPartitionSlotNum());
+ }
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
for (String devicePath : deviceSet) {
DataPartitionQueryParam queryParam =
- new DataPartitionQueryParam(devicePath, timePartitionSlotList);
+ new DataPartitionQueryParam(devicePath, res.left, res.right.left, res.right.right);
sgNameToQueryParamsMap
.computeIfAbsent(schemaTree.getBelongedDatabase(devicePath), key -> new ArrayList<>())
.add(queryParam);
}
- return partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+
+ if (res.right.left || res.right.right) {
+ return partitionFetcher.getDataPartitionWithUnclosedTimeRange(sgNameToQueryParamsMap);
+ } else {
+ return partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+ }
}
- public static List<TTimePartitionSlot> getTimePartitionSlotList(Filter timeFilter) {
+ /**
+ * Get the TTimePartitionSlot list covered by this time filter.
+ *
+ * @return res.left is the List<TTimePartitionSlot>; res.right.left = true if the filter contains
+ * a (-oo, XXX] time range, and res.right.right = true if it contains a [XXX, +oo) time range
+ */
+ public static Pair<List<TTimePartitionSlot>, Pair<Boolean, Boolean>> getTimePartitionSlotList(
+ Filter timeFilter) {
if (timeFilter == null) {
- return Collections.emptyList();
+ // (-oo, +oo)
+ return new Pair<>(Collections.emptyList(), new Pair<>(true, true));
}
List<TimeRange> timeRangeList = timeFilter.getTimeRanges();
if (timeRangeList.isEmpty()) {
- return Collections.emptyList();
+ // no satisfied time range
+ return new Pair<>(Collections.emptyList(), new Pair<>(false, false));
+ } else if (timeRangeList.size() == 1
+ && (timeRangeList.get(0).getMin() == Long.MIN_VALUE
+ && timeRangeList.get(timeRangeList.size() - 1).getMax() == Long.MAX_VALUE)) {
+ // (-oo, +oo)
+ return new Pair<>(Collections.emptyList(), new Pair<>(true, true));
+ }
+
+ boolean needLeftAll, needRightAll;
+ long startTime, endTime;
+ TTimePartitionSlot timePartitionSlot;
+ int index = 0, size = timeRangeList.size();
+
+ if (timeRangeList.get(0).getMin() == Long.MIN_VALUE) {
+ needLeftAll = true;
+ startTime =
+ (timeRangeList.get(0).getMax() / TimePartitionUtils.timePartitionInterval)
+ * TimePartitionUtils.timePartitionInterval; // included
+ endTime = startTime + TimePartitionUtils.timePartitionInterval; // excluded
+ timePartitionSlot = TimePartitionUtils.getTimePartition(timeRangeList.get(0).getMax());
+ } else {
+ startTime =
+ (timeRangeList.get(0).getMin() / TimePartitionUtils.timePartitionInterval)
+ * TimePartitionUtils.timePartitionInterval; // included
+ endTime = startTime + TimePartitionUtils.timePartitionInterval; // excluded
+ timePartitionSlot = TimePartitionUtils.getTimePartition(timeRangeList.get(0).getMin());
+ needLeftAll = false;
}
- if (timeRangeList.get(0).getMin() == Long.MIN_VALUE
- || timeRangeList.get(timeRangeList.size() - 1).getMax() == Long.MAX_VALUE) {
- return Collections.emptyList();
+
+ if (timeRangeList.get(size - 1).getMax() == Long.MAX_VALUE) {
+ needRightAll = true;
+ size--;
+ } else {
+ needRightAll = false;
}
List<TTimePartitionSlot> result = new ArrayList<>();
- long startTime =
- (timeRangeList.get(0).getMin() / TimePartitionUtils.timePartitionInterval)
- * TimePartitionUtils.timePartitionInterval; // included
- long endTime = startTime + TimePartitionUtils.timePartitionInterval; // excluded
- TTimePartitionSlot timePartitionSlot =
- TimePartitionUtils.getTimePartition(timeRangeList.get(0).getMin());
- int index = 0, size = timeRangeList.size();
while (index < size) {
long curLeft = timeRangeList.get(index).getMin();
long curRight = timeRangeList.get(index).getMax();
@@ -1175,7 +1240,15 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
}
}
result.add(timePartitionSlot);
- return result;
+
+ if (needRightAll) {
+ TTimePartitionSlot lastTimePartitionSlot =
+ TimePartitionUtils.getTimePartition(timeRangeList.get(timeRangeList.size() - 1).getMin());
+ if (lastTimePartitionSlot.startTime != timePartitionSlot.startTime) {
+ result.add(lastTimePartitionSlot);
+ }
+ }
+ return new Pair<>(result, new Pair<>(needLeftAll, needRightAll));
}
private void analyzeInto(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
index 089a661ac8..b803893288 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
@@ -59,7 +60,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
public class ClusterPartitionFetcher implements IPartitionFetcher {
private static final Logger logger = LoggerFactory.getLogger(ClusterPartitionFetcher.class);
@@ -173,10 +173,10 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
@Override
public DataPartition getDataPartition(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
- try (ConfigNodeClient client =
- configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
- DataPartition dataPartition = partitionCache.getDataPartition(sgNameToQueryParamsMap);
- if (null == dataPartition) {
+ DataPartition dataPartition = partitionCache.getDataPartition(sgNameToQueryParamsMap);
+ if (null == dataPartition) {
+ try (ConfigNodeClient client =
+ configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
TDataPartitionTableResp dataPartitionTableResp =
client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
if (dataPartitionTableResp.getStatus().getCode()
@@ -188,8 +188,32 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
"An error occurred when executing getDataPartition():"
+ dataPartitionTableResp.getStatus().getMessage());
}
+ } catch (TException | IOException e) {
+ throw new StatementAnalyzeException(
+ "An error occurred when executing getDataPartition():" + e.getMessage());
+ }
+ }
+ return dataPartition;
+ }
+
+ @Override
+ public DataPartition getDataPartitionWithUnclosedTimeRange(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ // In this method, we must fetch from the config node because the time range contains -oo or
+ // +oo. There is no need to update the cache: since we will never read this result from the
+ // cache, the update operation would only waste time.
+ try (ConfigNodeClient client =
+ configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+ TDataPartitionTableResp dataPartitionTableResp =
+ client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
+ if (dataPartitionTableResp.getStatus().getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return parseDataPartitionResp(dataPartitionTableResp);
+ } else {
+ throw new StatementAnalyzeException(
+ "An error occurred when executing getDataPartition():"
+ + dataPartitionTableResp.getStatus().getMessage());
}
- return dataPartition;
} catch (TException | IOException e) {
throw new StatementAnalyzeException(
"An error occurred when executing getDataPartition():" + e.getMessage());
@@ -199,11 +223,11 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
@Override
public DataPartition getOrCreateDataPartition(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
- // Do not use data partition cache
- try (ConfigNodeClient client =
- configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
- DataPartition dataPartition = partitionCache.getDataPartition(sgNameToQueryParamsMap);
- if (null == dataPartition) {
+ DataPartition dataPartition = partitionCache.getDataPartition(sgNameToQueryParamsMap);
+ if (null == dataPartition) {
+ // Cache miss: get or create the data partition through the config node
+ try (ConfigNodeClient client =
+ configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
TDataPartitionTableResp dataPartitionTableResp =
client.getOrCreateDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
if (dataPartitionTableResp.getStatus().getCode()
@@ -215,23 +239,25 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
"An error occurred when executing getOrCreateDataPartition():"
+ dataPartitionTableResp.getStatus().getMessage());
}
+ } catch (TException | IOException e) {
+ throw new StatementAnalyzeException(
+ "An error occurred when executing getOrCreateDataPartition():" + e.getMessage());
}
- return dataPartition;
- } catch (TException | IOException e) {
- throw new StatementAnalyzeException(
- "An error occurred when executing getOrCreateDataPartition():" + e.getMessage());
}
+ return dataPartition;
}
@Override
public DataPartition getOrCreateDataPartition(
List<DataPartitionQueryParam> dataPartitionQueryParams) {
- try (ConfigNodeClient client =
- configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
- Map<String, List<DataPartitionQueryParam>> splitDataPartitionQueryParams =
- splitDataPartitionQueryParam(dataPartitionQueryParams, true);
- DataPartition dataPartition = partitionCache.getDataPartition(splitDataPartitionQueryParams);
- if (null == dataPartition) {
+
+ Map<String, List<DataPartitionQueryParam>> splitDataPartitionQueryParams =
+ splitDataPartitionQueryParam(dataPartitionQueryParams, true);
+ DataPartition dataPartition = partitionCache.getDataPartition(splitDataPartitionQueryParams);
+
+ if (null == dataPartition) {
+ try (ConfigNodeClient client =
+ configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
TDataPartitionTableResp dataPartitionTableResp =
client.getOrCreateDataPartitionTable(
constructDataPartitionReq(splitDataPartitionQueryParams));
@@ -246,12 +272,12 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
dataPartitionTableResp.getStatus().getMessage(),
dataPartitionTableResp.getStatus().getCode()));
}
+ } catch (TException | IOException e) {
+ throw new StatementAnalyzeException(
+ "An error occurred when executing getOrCreateDataPartition():" + e.getMessage());
}
- return dataPartition;
- } catch (TException | IOException e) {
- throw new StatementAnalyzeException(
- "An error occurred when executing getOrCreateDataPartition():" + e.getMessage());
}
+ return dataPartition;
}
@Override
@@ -323,19 +349,20 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
private TDataPartitionReq constructDataPartitionReq(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
- new HashMap<>();
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>();
for (Map.Entry<String, List<DataPartitionQueryParam>> entry :
sgNameToQueryParamsMap.entrySet()) {
// for each sg
- Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> deviceToTimePartitionMap =
- new HashMap<>();
+ Map<TSeriesPartitionSlot, TTimeSlotList> deviceToTimePartitionMap = new HashMap<>();
for (DataPartitionQueryParam queryParam : entry.getValue()) {
+ TTimeSlotList timePartitionSlotList =
+ new TTimeSlotList(
+ queryParam.getTimePartitionSlotList(),
+ queryParam.isNeedLeftAll(),
+ queryParam.isNeedRightAll());
deviceToTimePartitionMap.put(
partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()),
- queryParam.getTimePartitionSlotList().stream()
- .map(timePartitionSlot -> new TTimePartitionSlot(timePartitionSlot.getStartTime()))
- .collect(Collectors.toList()));
+ timePartitionSlotList);
}
partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/IPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/IPartitionFetcher.java
index 956b482556..e2c4b4a7c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/IPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/IPartitionFetcher.java
@@ -46,6 +46,15 @@ public interface IPartitionFetcher {
*/
DataPartition getDataPartition(Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap);
+ /**
+ * Get data partition, used in query scenarios whose time filter is unclosed, e.g. time < XX or
+ * time > XX
+ *
+ * @param sgNameToQueryParamsMap database name -> the list of DataPartitionQueryParams
+ */
+ DataPartition getDataPartitionWithUnclosedTimeRange(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap);
+
/**
* Get or create data partition, used in standalone write scenarios. if enableAutoCreateSchema is
* true and database/series/time slots not exists, then automatically create.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandalonePartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandalonePartitionFetcher.java
index 07ff4497c2..a4484debb2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandalonePartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandalonePartitionFetcher.java
@@ -146,6 +146,13 @@ public class StandalonePartitionFetcher implements IPartitionFetcher {
}
}
+ @Override
+ public DataPartition getDataPartitionWithUnclosedTimeRange(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ throw new UnsupportedOperationException(
+ "getDataPartitionWithUnclosedTimeRange is not supported");
+ }
+
@Override
public DataPartition getOrCreateDataPartition(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
index a799d5ac3b..b15229ca77 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
@@ -209,6 +209,12 @@ public class FakePartitionFetcherImpl implements IPartitionFetcher {
return dataPartition;
}
+ @Override
+ public DataPartition getDataPartitionWithUnclosedTimeRange(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ return getDataPartition(sgNameToQueryParamsMap);
+ }
+
@Override
public DataPartition getOrCreateDataPartition(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/QueryTimePartitionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/QueryTimePartitionTest.java
index 0830dbfd0f..e795634438 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/QueryTimePartitionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/QueryTimePartitionTest.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.read.filter.operator.NotFilter;
import org.apache.iotdb.tsfile.read.filter.operator.OrFilter;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.junit.Test;
@@ -34,6 +35,7 @@ import java.util.List;
import static org.apache.iotdb.db.mpp.plan.analyze.AnalyzeVisitor.getTimePartitionSlotList;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class QueryTimePartitionTest {
@@ -350,28 +352,46 @@ public class QueryTimePartitionTest {
public void testGetTimePartitionSlotList() {
// time >= 10 and time <= 9
- List<TTimePartitionSlot> res =
+ Pair<List<TTimePartitionSlot>, Pair<Boolean, Boolean>> res =
getTimePartitionSlotList(new AndFilter(TimeFilter.gtEq(10), TimeFilter.ltEq(9)));
- assertTrue(res.isEmpty());
+ assertTrue(res.left.isEmpty());
+ assertFalse(res.right.left);
+ assertFalse(res.right.right);
// time >= 10
res = getTimePartitionSlotList(TimeFilter.gtEq(10));
- assertTrue(res.isEmpty());
+ assertEquals(1, res.left.size());
+ List<TTimePartitionSlot> expected = Collections.singletonList(new TTimePartitionSlot(0));
+ assertEquals(expected.size(), res.left.size());
+ for (int i = 0; i < expected.size(); i++) {
+ assertEquals(expected.get(i), res.left.get(i));
+ }
+ assertFalse(res.right.left);
+ assertTrue(res.right.right);
// time < 20
res = getTimePartitionSlotList(TimeFilter.lt(20));
- assertTrue(res.isEmpty());
+ assertEquals(1, res.left.size());
+ expected = Collections.singletonList(new TTimePartitionSlot(0));
+ assertEquals(expected.size(), res.left.size());
+ for (int i = 0; i < expected.size(); i++) {
+ assertEquals(expected.get(i), res.left.get(i));
+ }
+ assertTrue(res.right.left);
+ assertFalse(res.right.right);
// time > 10 and time <= 20
res = getTimePartitionSlotList(new AndFilter(TimeFilter.gt(10), TimeFilter.ltEq(20)));
- List<TTimePartitionSlot> expected = Collections.singletonList(new TTimePartitionSlot(0));
- assertEquals(expected.size(), res.size());
+ expected = Collections.singletonList(new TTimePartitionSlot(0));
+ assertEquals(expected.size(), res.left.size());
for (int i = 0; i < expected.size(); i++) {
- assertEquals(expected.get(i), res.get(i));
+ assertEquals(expected.get(i), res.left.get(i));
}
+ assertFalse(res.right.left);
+ assertFalse(res.right.right);
- // time > 0 and time <= IoTDBDescriptor.getInstance().getConfig().getTimePartitionInterval() * 3
- // + 1
+ // time > 0 and time <= IoTDBDescriptor.getInstance()
+ // .getConfig().getTimePartitionInterval() * 3 + 1
res =
getTimePartitionSlotList(
new AndFilter(
@@ -387,10 +407,12 @@ public class QueryTimePartitionTest {
IoTDBDescriptor.getInstance().getConfig().getTimePartitionInterval() * 2),
new TTimePartitionSlot(
IoTDBDescriptor.getInstance().getConfig().getTimePartitionInterval() * 3));
- assertEquals(expected.size(), res.size());
+ assertEquals(expected.size(), res.left.size());
for (int i = 0; i < expected.size(); i++) {
- assertEquals(expected.get(i), res.get(i));
+ assertEquals(expected.get(i), res.left.get(i));
}
+ assertFalse(res.right.left);
+ assertFalse(res.right.right);
// time >= IoTDBDescriptor.getInstance().getConfig().getTimePartitionInterval() - 1 and time <
// IoTDBDescriptor.getInstance().getConfig().getTimePartitionInterval() + 1
@@ -406,10 +428,12 @@ public class QueryTimePartitionTest {
new TTimePartitionSlot(0),
new TTimePartitionSlot(
IoTDBDescriptor.getInstance().getConfig().getTimePartitionInterval()));
- assertEquals(expected.size(), res.size());
+ assertEquals(expected.size(), res.left.size());
for (int i = 0; i < expected.size(); i++) {
- assertEquals(expected.get(i), res.get(i));
+ assertEquals(expected.get(i), res.left.get(i));
}
+ assertFalse(res.right.left);
+ assertFalse(res.right.right);
// time between IoTDBDescriptor.getInstance().getConfig().getTimePartitionInterval() - 1 and
// time < IoTDBDescriptor.getInstance().getConfig().getTimePartitionInterval()
@@ -424,10 +448,12 @@ public class QueryTimePartitionTest {
new TTimePartitionSlot(0),
new TTimePartitionSlot(
IoTDBDescriptor.getInstance().getConfig().getTimePartitionInterval()));
- assertEquals(expected.size(), res.size());
+ assertEquals(expected.size(), res.left.size());
for (int i = 0; i < expected.size(); i++) {
- assertEquals(expected.get(i), res.get(i));
+ assertEquals(expected.get(i), res.left.get(i));
}
+ assertFalse(res.right.left);
+ assertFalse(res.right.right);
// time >= IoTDBDescriptor.getInstance().getConfig().getTimePartitionInterval() and time <=
// IoTDBDescriptor.getInstance().getConfig().getTimePartitionInterval() + 1
@@ -442,10 +468,12 @@ public class QueryTimePartitionTest {
Collections.singletonList(
new TTimePartitionSlot(
IoTDBDescriptor.getInstance().getConfig().getTimePartitionInterval()));
- assertEquals(expected.size(), res.size());
+ assertEquals(expected.size(), res.left.size());
for (int i = 0; i < expected.size(); i++) {
- assertEquals(expected.get(i), res.get(i));
+ assertEquals(expected.get(i), res.left.get(i));
}
+ assertFalse(res.right.left);
+ assertFalse(res.right.right);
// time between IoTDBDescriptor.getInstance().getConfig().getTimePartitionInterval() and time <=
// IoTDBDescriptor.getInstance().getConfig().getTimePartitionInterval() + 1
@@ -459,10 +487,12 @@ public class QueryTimePartitionTest {
Collections.singletonList(
new TTimePartitionSlot(
IoTDBDescriptor.getInstance().getConfig().getTimePartitionInterval()));
- assertEquals(expected.size(), res.size());
+ assertEquals(expected.size(), res.left.size());
for (int i = 0; i < expected.size(); i++) {
- assertEquals(expected.get(i), res.get(i));
+ assertEquals(expected.get(i), res.left.get(i));
}
+ assertFalse(res.right.left);
+ assertFalse(res.right.right);
// (time >= 10 and time < IoTDBDescriptor.getInstance().getConfig().getTimePartitionInterval())
// or (time > IoTDBDescriptor.getInstance().getConfig().getTimePartitionInterval() and time <
@@ -525,8 +555,11 @@ public class QueryTimePartitionTest {
IoTDBDescriptor.getInstance().getConfig().getTimePartitionInterval() * 3),
new TTimePartitionSlot(
IoTDBDescriptor.getInstance().getConfig().getTimePartitionInterval() * 5));
+ assertEquals(expected.size(), res.left.size());
for (int i = 0; i < expected.size(); i++) {
- assertEquals(expected.get(i), res.get(i));
+ assertEquals(expected.get(i), res.left.get(i));
}
+ assertFalse(res.right.left);
+ assertFalse(res.right.right);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
index 654e987931..a4a3c6c580 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
@@ -370,6 +370,12 @@ public class Util {
return ANALYSIS.getDataPartitionInfo();
}
+ @Override
+ public DataPartition getDataPartitionWithUnclosedTimeRange(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ return ANALYSIS.getDataPartitionInfo();
+ }
+
@Override
public DataPartition getOrCreateDataPartition(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 1f4494cbea..3a51d98e2b 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -198,10 +198,16 @@ struct TSchemaNodeManagementResp {
3: optional set<common.TSchemaNode> matchedNode
}
+struct TTimeSlotList {
+ 1: required list<common.TTimePartitionSlot> timePartitionSlots
+ 2: required bool needLeftAll
+ 3: required bool needRightAll
+}
+
// Data
struct TDataPartitionReq {
- // map<StorageGroupName, map<TSeriesPartitionSlot, list<TTimePartitionSlot>>>
- 1: required map<string, map<common.TSeriesPartitionSlot, list<common.TTimePartitionSlot>>> partitionSlotsMap
+ // map<StorageGroupName, map<TSeriesPartitionSlot, TTimeSlotList>>
+ 1: required map<string, map<common.TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap
}
struct TDataPartitionTableResp {