You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/03/17 10:32:08 UTC
[iotdb] 01/04: Filter unsatisfied TimePartition while preparing resource list
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch QueryImprove
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4b0c9843f2990062da101a8e1eec7979d577bb94
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Mar 17 14:28:35 2023 +0800
Filter unsatisfied TimePartition while preparing resource list
---
.../apache/iotdb/db/engine/storagegroup/DataRegion.java | 5 +++--
.../db/engine/storagegroup/HashLastFlushTimeMap.java | 12 ++++++++++++
.../db/engine/storagegroup/IDTableLastFlushTimeMap.java | 6 ++++++
.../iotdb/db/engine/storagegroup/ILastFlushTimeMap.java | 3 +++
.../iotdb/db/engine/storagegroup/TsFileManager.java | 16 ++++++++++++++++
5 files changed, 40 insertions(+), 2 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 9fe0e3346f..1864b0425f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -1700,9 +1700,10 @@ public class DataRegion implements IDataRegionForQuery {
List<PartialPath> pathList, String singleDeviceId, QueryContext context, Filter timeFilter)
throws QueryProcessException {
try {
+ List<Long> timePartitions = lastFlushTimeMap.getAllSatisfiedTimePartitions(singleDeviceId);
List<TsFileResource> seqResources =
getFileResourceListForQuery(
- tsFileManager.getTsFileList(true),
+ tsFileManager.getTsFileList(timePartitions, true),
upgradeSeqFileList,
pathList,
singleDeviceId,
@@ -1711,7 +1712,7 @@ public class DataRegion implements IDataRegionForQuery {
true);
List<TsFileResource> unseqResources =
getFileResourceListForQuery(
- tsFileManager.getTsFileList(false),
+ tsFileManager.getTsFileList(timePartitions, false),
upgradeUnseqFileList,
pathList,
singleDeviceId,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
index 50d524a3e5..d34a52cdad 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
@@ -22,9 +22,11 @@ package org.apache.iotdb.db.engine.storagegroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
public class HashLastFlushTimeMap implements ILastFlushTimeMap {
@@ -245,4 +247,14 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap {
}
return 0;
}
+
+ @Override
+ public List<Long> getAllSatisfiedTimePartitions(String deviceId) {
+ return deviceId == null
+ ? new ArrayList<>(newlyFlushedPartitionLatestFlushedTimeForEachDevice.keySet())
+ : newlyFlushedPartitionLatestFlushedTimeForEachDevice.entrySet().stream()
+ .filter(entry -> entry.getValue().containsKey(deviceId))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
index 03dd87476a..3a56a2dd41 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.engine.storagegroup;
import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -184,4 +185,9 @@ public class IDTableLastFlushTimeMap implements ILastFlushTimeMap {
}
return 0;
}
+
+ @Override
+ public List<Long> getAllSatisfiedTimePartitions(String deviceId) {
+ return new ArrayList<>(partitionSet);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
index f344b73f31..0da4674073 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine.storagegroup;
+import java.util.List;
import java.util.Map;
/** This interface manages last time and flush time for sequence and unsequence determination */
@@ -70,4 +71,6 @@ public interface ILastFlushTimeMap {
void removePartition(long partitionId);
long getMemSize(long partitionId);
+
+ List<Long> getAllSatisfiedTimePartitions(String deviceId);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index 86fa5e742e..ffd38cbde2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -85,6 +85,22 @@ public class TsFileManager {
}
}
+ public List<TsFileResource> getTsFileList(List<Long> timePartitions, boolean sequence) {
+ // the iteration of ConcurrentSkipListMap is not concurrent secure
+ // so we must add read lock here
+ readLock();
+ try {
+ List<TsFileResource> allResources = new ArrayList<>();
+ Map<Long, TsFileResourceList> chosenMap = sequence ? sequenceFiles : unsequenceFiles;
+ for (Long timePartition : timePartitions) {
+ allResources.addAll(chosenMap.get(timePartition).getArrayList());
+ }
+ return allResources;
+ } finally {
+ readUnlock();
+ }
+ }
+
public TsFileResourceList getOrCreateSequenceListByTimePartition(long timePartition) {
writeLock("getOrCreateSequenceListByTimePartition");
try {