You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/12/08 01:57:24 UTC

[iotdb] branch 0.12pushdownlimit created (now ec578a5)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch 0.12pushdownlimit
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at ec578a5  push down limit to read task in rawdatasetwithoutValueFilter

This branch includes the following new commits:

     new ec578a5  push down limit to read task in rawdatasetwithoutValueFilter

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: push down limit to read task in rawdatasetwithoutValueFilter

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch 0.12pushdownlimit
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ec578a5a9c1bd5bb3e55a6f58e7ef9ffc592d529
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Dec 8 09:56:35 2021 +0800

    push down limit to read task in rawdatasetwithoutValueFilter
---
 .../dataset/RawQueryDataSetWithoutValueFilter.java | 40 ++++++++++++++++++++--
 1 file changed, 37 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index 7640b08..a8d7e79 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -56,12 +56,23 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
     private final ManagedSeriesReader reader;
     private final String pathName;
     private final BlockingQueue<BatchData> blockingQueue;
+    private int[] batchDataLengthList;
+    private final int seriesIndex;
+    private final int fetchLimit;
 
     public ReadTask(
-        ManagedSeriesReader reader, BlockingQueue<BatchData> blockingQueue, String pathName) {
+        ManagedSeriesReader reader,
+        BlockingQueue<BatchData> blockingQueue,
+        String pathName,
+        int[] batchDataLengthList,
+        int seriesIndex,
+        int fetchLimit) {
       this.reader = reader;
       this.blockingQueue = blockingQueue;
       this.pathName = pathName;
+      this.batchDataLengthList = batchDataLengthList;
+      this.seriesIndex = seriesIndex;
+      this.fetchLimit = fetchLimit;
     }
 
     @Override
@@ -81,6 +92,13 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
               continue;
             }
             blockingQueue.put(batchData);
+
+            if (batchDataLengthList != null) {
+              batchDataLengthList[seriesIndex] += batchData.length();
+              if (batchDataLengthList[seriesIndex] >= fetchLimit) {
+                break;
+              }
+            }
             // if the queue also has free space, just submit another itself
             if (blockingQueue.remainingCapacity() > 0) {
               TASK_POOL_MANAGER.submit(this);
@@ -144,6 +162,8 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
 
   protected BatchData[] cachedBatchDataArray;
 
+  protected int[] batchDataLengthList;
+
   // capacity for blocking queue
   private static final int BLOCKING_QUEUE_CAPACITY = 5;
 
@@ -177,6 +197,9 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
     }
     cachedBatchDataArray = new BatchData[readers.size()];
     noMoreDataInQueueArray = new boolean[readers.size()];
+    if (rowLimit != 0) {
+      batchDataLengthList = new int[readers.size()];
+    }
     init();
   }
 
@@ -198,7 +221,13 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
       reader.setHasRemaining(true);
       reader.setManagedByQueryManager(true);
       TASK_POOL_MANAGER.submit(
-          new ReadTask(reader, blockingQueueArray[i], paths.get(i).getFullPath()));
+          new ReadTask(
+              reader,
+              blockingQueueArray[i],
+              paths.get(i).getFullPath(),
+              batchDataLengthList,
+              i,
+              rowLimit + rowOffset));
     }
     for (int i = 0; i < seriesReaderList.size(); i++) {
       // check the interrupted status of query before taking next batch
@@ -442,7 +471,12 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
             reader.setManagedByQueryManager(true);
             TASK_POOL_MANAGER.submit(
                 new ReadTask(
-                    reader, blockingQueueArray[seriesIndex], paths.get(seriesIndex).getFullPath()));
+                    reader,
+                    blockingQueueArray[seriesIndex],
+                    paths.get(seriesIndex).getFullPath(),
+                    batchDataLengthList,
+                    seriesIndex,
+                    rowLimit + rowOffset));
           }
         }
       }