You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/12/12 01:14:47 UTC
[incubator-iotdb] branch master updated: [IOTDB-306] new reader with ChunkMetaData for aggregateFunction (#614)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f951bd3 [IOTDB-306] new reader with ChunkMetaData for aggregateFunction (#614)
f951bd3 is described below
commit f951bd378c75b3b432612031fc8365891a0b0d9c
Author: Dawei Liu <13...@qq.com>
AuthorDate: Thu Dec 12 09:14:37 2019 +0800
[IOTDB-306] new reader with ChunkMetaData for aggregateFunction (#614)
* new hasNextChunk and hasNextPageInCurrentChunk for IAggregateReader
---
.../db/query/aggregation/AggregateFunction.java | 4 +
.../db/query/aggregation/impl/AvgAggrFunc.java | 7 ++
.../db/query/aggregation/impl/CountAggrFunc.java | 8 ++
.../query/aggregation/impl/FirstValueAggrFunc.java | 15 +++
.../query/aggregation/impl/LastValueAggrFunc.java | 7 ++
.../db/query/aggregation/impl/MaxTimeAggrFunc.java | 7 ++
.../query/aggregation/impl/MaxValueAggrFunc.java | 7 ++
.../db/query/aggregation/impl/MinTimeAggrFunc.java | 10 ++
.../query/aggregation/impl/MinValueAggrFunc.java | 7 ++
.../db/query/executor/AggregateEngineExecutor.java | 126 ++++++++++++---------
.../iotdb/db/query/reader/IAggregateReader.java | 6 +
.../query/reader/chunkRelated/MemChunkReader.java | 17 +++
.../fileRelated/FileSeriesReaderAdapter.java | 16 +++
.../db/query/reader/universal/IterateReader.java | 30 +++++
.../read/reader/series/FileSeriesReader.java | 40 ++++++-
15 files changed, 253 insertions(+), 54 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
index 3fae3bd..0d0fb32 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -119,4 +120,7 @@ public abstract class AggregateFunction {
public TSDataType getResultDataType() {
return resultDataType;
}
+
+ public abstract void calculateValueFromChunkMetaData(
+ ChunkMetaData chunkMetaData) throws QueryProcessException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrFunc.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrFunc.java
index 7888021..2f6a676 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrFunc.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrFunc.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -163,6 +164,12 @@ public class AvgAggrFunc extends AggregateFunction {
return false;
}
+ @Override
+ public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
+ sum += chunkMetaData.getStatistics().getSumValue();
+ cnt += chunkMetaData.getNumOfPoints();
+ }
+
/**
* Return type name of aggregation
*/
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
index cab4e90..a89d063 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.slf4j.Logger;
@@ -152,4 +153,11 @@ public class CountAggrFunc extends AggregateFunction {
public boolean isCalculatedAggregationResult() {
return false;
}
+
+ @Override
+ public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
+ long preValue = resultData.getLongRet();
+ preValue += chunkMetaData.getNumOfPoints();
+ resultData.setLongRet(preValue);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrFunc.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrFunc.java
index 6a64c6c..acf9d95 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrFunc.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrFunc.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -151,4 +152,18 @@ public class FirstValueAggrFunc extends AggregateFunction {
public boolean isCalculatedAggregationResult() {
return resultData.isSetTime();
}
+
+ @Override
+ public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData)
+ throws QueryProcessException {
+ if (resultData.isSetTime()) {
+ return;
+ }
+
+ Object firstVal = chunkMetaData.getStatistics().getFirstValue();
+ if (firstVal == null) {
+ throw new QueryProcessException("chunkMetaData contains no FIRST value");
+ }
+ resultData.putTimeAndValue(0, firstVal);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrFunc.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrFunc.java
index 643a2c9..94a58f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrFunc.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrFunc.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -138,6 +139,12 @@ public class LastValueAggrFunc extends AggregateFunction {
return false;
}
+ @Override
+ public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
+ Object lastVal = chunkMetaData.getStatistics().getLastValue();
+ updateLastResult(chunkMetaData.getEndTime(), lastVal);
+ }
+
private void updateLastResult(long time, Object value) {
if (!resultData.isSetTime()) {
resultData.putTimeAndValue(time, value);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
index 5a566a7..d4251f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -126,6 +127,12 @@ public class MaxTimeAggrFunc extends AggregateFunction {
return false;
}
+ @Override
+ public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
+ long maxTimestamp = chunkMetaData.getEndTime();
+ updateMaxTimeResult(0, maxTimestamp);
+ }
+
private void updateMaxTimeResult(long time, long value) {
if (!resultData.isSetValue() || value >= resultData.getLongRet()) {
resultData.setTimestamp(time);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
index c4d39a5..a459793 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -159,6 +160,12 @@ public class MaxValueAggrFunc extends AggregateFunction {
return false;
}
+ @Override
+ public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
+ Comparable<Object> maxVal = (Comparable<Object>) chunkMetaData.getStatistics().getMaxValue();
+ updateResult(maxVal);
+ }
+
private void updateResult(Comparable<Object> maxVal) {
if (maxVal == null) {
return;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
index b237bcb..7e1487a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -153,4 +154,13 @@ public class MinTimeAggrFunc extends AggregateFunction {
return resultData.isSetValue();
}
+ @Override
+ public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
+ if (resultData.isSetValue()) {
+ return;
+ }
+ long time = chunkMetaData.getStartTime();
+ resultData.putTimeAndValue(0, time);
+ }
+
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
index 323f3c4..b91fd2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -154,6 +155,12 @@ public class MinValueAggrFunc extends AggregateFunction {
return false;
}
+ @Override
+ public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
+ Comparable<Object> minVal = (Comparable<Object>) chunkMetaData.getStatistics().getMinValue();
+ updateResult(minVal);
+ }
+
private void updateResult(Comparable<Object> minVal) {
if (minVal == null) {
return;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index 84fd17d..73536e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -30,8 +30,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.aggregation.impl.FirstValueAggrFunc;
import org.apache.iotdb.db.query.aggregation.impl.LastValueAggrFunc;
import org.apache.iotdb.db.query.aggregation.impl.MaxTimeAggrFunc;
+import org.apache.iotdb.db.query.aggregation.impl.MinTimeAggrFunc;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.AggreResultDataPointReader;
@@ -45,6 +47,7 @@ import org.apache.iotdb.db.query.reader.resourceRelated.UnseqResourceMergeReader
import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -134,10 +137,10 @@ public class AggregateEngineExecutor {
/**
* calculation aggregate result with only time filter or no filter for one series.
*
- * @param function aggregate function
- * @param sequenceReader sequence data reader
+ * @param function aggregate function
+ * @param sequenceReader sequence data reader
* @param unSequenceReader unsequence data reader
- * @param filter time filter or null
+ * @param filter time filter or null
* @return one series aggregate result data
*/
private AggreResultData aggregateWithoutValueFilter(AggregateFunction function,
@@ -148,23 +151,38 @@ public class AggregateEngineExecutor {
filter);
}
- while (sequenceReader.hasNext()) {
- PageHeader pageHeader = sequenceReader.nextPageHeader();
- // judge if overlap with unsequence data
- if (canUseHeader(function, pageHeader, unSequenceReader, filter)) {
- // cal by pageHeader
- function.calculateValueFromPageHeader(pageHeader);
- sequenceReader.skipPageData();
- } else {
- // cal by pageData
- function.calculateValueFromPageData(sequenceReader.nextBatch(), unSequenceReader);
+ while (sequenceReader.hasNextChunk()) {
+ ChunkMetaData chunkMetaData = sequenceReader.nextChunkMeta();
+ if (chunkMetaData != null && canUseHeader(function, chunkMetaData.getStartTime(),
+ chunkMetaData.getEndTime(), unSequenceReader, filter)) {
+ function.calculateValueFromChunkMetaData(chunkMetaData);
+ if (isEarlyBreakFunc(function)) {
+ break;
+ }
+ continue;
}
+ while (sequenceReader.hasNextPageInCurrentChunk()) {
+ PageHeader pageHeader = sequenceReader.nextPageHeader();
+ // judge if overlap with unsequence data
+ if (pageHeader != null && canUseHeader(function, pageHeader.getStartTime(),
+ pageHeader.getEndTime(),
+ unSequenceReader, filter)) {
+ // cal by pageHeader
+ function.calculateValueFromPageHeader(pageHeader);
+ if (isEarlyBreakFunc(function)) {
+ break;
+ }
+ sequenceReader.skipPageData();
+ } else {
+ // cal by pageData
+ function.calculateValueFromPageData(sequenceReader.nextBatch(), unSequenceReader);
+ }
- if (function.isCalculatedAggregationResult()) {
- return function.getResult();
+ if (function.isCalculatedAggregationResult()) {
+ return function.getResult();
+ }
}
}
-
// cal with unsequence data
if (unSequenceReader.hasNext()) {
function.calculateValueFromUnsequenceReader(unSequenceReader);
@@ -172,20 +190,19 @@ public class AggregateEngineExecutor {
return function.getResult();
}
+ private boolean isEarlyBreakFunc(AggregateFunction function) {
+ if (function instanceof FirstValueAggrFunc || function instanceof MinTimeAggrFunc) {
+ return true;
+ }
+ return false;
+ }
+
/**
* determine whether pageHeader can be used to compute aggregation results.
*/
- private boolean canUseHeader(AggregateFunction function, PageHeader pageHeader,
+ private boolean canUseHeader(AggregateFunction function, long minTime, long maxTime,
IPointReader unSequenceReader, Filter filter)
throws IOException, QueryProcessException {
- // if page data is memory data.
- if (pageHeader == null) {
- return false;
- }
-
- long minTime = pageHeader.getStartTime();
- long maxTime = pageHeader.getEndTime();
-
// If there are points in the page that do not satisfy the time filter,
// page header cannot be used to calculate.
if (filter != null && !filter.containStartEndTime(minTime, maxTime)) {
@@ -202,8 +219,8 @@ public class AggregateEngineExecutor {
/**
* handle last and max_time aggregate function with only time filter or no filter.
*
- * @param function aggregate function
- * @param sequenceReader sequence data reader
+ * @param function aggregate function
+ * @param sequenceReader sequence data reader
* @param unSequenceReader unsequence data reader
* @return BatchData-aggregate result
*/
@@ -211,41 +228,46 @@ public class AggregateEngineExecutor {
IAggregateReader sequenceReader, IPointReader unSequenceReader, Filter timeFilter)
throws IOException, QueryProcessException {
long lastBatchTimeStamp = Long.MIN_VALUE;
- boolean isChunkEnd = false;
- while (sequenceReader.hasNext()) {
- PageHeader pageHeader = sequenceReader.nextPageHeader();
- // judge if overlap with unsequence data
- if (canUseHeader(function, pageHeader, unSequenceReader, timeFilter)) {
- // cal by pageHeader
- function.calculateValueFromPageHeader(pageHeader);
- sequenceReader.skipPageData();
-
- if (lastBatchTimeStamp > pageHeader.getStartTime()) {
- // the chunk is end.
- isChunkEnd = true;
- } else {
- // current page and last page are in the same chunk.
+
+ while (sequenceReader.hasNextChunk()) {
+ ChunkMetaData chunkMetaData = sequenceReader.nextChunkMeta();
+ // if chunkMetaData can be used, skip this chunk
+ if (chunkMetaData != null && canUseHeader(function, chunkMetaData.getStartTime(),
+ chunkMetaData.getEndTime(), unSequenceReader, timeFilter)) {
+ function.calculateValueFromChunkMetaData(chunkMetaData);
+ //if this chunk is the last chunk, break out of the loop
+ if (lastBatchTimeStamp > chunkMetaData.getStartTime()) {
+ break;
+ }
+ lastBatchTimeStamp = chunkMetaData.getStartTime();
+ continue;
+ }
+ //if can't use chunkMetaData, try to use pageHeader
+ while (sequenceReader.hasNextPageInCurrentChunk()) {
+ PageHeader pageHeader = sequenceReader.nextPageHeader();
+ if (pageHeader != null && canUseHeader(function, pageHeader.getStartTime(),
+ pageHeader.getEndTime(),
+ unSequenceReader, timeFilter)) {
+ // cal by pageHeader
+ function.calculateValueFromPageHeader(pageHeader);
+ sequenceReader.skipPageData();
+ //if this page is in the last chunk, break out of the loop
+ if (lastBatchTimeStamp > pageHeader.getStartTime()) {
+ break;
+ }
lastBatchTimeStamp = pageHeader.getStartTime();
+ continue;
}
- } else {
- // cal by pageData
BatchData batchData = sequenceReader.nextBatch();
if (batchData.length() > 0) {
if (lastBatchTimeStamp > batchData.currentTime()) {
- // the chunk is end.
- isChunkEnd = true;
- } else {
- // current page and last page are in the same chunk.
- lastBatchTimeStamp = batchData.currentTime();
+ break;
}
+ lastBatchTimeStamp = batchData.currentTime();
function.calculateValueFromPageData(batchData, unSequenceReader);
}
}
- if (isChunkEnd) {
- break;
- }
}
-
// cal with unsequence data
if (unSequenceReader.hasNext()) {
function.calculateValueFromUnsequenceReader(unSequenceReader);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/IAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/IAggregateReader.java
index 7cb9112..419ca32 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/IAggregateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/IAggregateReader.java
@@ -20,9 +20,15 @@ package org.apache.iotdb.db.query.reader;
import java.io.IOException;
import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
public interface IAggregateReader extends IBatchReader {
+ boolean hasNextChunk() throws IOException;
+
+ ChunkMetaData nextChunkMeta();
+
+ boolean hasNextPageInCurrentChunk() throws IOException;
/**
* Returns meta-information of batch data.
* <p>
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java
index b0885ae..1193ff9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.query.reader.chunkRelated;
+import java.io.IOException;
import java.util.Iterator;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.query.reader.IAggregateReader;
@@ -25,6 +26,7 @@ import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.fileRelated.UnSealedTsFileIterateReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -112,6 +114,21 @@ public class MemChunkReader implements IPointReader, IAggregateReader {
}
@Override
+ public boolean hasNextChunk() throws IOException {
+ return hasNext();
+ }
+
+ @Override
+ public ChunkMetaData nextChunkMeta() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNextPageInCurrentChunk() throws IOException {
+ return hasNext();
+ }
+
+ @Override
public PageHeader nextPageHeader() {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/FileSeriesReaderAdapter.java b/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/FileSeriesReaderAdapter.java
index ff048fd..e79e10a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/FileSeriesReaderAdapter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/FileSeriesReaderAdapter.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.reader.fileRelated;
import java.io.IOException;
import org.apache.iotdb.db.query.reader.IAggregateReader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
@@ -43,6 +44,21 @@ public class FileSeriesReaderAdapter implements IAggregateReader {
}
@Override
+ public boolean hasNextChunk() throws IOException {
+ return fileSeriesReader.hasNextChunk();
+ }
+
+ @Override
+ public ChunkMetaData nextChunkMeta() {
+ return fileSeriesReader.currentChunkMeta();
+ }
+
+ @Override
+ public boolean hasNextPageInCurrentChunk() throws IOException {
+ return fileSeriesReader.hasNextPageInCurrentChunk();
+ }
+
+ @Override
public PageHeader nextPageHeader() throws IOException {
return fileSeriesReader.nextPageHeader();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/IterateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/IterateReader.java
index 58ce739..36d9f66 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/IterateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/IterateReader.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.reader.universal;
import java.io.IOException;
import org.apache.iotdb.db.query.reader.IAggregateReader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.common.BatchData;
/**
@@ -84,6 +85,35 @@ public abstract class IterateReader implements IAggregateReader {
}
@Override
+ public boolean hasNextChunk() throws IOException {
+
+ if (curReaderInitialized && currentSeriesReader.hasNextChunk()) {
+ return true;
+ } else {
+ curReaderInitialized = false;
+ }
+
+ while (nextSeriesReaderIndex < readerSize) {
+ boolean isConstructed = constructNextReader(nextSeriesReaderIndex++);
+ if (isConstructed && currentSeriesReader.hasNextChunk()) {
+ curReaderInitialized = true;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public boolean hasNextPageInCurrentChunk() throws IOException {
+ return currentSeriesReader.hasNextPageInCurrentChunk();
+ }
+
+ @Override
+ public ChunkMetaData nextChunkMeta() {
+ return currentSeriesReader.nextChunkMeta();
+ }
+
+ @Override
public void close() {
// file stream is managed in QueryResourceManager.
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
index ca43763..5f71935 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
@@ -38,6 +38,7 @@ public abstract class FileSeriesReader {
private int chunkToRead;
private BatchData data;
+ private ChunkMetaData chunkMetaData;
/**
* constructor of FileSeriesReader.
@@ -49,9 +50,9 @@ public abstract class FileSeriesReader {
}
/**
- * check if current chunk has next batch data.
+ * check if all chunks has next batch data.
*
- * @return True if current chunk has next batch data
+ * @return True if all chunks has next batch data
*/
public boolean hasNextBatch() throws IOException {
@@ -107,4 +108,39 @@ public abstract class FileSeriesReader {
private ChunkMetaData nextChunkMeta() {
return chunkMetaDataList.get(chunkToRead++);
}
+
+ /**
+ * check whether the current chunk has next batch data.
+ *
+ * @return True if the chunk reader is initialized and has next batch data
+ */
+ public boolean hasNextPageInCurrentChunk() throws IOException {
+ if (chunkReader != null && chunkReader.hasNextBatch()) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
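+ * check whether the current file has a next chunk that satisfies the condition, and init its chunk reader.
+ *
+ * @return True if a satisfying chunk was found in the current file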
+ */
+ public boolean hasNextChunk() throws IOException {
+ // current file still have chunks, init new chunk reader
+ while (chunkToRead < chunkMetaDataList.size()) {
+
+ chunkMetaData = nextChunkMeta();
+ if (chunkSatisfied(chunkMetaData)) {
+ // chunk metadata satisfy the condition
+ initChunkReader(chunkMetaData);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public ChunkMetaData currentChunkMeta() {
+ return chunkMetaData;
+ }
}