You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/12/08 00:57:16 UTC
[iotdb] branch master updated: [IOTDB-2102] Push down limit to ReadTask in RawDataSetWithoutValueFilter (#4534)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7aabea4 [IOTDB-2102] Push down limit to ReadTask in RawDataSetWithoutValueFilter (#4534)
7aabea4 is described below
commit 7aabea4d2d8224ddf13f9b929287d954ac2d3e11
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Wed Dec 8 08:56:37 2021 +0800
[IOTDB-2102] Push down limit to ReadTask in RawDataSetWithoutValueFilter (#4534)
---
.../dataset/RawQueryDataSetWithoutValueFilter.java | 40 ++++++++++++++++++++--
1 file changed, 37 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index 5f83c74..af89b39 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -58,12 +58,23 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
private final ManagedSeriesReader reader;
private final String pathName;
private final BlockingQueue<BatchData> blockingQueue;
+ private int[] batchDataLengthList;
+ private final int seriesIndex;
+ private final int fetchLimit;
public ReadTask(
- ManagedSeriesReader reader, BlockingQueue<BatchData> blockingQueue, String pathName) {
+ ManagedSeriesReader reader,
+ BlockingQueue<BatchData> blockingQueue,
+ String pathName,
+ int[] batchDataLengthList,
+ int seriesIndex,
+ int fetchLimit) {
this.reader = reader;
this.blockingQueue = blockingQueue;
this.pathName = pathName;
+ this.batchDataLengthList = batchDataLengthList;
+ this.seriesIndex = seriesIndex;
+ this.fetchLimit = fetchLimit;
}
@Override
@@ -87,6 +98,13 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
continue;
}
blockingQueue.put(batchData);
+
+ if (batchDataLengthList != null) {
+ batchDataLengthList[seriesIndex] += batchData.length();
+ if (batchDataLengthList[seriesIndex] >= fetchLimit) {
+ break;
+ }
+ }
// if the queue also has free space, just submit another itself
if (blockingQueue.remainingCapacity() > 0) {
TASK_POOL_MANAGER.submit(this);
@@ -150,6 +168,8 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
protected BatchData[] cachedBatchDataArray;
+ protected int[] batchDataLengthList;
+
private int bufferNum;
// capacity for blocking queue
@@ -193,6 +213,9 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
bufferNum += 1;
}
}
+ if (rowLimit != 0) {
+ batchDataLengthList = new int[readers.size()];
+ }
init();
}
@@ -214,7 +237,13 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
reader.setHasRemaining(true);
reader.setManagedByQueryManager(true);
TASK_POOL_MANAGER.submit(
- new ReadTask(reader, blockingQueueArray[i], paths.get(i).getFullPath()));
+ new ReadTask(
+ reader,
+ blockingQueueArray[i],
+ paths.get(i).getFullPath(),
+ batchDataLengthList,
+ i,
+ rowLimit + rowOffset));
}
for (int i = 0; i < seriesReaderList.size(); i++) {
// check the interrupted status of query before taking next batch
@@ -538,7 +567,12 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
reader.setManagedByQueryManager(true);
TASK_POOL_MANAGER.submit(
new ReadTask(
- reader, blockingQueueArray[seriesIndex], paths.get(seriesIndex).getFullPath()));
+ reader,
+ blockingQueueArray[seriesIndex],
+ paths.get(seriesIndex).getFullPath(),
+ batchDataLengthList,
+ seriesIndex,
+ rowLimit + rowOffset));
}
}
}