You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/12/08 00:57:16 UTC

[iotdb] branch master updated: [IOTDB-2102] Push down limit to ReadTask in RawDataSetWithoutValueFilter (#4534)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7aabea4  [IOTDB-2102] Push down limit to ReadTask in RawDataSetWithoutValueFilter (#4534)
7aabea4 is described below

commit 7aabea4d2d8224ddf13f9b929287d954ac2d3e11
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Wed Dec 8 08:56:37 2021 +0800

    [IOTDB-2102] Push down limit to ReadTask in RawDataSetWithoutValueFilter (#4534)
---
 .../dataset/RawQueryDataSetWithoutValueFilter.java | 40 ++++++++++++++++++++--
 1 file changed, 37 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index 5f83c74..af89b39 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -58,12 +58,23 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
     private final ManagedSeriesReader reader;
     private final String pathName;
     private final BlockingQueue<BatchData> blockingQueue;
+    private int[] batchDataLengthList;
+    private final int seriesIndex;
+    private final int fetchLimit;
 
     public ReadTask(
-        ManagedSeriesReader reader, BlockingQueue<BatchData> blockingQueue, String pathName) {
+        ManagedSeriesReader reader,
+        BlockingQueue<BatchData> blockingQueue,
+        String pathName,
+        int[] batchDataLengthList,
+        int seriesIndex,
+        int fetchLimit) {
       this.reader = reader;
       this.blockingQueue = blockingQueue;
       this.pathName = pathName;
+      this.batchDataLengthList = batchDataLengthList;
+      this.seriesIndex = seriesIndex;
+      this.fetchLimit = fetchLimit;
     }
 
     @Override
@@ -87,6 +98,13 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
               continue;
             }
             blockingQueue.put(batchData);
+
+            if (batchDataLengthList != null) {
+              batchDataLengthList[seriesIndex] += batchData.length();
+              if (batchDataLengthList[seriesIndex] >= fetchLimit) {
+                break;
+              }
+            }
             // if the queue also has free space, just submit another itself
             if (blockingQueue.remainingCapacity() > 0) {
               TASK_POOL_MANAGER.submit(this);
@@ -150,6 +168,8 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
 
   protected BatchData[] cachedBatchDataArray;
 
+  protected int[] batchDataLengthList;
+
   private int bufferNum;
 
   // capacity for blocking queue
@@ -193,6 +213,9 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
         bufferNum += 1;
       }
     }
+    if (rowLimit != 0) {
+      batchDataLengthList = new int[readers.size()];
+    }
     init();
   }
 
@@ -214,7 +237,13 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
       reader.setHasRemaining(true);
       reader.setManagedByQueryManager(true);
       TASK_POOL_MANAGER.submit(
-          new ReadTask(reader, blockingQueueArray[i], paths.get(i).getFullPath()));
+          new ReadTask(
+              reader,
+              blockingQueueArray[i],
+              paths.get(i).getFullPath(),
+              batchDataLengthList,
+              i,
+              rowLimit + rowOffset));
     }
     for (int i = 0; i < seriesReaderList.size(); i++) {
       // check the interrupted status of query before taking next batch
@@ -538,7 +567,12 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
             reader.setManagedByQueryManager(true);
             TASK_POOL_MANAGER.submit(
                 new ReadTask(
-                    reader, blockingQueueArray[seriesIndex], paths.get(seriesIndex).getFullPath()));
+                    reader,
+                    blockingQueueArray[seriesIndex],
+                    paths.get(seriesIndex).getFullPath(),
+                    batchDataLengthList,
+                    seriesIndex,
+                    rowLimit + rowOffset));
           }
         }
       }