You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/01 10:06:52 UTC
[incubator-iotdb] branch optimize_series_reader updated: combine
nextPage (#861)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch optimize_series_reader
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/optimize_series_reader by this push:
new 1dadf07 combine nextPage (#861)
1dadf07 is described below
commit 1dadf07efb7f81a048879ee0066f671a7a89bf74
Author: Dawei Liu <at...@163.com>
AuthorDate: Sun Mar 1 18:06:42 2020 +0800
combine nextPage (#861)
* fix valueFilterBug for nextOverlappedPage
---
.../reader/series/SeriesRawDataBatchReader.java | 64 ++++++++++++---------
.../iotdb/db/query/reader/series/SeriesReader.java | 23 +++++---
.../reader/series/SeriesReaderByTimestamp.java | 65 +++++++++++-----------
3 files changed, 85 insertions(+), 67 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
index d1e6ea5..553fdf8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
@@ -69,41 +69,24 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader {
if (hasCachedBatchData) {
return true;
}
-
/*
* consume overlapped data firstly
*/
- if (seriesReader.hasNextOverlappedPage()) {
- batchData = seriesReader.nextOverlappedPage();
- hasCachedBatchData = true;
- return true;
+ if (readOverlappedPage()) {
+ return hasCachedBatchData = true;
}
-
-
/*
- * consume pages secondly
+ * consume page data secondly
*/
- if (seriesReader.hasNextPage()) {
- if (!seriesReader.isPageOverlapped()) {
- batchData = seriesReader.nextPage();
- } else if (seriesReader.hasNextOverlappedPage()) {
- batchData = seriesReader.nextOverlappedPage();
- }
- hasCachedBatchData = true;
- return true;
+ if (readPageData()) {
+ return hasCachedBatchData = true;
}
-
/*
* consume next chunk finally
*/
- if (seriesReader.hasNextChunk()) {
- if (seriesReader.hasNextPage()) {
- if (!seriesReader.isPageOverlapped()) {
- batchData = seriesReader.nextPage();
- } else if (seriesReader.hasNextOverlappedPage()) {
- batchData = seriesReader.nextOverlappedPage();
- }
- hasCachedBatchData = true;
+ while (seriesReader.hasNextChunk()) {
+ if (readPageData()) {
+ return hasCachedBatchData = true;
}
}
return hasCachedBatchData;
@@ -124,7 +107,6 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader {
//no resources need to close
}
-
@Override
public boolean isManagedByQueryManager() {
return managedByQueryManager;
@@ -145,4 +127,34 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader {
this.hasRemaining = hasRemaining;
}
+
+ private boolean readPageData() throws IOException {
+ while (seriesReader.hasNextPage()) {
+ if (!seriesReader.isPageOverlapped()) {
+ batchData = seriesReader.nextPage();
+ if (!isEmpty(batchData)) {
+ return true;
+ }
+ continue;
+ }
+ if (readOverlappedPage()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean readOverlappedPage() throws IOException {
+ while (seriesReader.hasNextOverlappedPage()) {
+ batchData = seriesReader.nextOverlappedPage();
+ if (!isEmpty(batchData)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isEmpty(BatchData batchData) {
+ return batchData == null || !batchData.hasCurrent();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 7ef65e4..7b5fc3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -121,9 +121,11 @@ public class SeriesReader {
public boolean hasNextChunk() throws IOException {
- if (!cachedPageReaders.isEmpty() || firstPageReader != null || mergeReader.hasNextTimeValuePair()) {
+ if (!cachedPageReaders.isEmpty() || firstPageReader != null || mergeReader
+ .hasNextTimeValuePair()) {
throw new IOException("all cached pages should be consumed first");
}
+ firstChunkMetaData = null;
// init first chunk metadata whose startTime is minimum
tryToUnpackAllOverlappedFilesToChunkMetadatas();
@@ -149,8 +151,8 @@ public class SeriesReader {
}
/**
- * This method should be called after hasNextChunk()
- * make sure that all overlapped pages are consumed before
+ * This method should be called after hasNextChunk() make sure that all overlapped pages are
+ * consumed before
*/
public boolean hasNextPage() throws IOException {
if (mergeReader.hasNextTimeValuePair()) {
@@ -182,7 +184,8 @@ public class SeriesReader {
}
- private void unpackAllOverlappedChunkMetadataToCachedPageReaders(long endTime) throws IOException {
+ private void unpackAllOverlappedChunkMetadataToCachedPageReaders(long endTime)
+ throws IOException {
while (!seqChunkMetadatas.isEmpty() && endTime >= seqChunkMetadatas.get(0).getStartTime()) {
unpackOneChunkMetaData(seqChunkMetadatas.remove(0));
}
@@ -233,7 +236,7 @@ public class SeriesReader {
}
BatchData pageData = firstPageReader.data.getAllSatisfiedPageData();
-
+ firstPageReader = null;
/*
* no value filter
* only need to consider valueFilter because timeFilter has been set into the page reader
@@ -293,8 +296,11 @@ public class SeriesReader {
unpackAllOverlappedChunkMetadataToCachedPageReaders(timeValuePair.getTimestamp());
unpackAllOverlappedCachedPageReadersToMergeReader(timeValuePair.getTimestamp());
- cachedBatchData.putAnObject(
- timeValuePair.getTimestamp(), timeValuePair.getValue().getValue());
+ if (valueFilter == null || valueFilter
+ .satisfy(timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+ cachedBatchData.putAnObject(
+ timeValuePair.getTimestamp(), timeValuePair.getValue().getValue());
+ }
mergeReader.nextTimeValuePair();
@@ -433,9 +439,8 @@ public class SeriesReader {
/**
- *
* unpack all overlapped seq/unseq files and find the first chunk metadata
- *
+ * <p>
* Because there may be too many files in the scenario used by the user, we cannot open all the
* chunks at once, which may cause OOM, so we can only unpack one file at a time when needed. This
* approach is likely to be ubiquitous, but it keeps the system running smoothly
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
index a464d32..d5eb43c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
@@ -65,16 +65,29 @@ public class SeriesReaderByTimestamp implements IReaderByTimestamp {
/*
* consume overlapped data firstly
*/
- while (seriesReader.hasNextOverlappedPage()) {
- batchData = seriesReader.nextOverlappedPage();
- if (batchData.getTimeByIndex(batchData.length() - 1) >= timestamp) {
- return true;
- }
+ if (readOverlappedPage(timestamp)) {
+ return true;
}
/*
* consume pages secondly
*/
+ if (readPageData(timestamp)) {
+ return true;
+ }
+
+ /*
+ * consume next chunk
+ */
+ while (seriesReader.hasNextChunk()) {
+ if (readPageData(timestamp)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean readPageData(long timestamp) throws IOException {
while (seriesReader.hasNextPage()) {
if (!seriesReader.isPageOverlapped()) {
if (!satisfyTimeFilter(seriesReader.currentPageStatistics())) {
@@ -87,36 +100,21 @@ public class SeriesReaderByTimestamp implements IReaderByTimestamp {
}
}
}
- while (seriesReader.hasNextOverlappedPage()) {
- batchData = seriesReader.nextOverlappedPage();
- if (batchData.getTimeByIndex(batchData.length() - 1) >= timestamp) {
- return true;
- }
+ if (readOverlappedPage(timestamp)) {
+ return true;
}
}
+ return false;
+ }
- /*
- * consume next chunk
- */
- while (seriesReader.hasNextChunk()) {
- while (seriesReader.hasNextPage()) {
- if (!seriesReader.isPageOverlapped()) {
- if (!satisfyTimeFilter(seriesReader.currentPageStatistics())) {
- seriesReader.skipCurrentPage();
- continue;
- } else {
- batchData = seriesReader.nextPage();
- if (batchData.getTimeByIndex(batchData.length() - 1) >= timestamp) {
- return true;
- }
- }
- }
- while (seriesReader.hasNextOverlappedPage()) {
- batchData = seriesReader.nextOverlappedPage();
- if (batchData.getTimeByIndex(batchData.length() - 1) >= timestamp) {
- return true;
- }
- }
+ private boolean readOverlappedPage(long timestamp) throws IOException {
+ while (seriesReader.hasNextOverlappedPage()) {
+ batchData = seriesReader.nextOverlappedPage();
+ if (isEmpty(batchData)) {
+ continue;
+ }
+ if (batchData.getTimeByIndex(batchData.length() - 1) >= timestamp) {
+ return true;
}
}
return false;
@@ -126,4 +124,7 @@ public class SeriesReaderByTimestamp implements IReaderByTimestamp {
return seriesReader.getTimeFilter().satisfy(statistics);
}
+ private boolean isEmpty(BatchData batchData) {
+ return batchData == null || !batchData.hasCurrent();
+ }
}