You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/03/17 07:43:46 UTC

[iotdb] 01/02: Filter unsatisfied TimePartition while preparing resource list

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryImprove1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3732f877329898138412a2c090520c4110840118
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Mar 17 14:28:35 2023 +0800

    Filter unsatisfied TimePartition while preparing resource list
---
 .../apache/iotdb/db/engine/storagegroup/DataRegion.java  |  5 +++--
 .../db/engine/storagegroup/HashLastFlushTimeMap.java     | 12 ++++++++++++
 .../db/engine/storagegroup/IDTableLastFlushTimeMap.java  |  6 ++++++
 .../iotdb/db/engine/storagegroup/ILastFlushTimeMap.java  |  3 +++
 .../iotdb/db/engine/storagegroup/TsFileManager.java      | 16 ++++++++++++++++
 5 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 9fe0e3346f..1864b0425f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -1700,9 +1700,10 @@ public class DataRegion implements IDataRegionForQuery {
       List<PartialPath> pathList, String singleDeviceId, QueryContext context, Filter timeFilter)
       throws QueryProcessException {
     try {
+      List<Long> timePartitions = lastFlushTimeMap.getAllSatisfiedTimePartitions(singleDeviceId);
       List<TsFileResource> seqResources =
           getFileResourceListForQuery(
-              tsFileManager.getTsFileList(true),
+              tsFileManager.getTsFileList(timePartitions, true),
               upgradeSeqFileList,
               pathList,
               singleDeviceId,
@@ -1711,7 +1712,7 @@ public class DataRegion implements IDataRegionForQuery {
               true);
       List<TsFileResource> unseqResources =
           getFileResourceListForQuery(
-              tsFileManager.getTsFileList(false),
+              tsFileManager.getTsFileList(timePartitions, false),
               upgradeUnseqFileList,
               pathList,
               singleDeviceId,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
index 50d524a3e5..d34a52cdad 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
@@ -22,9 +22,11 @@ package org.apache.iotdb.db.engine.storagegroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class HashLastFlushTimeMap implements ILastFlushTimeMap {
 
@@ -245,4 +247,14 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap {
     }
     return 0;
   }
+
+  /**
+   * Lists the time partitions that hold a flush-time entry for the given device.
+   *
+   * @param deviceId device to look up; {@code null} selects every tracked partition
+   * @return a newly created list with the ids of the matching time partitions
+   */
+  @Override
+  public List<Long> getAllSatisfiedTimePartitions(String deviceId) {
+    if (deviceId == null) {
+      // no device restriction: every tracked partition qualifies
+      return new ArrayList<>(newlyFlushedPartitionLatestFlushedTimeForEachDevice.keySet());
+    }
+    List<Long> satisfiedPartitions = new ArrayList<>();
+    newlyFlushedPartitionLatestFlushedTimeForEachDevice.forEach(
+        (partitionId, flushTimePerDevice) -> {
+          // keep only the partitions in which this device has been recorded
+          if (flushTimePerDevice.containsKey(deviceId)) {
+            satisfiedPartitions.add(partitionId);
+          }
+        });
+    return satisfiedPartitions;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
index 03dd87476a..3a56a2dd41 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.engine.storagegroup;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -184,4 +185,9 @@ public class IDTableLastFlushTimeMap implements ILastFlushTimeMap {
     }
     return 0;
   }
+
+  /**
+   * Returns a copy of every partition id held in {@code partitionSet}.
+   *
+   * @param deviceId not used by this implementation; the result is not filtered by device
+   * @return a newly created list containing all ids currently in {@code partitionSet}
+   */
+  @Override
+  public List<Long> getAllSatisfiedTimePartitions(String deviceId) {
+    List<Long> allPartitions = new ArrayList<>(partitionSet);
+    return allPartitions;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
index f344b73f31..0da4674073 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.engine.storagegroup;
 
+import java.util.List;
 import java.util.Map;
 
 /** This interface manages last time and flush time for sequence and unsequence determination */
@@ -70,4 +71,6 @@ public interface ILastFlushTimeMap {
   void removePartition(long partitionId);
 
   long getMemSize(long partitionId);
+
+  /**
+   * Returns the ids of the time partitions that should be considered for the given device.
+   *
+   * <p>How strictly the result is narrowed is up to the implementation: {@code
+   * HashLastFlushTimeMap} keeps only partitions that have an entry for the device (all partitions
+   * when {@code deviceId} is {@code null}), while {@code IDTableLastFlushTimeMap} returns every
+   * partition it knows regardless of the device.
+   *
+   * @param deviceId the device to look up; may be {@code null}
+   * @return a list of time partition ids
+   */
+  List<Long> getAllSatisfiedTimePartitions(String deviceId);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index 86fa5e742e..ffd38cbde2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -85,6 +85,22 @@ public class TsFileManager {
     }
   }
 
+  /**
+   * Collects the TsFile resources that belong to the given time partitions.
+   *
+   * <p>Partitions that have no entry in the chosen map are skipped instead of failing, because
+   * the requested partition ids do not come from that map and a partition may only have files on
+   * the other side (sequence or unsequence).
+   *
+   * @param timePartitions ids of the time partitions whose files are wanted
+   * @param sequence {@code true} to read from the sequence files, {@code false} to read from the
+   *     unsequence files
+   * @return a newly created list with the resources of all requested partitions that exist, in
+   *     the order of {@code timePartitions}
+   */
+  public List<TsFileResource> getTsFileList(List<Long> timePartitions, boolean sequence) {
+    // the iteration of ConcurrentSkipListMap is not safe against concurrent modification,
+    // so we must hold the read lock here
+    readLock();
+    try {
+      List<TsFileResource> allResources = new ArrayList<>();
+      Map<Long, TsFileResourceList> chosenMap = sequence ? sequenceFiles : unsequenceFiles;
+      for (Long timePartition : timePartitions) {
+        TsFileResourceList resourceList = chosenMap.get(timePartition);
+        // the partition may be absent from this map; nothing to add in that case
+        if (resourceList != null) {
+          allResources.addAll(resourceList.getArrayList());
+        }
+      }
+      return allResources;
+    } finally {
+      readUnlock();
+    }
+  }
+
   public TsFileResourceList getOrCreateSequenceListByTimePartition(long timePartition) {
     writeLock("getOrCreateSequenceListByTimePartition");
     try {