You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/12/09 07:01:46 UTC
[incubator-iotdb] 01/01: add batch reader interfaces
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch f_batch_reader
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 998eec6bf32035dbd2ec6805cd347695ee8c570c
Author: qiaojialin <64...@qq.com>
AuthorDate: Mon Dec 9 15:01:25 2019 +0800
add batch reader interfaces
---
.../resources/conf/iotdb-engine.properties | 4 +-
.../db/engine/merge/task/MergeMultiChunkTask.java | 8 +-
.../engine/storagegroup/StorageGroupProcessor.java | 11 +-
.../db/engine/storagegroup/TsFileResource.java | 3 +
.../groupby/GroupByWithoutValueFilterDataSet.java | 10 +-
.../db/query/executor/AggregateEngineExecutor.java | 10 +-
.../query/externalsort/ExternalSortJobEngine.java | 3 +
.../MultiSourceExternalSortJobPart.java | 62 ++--
.../externalsort/SimpleExternalSortEngine.java | 329 +++++++++++----------
.../iotdb/db/query/reader/IAggregateReader.java | 35 ---
.../reader/chunkRelated/CachedDiskChunkReader.java | 22 +-
.../query/reader/chunkRelated/ChunkReaderWrap.java | 20 +-
.../query/reader/chunkRelated/DiskChunkReader.java | 29 +-
.../query/reader/chunkRelated/MemChunkReader.java | 23 +-
.../fileRelated/FileSeriesReaderAdapter.java | 69 -----
.../fileRelated/UnSealedTsFileIterateReader.java | 28 +-
.../CachedUnseqResourceMergeReader.java | 6 +-
.../resourceRelated/SeqResourceIterateReader.java | 20 +-
.../resourceRelated/UnseqResourceMergeReader.java | 61 +++-
.../seriesRelated/SeriesReaderWithValueFilter.java | 62 ++--
.../SeriesReaderWithoutValueFilter.java | 144 +++++----
.../universal/CachedPriorityMergeReader.java | 2 +
.../db/query/reader/universal/IterateReader.java | 27 +-
.../reader/universal/PriorityMergeReader.java | 110 ++++---
.../java/org/apache/iotdb/db/utils/MergeUtils.java | 8 +-
.../db/query/externalsort/FakeChunkReaderWrap.java | 3 +
.../reader/seriesRelated/FakedIBatchPoint.java | 4 +-
.../SeriesReaderWithValueFilterTest.java | 5 +-
.../SeriesReaderWithoutValueFilterTest.java | 2 +-
.../writelog/recover/UnseqTsFileRecoverTest.java | 8 +-
.../query/dataset/DataSetWithoutTimeGenerator.java | 10 +-
.../tsfile/read/query/executor/TsFileExecutor.java | 17 +-
.../query/timegenerator/TimeGeneratorImpl.java | 8 +-
.../read/query/timegenerator/node/LeafNode.java | 6 +-
.../iotdb/tsfile/read}/reader/IBatchReader.java | 4 +-
.../{ChunkReader.java => AbstractChunkReader.java} | 22 +-
.../tsfile/read/reader/chunk/ChunkReader.java | 148 +--------
.../read/reader/chunk/ChunkReaderByTimestamp.java | 4 +-
.../read/reader/chunk/ChunkReaderWithFilter.java | 42 ---
.../reader/chunk/ChunkReaderWithoutFilter.java | 35 ---
...esReader.java => AbstractFileSeriesReader.java} | 26 +-
.../read/reader/series/EmptyFileSeriesReader.java | 4 +-
.../read/reader/series/FileSeriesReader.java | 89 ++----
.../reader/series/FileSeriesReaderByTimestamp.java | 28 +-
.../reader/series/FileSeriesReaderWithFilter.java | 54 ----
.../series/FileSeriesReaderWithoutFilter.java | 49 ---
.../tsfile/read/query/timegenerator/NodeTest.java | 6 +-
.../query/timegenerator/ReaderByTimestampTest.java | 6 +-
.../iotdb/tsfile/read/reader/ReaderTest.java | 12 +-
49 files changed, 645 insertions(+), 1053 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index abc5411..3a269f5 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -304,13 +304,13 @@ schema_manager_cache_size=300000
### External sort Configuration
####################
# Is external sort enable
-enable_external_sort=true
+enable_external_sort=false
# The maximum number of simultaneous chunk reading for a single time series.
# If the num of simultaneous chunk reading is greater than external_sort_threshold, external sorting is used.
# When external_sort_threshold increases, the number of chunks sorted at the same time in memory may increase and this will occupy more memory.
# When external_sort_threshold decreases, triggering external sorting will increase the time-consuming.
-external_sort_threshold = 60
+external_sort_threshold = 1000
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index 7bf6488..730fde0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -49,8 +49,8 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -401,9 +401,9 @@ class MergeMultiChunkTask {
private int writeChunkWithUnseq(Chunk chunk, IChunkWriter chunkWriter, IPointReader unseqReader,
long chunkLimitTime, int pathIdx) throws IOException {
int cnt = 0;
- ChunkReader chunkReader = new ChunkReaderWithoutFilter(chunk);
- while (chunkReader.hasNextBatch()) {
- BatchData batchData = chunkReader.nextBatch();
+ AbstractChunkReader AbstractChunkReader = new ChunkReader(chunk);
+ while (AbstractChunkReader.hasNextBatch()) {
+ BatchData batchData = AbstractChunkReader.nextBatch();
cnt += mergeWriteBatch(batchData, chunkWriter, unseqReader, pathIdx);
}
cnt += writeRemainingUnseq(chunkWriter, unseqReader, chunkLimitTime, pathIdx);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index ead863e..5369b88 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -801,14 +801,11 @@ public class StorageGroupProcessor {
tsfileResourcesForQuery.add(tsFileResource);
} else {
// left: in-memory data, right: meta of disk data
- Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair;
- pair = tsFileResource
- .getUnsealedFileProcessor()
+ Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = tsFileResource.getUnsealedFileProcessor()
.query(deviceId, measurementId, dataType, mSchema.getProps(), context);
- tsfileResourcesForQuery
- .add(new TsFileResource(tsFileResource.getFile(),
- tsFileResource.getStartTimeMap(),
- tsFileResource.getEndTimeMap(), pair.left, pair.right));
+
+ tsfileResourcesForQuery.add(new TsFileResource(tsFileResource.getFile(),
+ tsFileResource.getStartTimeMap(), tsFileResource.getEndTimeMap(), pair.left, pair.right));
}
} finally {
closeQueryLock.readLock().unlock();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 68a967a..32817cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -105,6 +105,9 @@ public class TsFileResource {
this.closed = true;
}
+ /**
+ * unsealed TsFile
+ */
public TsFileResource(File file,
Map<String, Long> startTimeMap,
Map<String, Long> endTimeMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 1cdb9c3..7ee1beb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
-import org.apache.iotdb.db.query.reader.IAggregateReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader;
import org.apache.iotdb.db.query.reader.resourceRelated.UnseqResourceMergeReader;
@@ -44,11 +43,12 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
private List<IPointReader> unSequenceReaderList;
- private List<IAggregateReader> sequenceReaderList;
+ private List<IBatchReader> sequenceReaderList;
private List<BatchData> batchDataList;
private List<Boolean> hasCachedSequenceDataList;
private Filter timeFilter;
@@ -87,7 +87,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
timeFilter = queryDataSource.updateTimeFilter(timeFilter);
// sequence reader for sealed tsfile, unsealed tsfile, memory
- IAggregateReader seqResourceIterateReader = new SeqResourceIterateReader(
+ IBatchReader seqResourceIterateReader = new SeqResourceIterateReader(
queryDataSource.getSeriesPath(), queryDataSource.getSeqResources(), timeFilter, context,
false);
@@ -133,7 +133,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
*/
private AggreResultData nextSeries(int idx) throws IOException, QueryProcessException {
IPointReader unsequenceReader = unSequenceReaderList.get(idx);
- IAggregateReader sequenceReader = sequenceReaderList.get(idx);
+ IBatchReader sequenceReader = sequenceReaderList.get(idx);
AggregateFunction function = functions.get(idx);
function.init();
@@ -222,7 +222,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
* @param unsequenceReader unsequence Reader
* @throws IOException exception when reading file
*/
- private void skipBeforeStartTimeData(int idx, IAggregateReader sequenceReader,
+ private void skipBeforeStartTimeData(int idx, IBatchReader sequenceReader,
IPointReader unsequenceReader)
throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index 84fd17d..766b0c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.AggreResultDataPointReader;
import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutValueFilter;
import org.apache.iotdb.db.query.factory.AggreFuncFactory;
-import org.apache.iotdb.db.query.reader.IAggregateReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader;
@@ -52,6 +51,7 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
public class AggregateEngineExecutor {
@@ -87,7 +87,7 @@ public class AggregateEngineExecutor {
timeFilter = ((GlobalTimeExpression) expression).getFilter();
}
- List<IAggregateReader> readersOfSequenceData = new ArrayList<>();
+ List<IBatchReader> readersOfSequenceData = new ArrayList<>();
List<IPointReader> readersOfUnSequenceData = new ArrayList<>();
List<AggregateFunction> aggregateFunctions = new ArrayList<>();
for (int i = 0; i < selectedSeries.size(); i++) {
@@ -104,7 +104,7 @@ public class AggregateEngineExecutor {
timeFilter = queryDataSource.updateTimeFilter(timeFilter);
// sequence reader for sealed tsfile, unsealed tsfile, memory
- IAggregateReader seqResourceIterateReader;
+ IBatchReader seqResourceIterateReader;
if (function instanceof MaxTimeAggrFunc || function instanceof LastValueAggrFunc) {
seqResourceIterateReader = new SeqResourceIterateReader(queryDataSource.getSeriesPath(),
queryDataSource.getSeqResources(), timeFilter, context, true);
@@ -141,7 +141,7 @@ public class AggregateEngineExecutor {
* @return one series aggregate result data
*/
private AggreResultData aggregateWithoutValueFilter(AggregateFunction function,
- IAggregateReader sequenceReader, IPointReader unSequenceReader, Filter filter)
+ IBatchReader sequenceReader, IPointReader unSequenceReader, Filter filter)
throws IOException, QueryProcessException {
if (function instanceof MaxTimeAggrFunc || function instanceof LastValueAggrFunc) {
return handleLastMaxTimeWithOutTimeGenerator(function, sequenceReader, unSequenceReader,
@@ -208,7 +208,7 @@ public class AggregateEngineExecutor {
* @return BatchData-aggregate result
*/
private AggreResultData handleLastMaxTimeWithOutTimeGenerator(AggregateFunction function,
- IAggregateReader sequenceReader, IPointReader unSequenceReader, Filter timeFilter)
+ IBatchReader sequenceReader, IPointReader unSequenceReader, Filter timeFilter)
throws IOException, QueryProcessException {
long lastBatchTimeStamp = Long.MIN_VALUE;
boolean isChunkEnd = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobEngine.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobEngine.java
index bb6dea9..3e67413 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobEngine.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.chunkRelated.ChunkReaderWrap;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
public interface ExternalSortJobEngine {
@@ -37,6 +38,8 @@ public interface ExternalSortJobEngine {
List<IPointReader> executeForIPointReader(long queryId, List<ChunkReaderWrap>
chunkReaderWraps) throws IOException;
+ List<IBatchReader> executeForIBatchReader(long queryId, List<IBatchReader> batchReaders)
+ throws IOException;
/**
* Receive a list of chunkReaderWraps and judge whether it should be processed using external
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/MultiSourceExternalSortJobPart.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/MultiSourceExternalSortJobPart.java
index c441615..80ab445 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/externalsort/MultiSourceExternalSortJobPart.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/MultiSourceExternalSortJobPart.java
@@ -16,45 +16,35 @@
* specific language governing permissions and limitations
* under the License.
*/
- package org.apache.iotdb.db.query.externalsort;
+package org.apache.iotdb.db.query.externalsort;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
- import org.apache.iotdb.db.query.reader.IPointReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.query.reader.IPointReader;
+public class MultiSourceExternalSortJobPart extends ExternalSortJobPart {
- public class MultiSourceExternalSortJobPart extends ExternalSortJobPart {
+ private String tmpFilePath;
+ private List<ExternalSortJobPart> source;
+ private long queryId;
- private String tmpFilePath;
- private List<ExternalSortJobPart> source;
- private long queryId;
+ public MultiSourceExternalSortJobPart(long queryId, String tmpFilePath,
+ List<ExternalSortJobPart> source) {
+ super(ExternalSortJobPartType.MULTIPLE_SOURCE);
+ this.source = source;
+ this.tmpFilePath = tmpFilePath;
+ this.queryId = queryId;
+ }
- public MultiSourceExternalSortJobPart(long queryId, String tmpFilePath,
- List<ExternalSortJobPart> source) {
- super(ExternalSortJobPartType.MULTIPLE_SOURCE);
- this.source = source;
- this.tmpFilePath = tmpFilePath;
- this.queryId = queryId;
- }
- public MultiSourceExternalSortJobPart(long queryId, String tmpFilePath,
- ExternalSortJobPart... externalSortJobParts) {
- super(ExternalSortJobPartType.MULTIPLE_SOURCE);
- source = new ArrayList<>();
- for (ExternalSortJobPart externalSortJobPart : externalSortJobParts) {
- source.add(externalSortJobPart);
- }
- this.tmpFilePath = tmpFilePath;
- }
-
- @Override
- public IPointReader executeForIPointReader() throws IOException {
- List<IPointReader> prioritySeriesReaders = new ArrayList<>();
- for (ExternalSortJobPart part : source) {
- prioritySeriesReaders.add(part.executeForIPointReader());
- }
- LineMerger merger = new LineMerger(queryId, tmpFilePath);
- return merger.merge(prioritySeriesReaders);
- }
- }
+ @Override
+ public IPointReader executeForIPointReader() throws IOException {
+ List<IPointReader> prioritySeriesReaders = new ArrayList<>();
+ for (ExternalSortJobPart part : source) {
+ prioritySeriesReaders.add(part.executeForIPointReader());
+ }
+ LineMerger merger = new LineMerger(queryId, tmpFilePath);
+ return merger.merge(prioritySeriesReaders);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/SimpleExternalSortEngine.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/SimpleExternalSortEngine.java
index 94b8e65..610cd27 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/externalsort/SimpleExternalSortEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/SimpleExternalSortEngine.java
@@ -16,164 +16,171 @@
* specific language governing permissions and limitations
* under the License.
*/
- package org.apache.iotdb.db.query.externalsort;
-
- import java.io.File;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
- import org.apache.commons.io.FileUtils;
- import org.apache.iotdb.db.conf.IoTDBDescriptor;
- import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
- import org.apache.iotdb.db.query.externalsort.adapter.ByTimestampReaderAdapter;
- import org.apache.iotdb.db.query.reader.IPointReader;
- import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
- import org.apache.iotdb.db.query.reader.chunkRelated.ChunkReaderWrap;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
-
- public class SimpleExternalSortEngine implements ExternalSortJobEngine {
-
- private ExternalSortJobScheduler scheduler;
-
- private String queryDir;
- private int minExternalSortSourceCount;
- private boolean enableExternalSort;
- private static final Logger logger = LoggerFactory.getLogger(SimpleExternalSortEngine.class);
-
- private SimpleExternalSortEngine() {
- queryDir = IoTDBDescriptor.getInstance().getConfig().getQueryDir() + File.separator;
- minExternalSortSourceCount = IoTDBDescriptor.getInstance().getConfig()
- .getExternalSortThreshold();
- enableExternalSort = IoTDBDescriptor.getInstance().getConfig().isEnableExternalSort();
- scheduler = ExternalSortJobScheduler.getInstance();
-
- // create queryDir
- try {
- FileUtils.deleteDirectory(new File(queryDir));
- FileUtils.forceMkdir(new File(queryDir));
- } catch (IOException e) {
- throw new StorageEngineFailureException(e);
- }
- }
-
- @Override
- public List<IPointReader> executeForIPointReader(long queryId,
- List<ChunkReaderWrap> chunkReaderWraps)
- throws IOException {
- if (!enableExternalSort || chunkReaderWraps.size() < minExternalSortSourceCount) {
- return generateIPointReader(chunkReaderWraps, 0, chunkReaderWraps.size());
- }
- if (logger.isInfoEnabled()) {
- logger.info("query {} measurement {} uses external sort.", queryId,
- chunkReaderWraps.get(0).getMeasurementUid());
- }
- ExternalSortJob job = createJob(queryId, chunkReaderWraps);
- return job.executeForIPointReader();
- }
-
- @Override
- public List<IReaderByTimestamp> executeForByTimestampReader(long queryId,
- List<ChunkReaderWrap> chunkReaderWraps) throws IOException {
- if (!enableExternalSort || chunkReaderWraps.size() < minExternalSortSourceCount) {
- return generateIReaderByTimestamp(chunkReaderWraps, 0, chunkReaderWraps.size());
- }
- if (logger.isInfoEnabled()) {
- logger.info("query {} measurement {} uses external sort.", queryId,
- chunkReaderWraps.get(0).getMeasurementUid());
- }
- ExternalSortJob job = createJob(queryId, chunkReaderWraps);
- return convert(job.executeForIPointReader());
- }
-
- @Override
- public ExternalSortJob createJob(long queryId, List<ChunkReaderWrap> readerWrapList) {
- long jodId = scheduler.genJobId();
- List<ExternalSortJobPart> ret = new ArrayList<>();
- for (ChunkReaderWrap readerWrap : readerWrapList) {
- ret.add(new SingleSourceExternalSortJobPart(readerWrap));
- }
-
- int partId = 0;
- while (ret.size() >= minExternalSortSourceCount) {
- List<ExternalSortJobPart> tmpPartList = new ArrayList<>();
- for (int i = 0; i < ret.size(); ) {
- int toIndex = Math.min(i + minExternalSortSourceCount, ret.size());
- List<ExternalSortJobPart> partGroup = ret.subList(i, toIndex);
- i = toIndex;
- StringBuilder tmpFilePath = new StringBuilder(queryDir).append(jodId).append("_")
- .append(partId);
- MultiSourceExternalSortJobPart part = new MultiSourceExternalSortJobPart(queryId,
- tmpFilePath.toString(), partGroup);
- tmpPartList.add(part);
- partId++;
- }
- ret = tmpPartList;
- }
- return new ExternalSortJob(jodId, ret);
- }
-
- public String getQueryDir() {
- return queryDir;
- }
-
- public void setQueryDir(String queryDir) {
- this.queryDir = queryDir;
- }
-
- public int getMinExternalSortSourceCount() {
- return minExternalSortSourceCount;
- }
-
- public void setMinExternalSortSourceCount(int minExternalSortSourceCount) {
- this.minExternalSortSourceCount = minExternalSortSourceCount;
- }
-
- /**
- * init IPointReader with ChunkReaderWrap.
- */
- private List<IPointReader> generateIPointReader(List<ChunkReaderWrap> readerWraps,
- final int start, final int size) throws IOException {
- List<IPointReader> pointReaderList = new ArrayList<>();
- for (int i = start; i < start + size; i++) {
- pointReaderList.add(readerWraps.get(i).getIPointReader());
- }
- return pointReaderList;
- }
-
- /**
- * init IReaderByTimestamp with ChunkReaderWrap.
- */
- private List<IReaderByTimestamp> generateIReaderByTimestamp(List<ChunkReaderWrap> readerWraps,
- final int start, final int size) throws IOException {
- List<IReaderByTimestamp> readerByTimestampList = new ArrayList<>();
- for (int i = start; i < start + size; i++) {
- readerByTimestampList.add(readerWraps.get(i).getIReaderByTimestamp());
- }
- return readerByTimestampList;
- }
-
- /**
- * convert IPointReader to implement interface of IReaderByTimestamp.
- *
- * @param pointReaderList reader list that implements IPointReader
- * @return reader list that implements IReaderByTimestamp
- */
- private List<IReaderByTimestamp> convert(List<IPointReader> pointReaderList) {
- List<IReaderByTimestamp> readerByTimestampList = new ArrayList<>();
- for (IPointReader pointReader : pointReaderList) {
- readerByTimestampList.add(new ByTimestampReaderAdapter(pointReader));
- }
- return readerByTimestampList;
- }
-
- private static class SimpleExternalSortJobEngineHelper {
-
- private static SimpleExternalSortEngine INSTANCE = new SimpleExternalSortEngine();
- }
-
- public static SimpleExternalSortEngine getInstance() {
- return SimpleExternalSortJobEngineHelper.INSTANCE;
- }
- }
+package org.apache.iotdb.db.query.externalsort;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
+import org.apache.iotdb.db.query.externalsort.adapter.ByTimestampReaderAdapter;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.chunkRelated.ChunkReaderWrap;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SimpleExternalSortEngine implements ExternalSortJobEngine {
+
+ private ExternalSortJobScheduler scheduler;
+
+ private String queryDir;
+ private int minExternalSortSourceCount;
+ private boolean enableExternalSort;
+ private static final Logger logger = LoggerFactory.getLogger(SimpleExternalSortEngine.class);
+
+ private SimpleExternalSortEngine() {
+ queryDir = IoTDBDescriptor.getInstance().getConfig().getQueryDir() + File.separator;
+ minExternalSortSourceCount = IoTDBDescriptor.getInstance().getConfig()
+ .getExternalSortThreshold();
+ enableExternalSort = IoTDBDescriptor.getInstance().getConfig().isEnableExternalSort();
+ scheduler = ExternalSortJobScheduler.getInstance();
+
+ // create queryDir
+ try {
+ FileUtils.deleteDirectory(new File(queryDir));
+ FileUtils.forceMkdir(new File(queryDir));
+ } catch (IOException e) {
+ throw new StorageEngineFailureException(e);
+ }
+ }
+
+ @Override
+ public List<IPointReader> executeForIPointReader(long queryId,
+ List<ChunkReaderWrap> chunkReaderWraps)
+ throws IOException {
+ if (!enableExternalSort || chunkReaderWraps.size() < minExternalSortSourceCount) {
+ return generateIPointReader(chunkReaderWraps, 0, chunkReaderWraps.size());
+ }
+ if (logger.isInfoEnabled()) {
+ logger.info("query {} measurement {} uses external sort.", queryId,
+ chunkReaderWraps.get(0).getMeasurementUid());
+ }
+ ExternalSortJob job = createJob(queryId, chunkReaderWraps);
+ return job.executeForIPointReader();
+ }
+
+ @Override
+ public List<IBatchReader> executeForIBatchReader(long queryId, List<IBatchReader> batchReaders)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public List<IReaderByTimestamp> executeForByTimestampReader(long queryId,
+ List<ChunkReaderWrap> chunkReaderWraps) throws IOException {
+ if (!enableExternalSort || chunkReaderWraps.size() < minExternalSortSourceCount) {
+ return generateIReaderByTimestamp(chunkReaderWraps, 0, chunkReaderWraps.size());
+ }
+ if (logger.isInfoEnabled()) {
+ logger.info("query {} measurement {} uses external sort.", queryId,
+ chunkReaderWraps.get(0).getMeasurementUid());
+ }
+ ExternalSortJob job = createJob(queryId, chunkReaderWraps);
+ return convert(job.executeForIPointReader());
+ }
+
+ @Override
+ public ExternalSortJob createJob(long queryId, List<ChunkReaderWrap> readerWrapList) {
+ long jodId = scheduler.genJobId();
+ List<ExternalSortJobPart> ret = new ArrayList<>();
+ for (ChunkReaderWrap readerWrap : readerWrapList) {
+ ret.add(new SingleSourceExternalSortJobPart(readerWrap));
+ }
+
+ int partId = 0;
+ while (ret.size() >= minExternalSortSourceCount) {
+ List<ExternalSortJobPart> tmpPartList = new ArrayList<>();
+ for (int i = 0; i < ret.size(); ) {
+ int toIndex = Math.min(i + minExternalSortSourceCount, ret.size());
+ List<ExternalSortJobPart> partGroup = ret.subList(i, toIndex);
+ i = toIndex;
+ StringBuilder tmpFilePath = new StringBuilder(queryDir).append(jodId).append("_")
+ .append(partId);
+ MultiSourceExternalSortJobPart part = new MultiSourceExternalSortJobPart(queryId,
+ tmpFilePath.toString(), partGroup);
+ tmpPartList.add(part);
+ partId++;
+ }
+ ret = tmpPartList;
+ }
+ return new ExternalSortJob(jodId, ret);
+ }
+
+ public String getQueryDir() {
+ return queryDir;
+ }
+
+ public void setQueryDir(String queryDir) {
+ this.queryDir = queryDir;
+ }
+
+ public int getMinExternalSortSourceCount() {
+ return minExternalSortSourceCount;
+ }
+
+ public void setMinExternalSortSourceCount(int minExternalSortSourceCount) {
+ this.minExternalSortSourceCount = minExternalSortSourceCount;
+ }
+
+ /**
+ * init IPointReader with ChunkReaderWrap.
+ */
+ private List<IPointReader> generateIPointReader(List<ChunkReaderWrap> readerWraps,
+ final int start, final int size) throws IOException {
+ List<IPointReader> pointReaderList = new ArrayList<>();
+ for (int i = start; i < start + size; i++) {
+ pointReaderList.add(readerWraps.get(i).getIPointReader());
+ }
+ return pointReaderList;
+ }
+
+ /**
+ * init IReaderByTimestamp with ChunkReaderWrap.
+ */
+ private List<IReaderByTimestamp> generateIReaderByTimestamp(List<ChunkReaderWrap> readerWraps,
+ final int start, final int size) throws IOException {
+ List<IReaderByTimestamp> readerByTimestampList = new ArrayList<>();
+ for (int i = start; i < start + size; i++) {
+ readerByTimestampList.add(readerWraps.get(i).getIReaderByTimestamp());
+ }
+ return readerByTimestampList;
+ }
+
+ /**
+ * convert IPointReader to implement interface of IReaderByTimestamp.
+ *
+ * @param pointReaderList reader list that implements IPointReader
+ * @return reader list that implements IReaderByTimestamp
+ */
+ private List<IReaderByTimestamp> convert(List<IPointReader> pointReaderList) {
+ List<IReaderByTimestamp> readerByTimestampList = new ArrayList<>();
+ for (IPointReader pointReader : pointReaderList) {
+ readerByTimestampList.add(new ByTimestampReaderAdapter(pointReader));
+ }
+ return readerByTimestampList;
+ }
+
+ private static class SimpleExternalSortJobEngineHelper {
+
+ private static SimpleExternalSortEngine INSTANCE = new SimpleExternalSortEngine();
+ }
+
+ public static SimpleExternalSortEngine getInstance() {
+ return SimpleExternalSortJobEngineHelper.INSTANCE;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/IAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/IAggregateReader.java
deleted file mode 100644
index 7cb9112..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/IAggregateReader.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.query.reader;
-
-import java.io.IOException;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
-
-public interface IAggregateReader extends IBatchReader {
-
- /**
- * Returns meta-information of batch data.
- * <p>
- * Returns null if batch data comes from memory. Returns pageHeader if batch data comes from page
- * data.
- */
- PageHeader nextPageHeader() throws IOException;
-
- void skipPageData() throws IOException;
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java
index d9f86b9..e723e34 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java
@@ -24,19 +24,19 @@ import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
public class CachedDiskChunkReader implements IPointReader {
- private ChunkReader chunkReader;
+ private AbstractChunkReader AbstractChunkReader;
private BatchData data;
private TimeValuePair prev;
private TimeValuePair current;
- public CachedDiskChunkReader(ChunkReader chunkReader) {
- this.chunkReader = chunkReader;
+ public CachedDiskChunkReader(AbstractChunkReader AbstractChunkReader) {
+ this.AbstractChunkReader = AbstractChunkReader;
this.prev =
- TimeValuePairUtils.getEmptyTimeValuePair(chunkReader.getChunkHeader().getDataType());
+ TimeValuePairUtils.getEmptyTimeValuePair(AbstractChunkReader.getChunkHeader().getDataType());
}
@Override
@@ -44,8 +44,8 @@ public class CachedDiskChunkReader implements IPointReader {
if (data != null && data.hasNext()) {
return true;
}
- while (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ while (AbstractChunkReader.hasNextBatch()) {
+ data = AbstractChunkReader.nextBatch();
if (data.hasNext()) {
return true;
}
@@ -60,8 +60,8 @@ public class CachedDiskChunkReader implements IPointReader {
if (data.hasNext()) {
TimeValuePairUtils.setCurrentTimeValuePair(data, current());
} else {
- while (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ while (AbstractChunkReader.hasNextBatch()) {
+ data = AbstractChunkReader.nextBatch();
if (data.hasNext()) {
TimeValuePairUtils.setCurrentTimeValuePair(data, current());
break;
@@ -75,7 +75,7 @@ public class CachedDiskChunkReader implements IPointReader {
public TimeValuePair current() {
if (current == null) {
this.current =
- TimeValuePairUtils.getEmptyTimeValuePair(chunkReader.getChunkHeader().getDataType());
+ TimeValuePairUtils.getEmptyTimeValuePair(AbstractChunkReader.getChunkHeader().getDataType());
TimeValuePairUtils.setCurrentTimeValuePair(data, current);
}
return current;
@@ -83,6 +83,6 @@ public class CachedDiskChunkReader implements IPointReader {
@Override
public void close() {
- this.chunkReader.close();
+ this.AbstractChunkReader.close();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java
index 403cfc5..a2f9630 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java
@@ -26,10 +26,10 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithFilter;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
public class ChunkReaderWrap {
@@ -72,10 +72,18 @@ public class ChunkReaderWrap {
public IPointReader getIPointReader() throws IOException {
if (type.equals(ChunkReaderType.DISK_CHUNK)) {
Chunk chunk = chunkLoader.getChunk(chunkMetaData);
- ChunkReader chunkReader = filter != null ? new ChunkReaderWithFilter(chunk, filter)
- : new ChunkReaderWithoutFilter(chunk);
+ AbstractChunkReader AbstractChunkReader = new ChunkReader(chunk, filter);
+ return new DiskChunkReader(AbstractChunkReader);
+ } else {
+ return new MemChunkReader(readOnlyMemChunk, filter);
+ }
+ }
- return new DiskChunkReader(chunkReader);
+ public IBatchReader getIBatchReader() throws IOException {
+ if (type.equals(ChunkReaderType.DISK_CHUNK)) {
+ Chunk chunk = chunkLoader.getChunk(chunkMetaData);
+ AbstractChunkReader AbstractChunkReader = new ChunkReader(chunk, filter);
+ return new DiskChunkReader(AbstractChunkReader);
} else {
return new MemChunkReader(readOnlyMemChunk, filter);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java
index b7043f0..fce248e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java
@@ -23,24 +23,25 @@ import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
/**
* To read chunk data on disk, this class implements an interface {@link IPointReader} based on the
- * data reader {@link ChunkReader}.
+ * data reader {@link AbstractChunkReader}.
* <p>
* Note that <code>ChunkReader</code> is an abstract class with three concrete classes, two of which
* are used here: <code>ChunkReaderWithoutFilter</code> and <code>ChunkReaderWithFilter</code>.
* <p>
* This class is used in {@link org.apache.iotdb.db.query.reader.resourceRelated.UnseqResourceMergeReader}.
*/
-public class DiskChunkReader implements IPointReader {
+public class DiskChunkReader implements IPointReader, IBatchReader {
- private ChunkReader chunkReader;
+ private AbstractChunkReader AbstractChunkReader;
private BatchData data;
- public DiskChunkReader(ChunkReader chunkReader) {
- this.chunkReader = chunkReader;
+ public DiskChunkReader(AbstractChunkReader AbstractChunkReader) {
+ this.AbstractChunkReader = AbstractChunkReader;
}
@Override
@@ -48,8 +49,8 @@ public class DiskChunkReader implements IPointReader {
if (data != null && data.hasNext()) {
return true;
}
- while (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ while (AbstractChunkReader.hasNextBatch()) {
+ data = AbstractChunkReader.nextBatch();
if (data.hasNext()) {
return true;
}
@@ -71,7 +72,17 @@ public class DiskChunkReader implements IPointReader {
}
@Override
+ public boolean hasNextBatch() throws IOException {
+ return false;
+ }
+
+ @Override
+ public BatchData nextBatch() throws IOException {
+ return null;
+ }
+
+ @Override
public void close() {
- this.chunkReader.close();
+ this.AbstractChunkReader.close();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java
index b0885ae..5b74663 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java
@@ -18,25 +18,24 @@
*/
package org.apache.iotdb.db.query.reader.chunkRelated;
+import java.io.IOException;
import java.util.Iterator;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.query.reader.IAggregateReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.fileRelated.UnSealedTsFileIterateReader;
import org.apache.iotdb.db.utils.TimeValuePair;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
/**
- * To read chunk data in memory, this class implements two interfaces {@link IPointReader} and
- * {@link IAggregateReader} based on the data source {@link ReadOnlyMemChunk}.
+ * To read chunk data in memory
* <p>
* This class is used in {@link UnSealedTsFileIterateReader} and {@link
* org.apache.iotdb.db.query.reader.resourceRelated.UnseqResourceMergeReader}.
*/
-public class MemChunkReader implements IPointReader, IAggregateReader {
+public class MemChunkReader implements IPointReader, IBatchReader {
private Iterator<TimeValuePair> timeValuePairIterator;
private Filter filter;
@@ -88,6 +87,11 @@ public class MemChunkReader implements IPointReader, IAggregateReader {
}
@Override
+ public boolean hasNextBatch() throws IOException {
+ return hasNext();
+ }
+
+ @Override
public BatchData nextBatch() {
BatchData batchData = new BatchData(dataType, true);
if (hasCachedTimeValuePair) {
@@ -111,13 +115,4 @@ public class MemChunkReader implements IPointReader, IAggregateReader {
// Do nothing because mem chunk reader will not open files
}
- @Override
- public PageHeader nextPageHeader() {
- return null;
- }
-
- @Override
- public void skipPageData() {
- nextBatch();
- }
}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/FileSeriesReaderAdapter.java b/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/FileSeriesReaderAdapter.java
deleted file mode 100644
index ff048fd..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/FileSeriesReaderAdapter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.query.reader.fileRelated;
-
-import java.io.IOException;
-import org.apache.iotdb.db.query.reader.IAggregateReader;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
-
-/**
- * To read a sequence TsFile's on-disk data, this class implements an interface {@link
- * IAggregateReader} based on the data reader {@link FileSeriesReader}.
- * <p>
- * Note that <code>FileSeriesReader</code> is an abstract class with two concrete classes:
- * <code>FileSeriesReaderWithoutFilter</code> and <code>FileSeriesReaderWithFilter</code>.
- * <p>
- * This class is used in {@link UnSealedTsFileIterateReader} and {@link
- * org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader}.
- */
-public class FileSeriesReaderAdapter implements IAggregateReader {
-
- private FileSeriesReader fileSeriesReader;
-
- public FileSeriesReaderAdapter(FileSeriesReader fileSeriesReader) {
- this.fileSeriesReader = fileSeriesReader;
- }
-
- @Override
- public PageHeader nextPageHeader() throws IOException {
- return fileSeriesReader.nextPageHeader();
- }
-
- @Override
- public void skipPageData() {
- fileSeriesReader.skipPageData();
- }
-
- @Override
- public boolean hasNext() throws IOException {
- return fileSeriesReader.hasNextBatch();
- }
-
- @Override
- public BatchData nextBatch() throws IOException {
- return fileSeriesReader.nextBatch();
- }
-
- @Override
- public void close() throws IOException {
- fileSeriesReader.close();
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileIterateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileIterateReader.java
index b35b1bd..95f0089 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileIterateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileIterateReader.java
@@ -23,7 +23,6 @@ import java.util.Collections;
import java.util.List;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.query.reader.IAggregateReader;
import org.apache.iotdb.db.query.reader.chunkRelated.MemChunkReader;
import org.apache.iotdb.db.query.reader.universal.IterateReader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -31,13 +30,13 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.read.reader.series.AbstractFileSeriesReader;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
-import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithFilter;
-import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithoutFilter;
/**
* To read an unsealed sequence TsFile, this class extends {@link IterateReader} to implement {@link
- * IAggregateReader} for the TsFile.
+ * IBatchReader} for the TsFile.
* <p>
* Note that an unsealed sequence TsFile consists of two parts of data in chronological order: 1)
* data that has been flushed to disk and 2) data in the flushing memtable list.
@@ -100,22 +99,17 @@ public class UnSealedTsFileIterateReader extends IterateReader {
}
/**
- * Creates <code>IAggregateReader</code> for an unsealed sequence TsFile's on-disk data.
+ * for an unsealed sequence TsFile's on-disk data.
*/
- private IAggregateReader initUnSealedTsFileDiskReader(TsFileResource unSealedTsFile,
- Filter filter)
+ private IBatchReader initUnSealedTsFileDiskReader(TsFileResource unSealedTsFile, Filter filter)
throws IOException {
- FileSeriesReader fileSeriesReader;
+ AbstractFileSeriesReader abstractFileSeriesReader;
List<ChunkMetaData> metaDataList = unSealedTsFile.getChunkMetaDataList();
if (metaDataList == null || metaDataList.isEmpty()) {
// init fileSeriesReader
// no need to construct a IChunkLoader since it will never be used in this case
- if (filter == null) {
- fileSeriesReader = new FileSeriesReaderWithoutFilter(null, metaDataList);
- } else {
- fileSeriesReader = new FileSeriesReaderWithFilter(null, metaDataList, filter);
- }
+ abstractFileSeriesReader = new FileSeriesReader(null, metaDataList, filter);
} else {
// prepare metaDataList
@@ -127,13 +121,9 @@ public class UnSealedTsFileIterateReader extends IterateReader {
.get(unSealedTsFile, false);
IChunkLoader chunkLoader = new ChunkLoaderImpl(unClosedTsFileReader);
// init fileSeriesReader
- if (filter == null) {
- fileSeriesReader = new FileSeriesReaderWithoutFilter(chunkLoader, metaDataList);
- } else {
- fileSeriesReader = new FileSeriesReaderWithFilter(chunkLoader, metaDataList, filter);
- }
+ abstractFileSeriesReader = new FileSeriesReader(chunkLoader, metaDataList, filter);
}
- return new FileSeriesReaderAdapter(fileSeriesReader);
+ return abstractFileSeriesReader;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java
index 2ffab28..2e56b4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java
@@ -25,8 +25,8 @@ import org.apache.iotdb.db.query.reader.chunkRelated.CachedDiskChunkReader;
import org.apache.iotdb.db.query.reader.universal.CachedPriorityMergeReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
public class CachedUnseqResourceMergeReader extends CachedPriorityMergeReader {
@@ -35,8 +35,8 @@ public class CachedUnseqResourceMergeReader extends CachedPriorityMergeReader {
super(dataType);
int priorityValue = 1;
for (Chunk chunk : chunks) {
- ChunkReader chunkReader = new ChunkReaderWithoutFilter(chunk);
- addReaderWithPriority(new CachedDiskChunkReader(chunkReader), priorityValue++);
+ AbstractChunkReader AbstractChunkReader = new ChunkReader(chunk, null);
+ addReaderWithPriority(new CachedDiskChunkReader(AbstractChunkReader), priorityValue++);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/SeqResourceIterateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/SeqResourceIterateReader.java
index 1dc9099..da0dad7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/SeqResourceIterateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/SeqResourceIterateReader.java
@@ -26,8 +26,6 @@ import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.query.reader.IAggregateReader;
-import org.apache.iotdb.db.query.reader.fileRelated.FileSeriesReaderAdapter;
import org.apache.iotdb.db.query.reader.fileRelated.UnSealedTsFileIterateReader;
import org.apache.iotdb.db.query.reader.universal.IterateReader;
import org.apache.iotdb.db.utils.QueryUtils;
@@ -37,13 +35,11 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
-import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithFilter;
-import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithoutFilter;
/**
- * To read a chronologically ordered list of sequence TsFiles, this class extends {@link
- * IterateReader} to implements <code>IAggregateReader</code> for the TsFiles.
+ * To read a chronologically ordered list of sequence TsFiles.
* <p>
* Notes: 1) The list of sequence TsFiles is in strict chronological order. 2) The data in a
* sequence TsFile is also organized in chronological order. 3) A sequence TsFile can be either
@@ -104,7 +100,7 @@ public class SeqResourceIterateReader extends IterateReader {
/**
* If the idx-th TsFile in the <code>seqResources</code> might satisfy this <code>filter</code>,
- * then construct <code>IAggregateReader</code> for it, assign to <code>currentSeriesReader</code>
+ * then construct a reader for it, assign to <code>currentSeriesReader</code>
* and return true. Otherwise, return false.
*
* @param idx the index of the TsFile in the resource list
@@ -155,7 +151,7 @@ public class SeqResourceIterateReader extends IterateReader {
return !filter.satisfyStartEndTime(startTime, endTime);
}
- private IAggregateReader initSealedTsFileReader(TsFileResource sealedTsFile, Filter filter,
+ private IBatchReader initSealedTsFileReader(TsFileResource sealedTsFile, Filter filter,
QueryContext context) throws IOException {
// prepare metaDataList
List<ChunkMetaData> metaDataList = DeviceMetaDataCache.getInstance()
@@ -175,12 +171,6 @@ public class SeqResourceIterateReader extends IterateReader {
IChunkLoader chunkLoader = new ChunkLoaderImpl(tsFileReader);
// init fileSeriesReader
- FileSeriesReader fileSeriesReader;
- if (filter == null) {
- fileSeriesReader = new FileSeriesReaderWithoutFilter(chunkLoader, metaDataList);
- } else {
- fileSeriesReader = new FileSeriesReaderWithFilter(chunkLoader, metaDataList, filter);
- }
- return new FileSeriesReaderAdapter(fileSeriesReader);
+ return new FileSeriesReader(chunkLoader, metaDataList, filter);
}
}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java
index 8c61711..2c11893 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java
@@ -35,9 +35,11 @@ import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
/**
* To read a list of unsequence TsFiles, this class extends {@link PriorityMergeReader} to
@@ -49,15 +51,26 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
* <p>
* This class is used in {@link org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderWithoutValueFilter}.
*/
-public class UnseqResourceMergeReader extends PriorityMergeReader {
+public class UnseqResourceMergeReader implements IBatchReader {
private Path seriesPath;
- private long queryId;
+ private PriorityMergeReader priorityMergeReader;
+
+ /**
+ * Zesong Sun !!!
+ *
+ * put Reader into merge reader one by one
+ *
+ * (1) get all ChunkMetadata
+ * (2) set priority to each ChunkMetadata
+ * (3) sort All ChunkMetadata by start time
+ * (4) create a ChunkReader with priority for each ChunkMetadata and add the ChunkReader to merge reader one by one
+ */
public UnseqResourceMergeReader(Path seriesPath, List<TsFileResource> unseqResources,
- QueryContext context, Filter filter) throws IOException {
+ QueryContext context, Filter timeFilter) throws IOException {
this.seriesPath = seriesPath;
- this.queryId = context.getJobId();
+ long queryId = context.getJobId();
List<ChunkReaderWrap> readerWrapList = new ArrayList<>();
for (TsFileResource tsFileResource : unseqResources) {
@@ -65,7 +78,7 @@ public class UnseqResourceMergeReader extends PriorityMergeReader {
// prepare metaDataList
List<ChunkMetaData> metaDataList;
if (tsFileResource.isClosed()) {
- if (isTsFileNotSatisfied(tsFileResource, filter)) {
+ if (!isTsFileSatisfied(tsFileResource, timeFilter)) {
continue;
}
@@ -78,7 +91,7 @@ public class UnseqResourceMergeReader extends PriorityMergeReader {
}
} else {
if (tsFileResource.getEndTimeMap().size() != 0) {
- if (isTsFileNotSatisfied(tsFileResource, filter)) {
+ if (!isTsFileSatisfied(tsFileResource, timeFilter)) {
continue;
}
}
@@ -94,16 +107,16 @@ public class UnseqResourceMergeReader extends PriorityMergeReader {
for (ChunkMetaData chunkMetaData : metaDataList) {
- if (filter != null && !filter.satisfy(chunkMetaData.getStatistics())) {
+ if (timeFilter != null && !timeFilter.satisfy(chunkMetaData.getStatistics())) {
continue;
}
// create and add DiskChunkReader
- readerWrapList.add(new ChunkReaderWrap(chunkMetaData, chunkLoader, filter));
+ readerWrapList.add(new ChunkReaderWrap(chunkMetaData, chunkLoader, timeFilter));
}
if (!tsFileResource.isClosed()) {
// create and add MemChunkReader
- readerWrapList.add(new ChunkReaderWrap(tsFileResource.getReadOnlyMemChunk(), filter));
+ readerWrapList.add(new ChunkReaderWrap(tsFileResource.getReadOnlyMemChunk(), timeFilter));
}
}
@@ -116,6 +129,30 @@ public class UnseqResourceMergeReader extends PriorityMergeReader {
}
}
+
+ /**
+ * Zesong Sun !!!
+ */
+ @Override
+ public boolean hasNextBatch() throws IOException {
+ return false;
+ }
+
+
+ /**
+ * Zesong Sun !!!
+ */
+ @Override
+ public BatchData nextBatch() throws IOException {
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+
/**
* Returns true if the start and end time of the series data in this unsequence TsFile do not
* satisfy the filter condition. Returns false if satisfy.
@@ -129,12 +166,12 @@ public class UnseqResourceMergeReader extends PriorityMergeReader {
* satisfy.
*/
// TODO future work: deduplicate code. See SeqResourceIterateReader.
- private boolean isTsFileNotSatisfied(TsFileResource tsFile, Filter filter) {
+ private boolean isTsFileSatisfied(TsFileResource tsFile, Filter filter) {
if (filter == null) {
- return false;
+ return true;
}
long startTime = tsFile.getStartTimeMap().get(seriesPath.getDevice());
long endTime = tsFile.getEndTimeMap().get(seriesPath.getDevice());
- return !filter.satisfyStartEndTime(startTime, endTime);
+ return filter.satisfyStartEndTime(startTime, endTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilter.java
index f9df67b..8ac9fb6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilter.java
@@ -19,9 +19,13 @@
package org.apache.iotdb.db.query.reader.seriesRelated;
import java.io.IOException;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.reader.IBatchReader;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader;
+import org.apache.iotdb.db.query.reader.resourceRelated.UnseqResourceMergeReader;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -51,36 +55,42 @@ public class SeriesReaderWithValueFilter extends SeriesReaderWithoutValueFilter
this.filter = filter;
}
- public SeriesReaderWithValueFilter(IBatchReader seqResourceIterateReader,
- IPointReader unseqResourceMergeReader, Filter filter) {
+ /**
+ * for test
+ */
+ SeriesReaderWithValueFilter(IBatchReader seqResourceIterateReader,
+ IBatchReader unseqResourceMergeReader, Filter filter) {
super(seqResourceIterateReader, unseqResourceMergeReader);
this.filter = filter;
}
- @Override
- public boolean hasNext() throws IOException {
- if (hasCachedValue) {
- return true;
- }
- while (super.hasNext()) {
- timeValuePair = super.next();
- if (filter.satisfy(timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
- hasCachedValue = true;
- return true;
- }
- }
- return false;
- }
+// @Override
+// public boolean hasNext() throws IOException {
+// if (hasCachedValue) {
+// return true;
+// }
+// while (super.hasNext()) {
+// timeValuePair = super.next();
+// if (filter.satisfy(timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+// hasCachedValue = true;
+// return true;
+// }
+// }
+// return false;
+// }
+//
+// @Override
+// public TimeValuePair next() throws IOException {
+// if (hasCachedValue || hasNext()) {
+// hasCachedValue = false;
+// return timeValuePair;
+// } else {
+// throw new IOException("data reader is out of bound.");
+// }
+// }
+
+
- @Override
- public TimeValuePair next() throws IOException {
- if (hasCachedValue || hasNext()) {
- hasCachedValue = false;
- return timeValuePair;
- } else {
- throw new IOException("data reader is out of bound.");
- }
- }
@Override
public TimeValuePair current() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java
index 41af5ae..802ad1d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java
@@ -23,7 +23,8 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
-import org.apache.iotdb.db.query.reader.IBatchReader;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader;
import org.apache.iotdb.db.query.reader.resourceRelated.UnseqResourceMergeReader;
@@ -40,20 +41,20 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
* Note that filters include value filter and time filter. "without value filter" is equivalent to
* "with global time filter or simply without any filter".
*/
-public class SeriesReaderWithoutValueFilter implements IPointReader {
- private boolean hasCachedBatchData;
- private BatchData batchData;
+
+/**
+ * add seqResourceIterateReader and unseqResourceMergeReader into PriorityMergeReader
+ *
+ *
+ */
+public class SeriesReaderWithoutValueFilter extends PriorityMergeReader {
private IBatchReader seqResourceIterateReader;
- private IPointReader unseqResourceMergeReader;
+ private IBatchReader unseqResourceMergeReader;
- public SeriesReaderWithoutValueFilter(IBatchReader seqResourceIterateReader,
- IPointReader unseqResourceMergeReader) {
- this.seqResourceIterateReader = seqResourceIterateReader;
- this.unseqResourceMergeReader = unseqResourceMergeReader;
- this.hasCachedBatchData = false;
- }
+ private boolean hasCachedBatchData;
+ private BatchData batchData;
/**
* Constructor function.
@@ -71,69 +72,80 @@ public class SeriesReaderWithoutValueFilter implements IPointReader {
timeFilter = queryDataSource.updateTimeFilter(timeFilter);
// reader for sequence resources
- IBatchReader seqResourceIterateReader = new SeqResourceIterateReader(
+ this.seqResourceIterateReader = new SeqResourceIterateReader(
queryDataSource.getSeriesPath(), queryDataSource.getSeqResources(), timeFilter, context);
- // reader for unsequence resources
- IPointReader unseqResourceMergeReader;
+ // reader for unsequence resources, we only push down time filter on unseq reader
if (pushdownUnseq) {
- unseqResourceMergeReader = new UnseqResourceMergeReader(seriesPath,
+ this.unseqResourceMergeReader = new UnseqResourceMergeReader(seriesPath,
queryDataSource.getUnseqResources(), context, timeFilter);
} else {
- unseqResourceMergeReader = new UnseqResourceMergeReader(seriesPath,
+ this.unseqResourceMergeReader = new UnseqResourceMergeReader(seriesPath,
queryDataSource.getUnseqResources(), context, null);
}
- this.seqResourceIterateReader = seqResourceIterateReader;
- this.unseqResourceMergeReader = unseqResourceMergeReader;
this.hasCachedBatchData = false;
}
- @Override
- public boolean hasNext() throws IOException {
- if (hasNextInBatchDataOrBatchReader()) {
- return true;
- }
- return unseqResourceMergeReader != null && unseqResourceMergeReader.hasNext();
+ /**
+ * for test
+ */
+ SeriesReaderWithoutValueFilter(IBatchReader seqResourceIterateReader,
+ IBatchReader unseqResourceMergeReader) {
+ this.seqResourceIterateReader = seqResourceIterateReader;
+ this.unseqResourceMergeReader = unseqResourceMergeReader;
+ this.hasCachedBatchData = false;
}
- @Override
- public TimeValuePair next() throws IOException {
- boolean hasNextBatch = hasNextInBatchDataOrBatchReader();
- boolean hasNextPoint = unseqResourceMergeReader != null && unseqResourceMergeReader.hasNext();
-
- // has next in both batch reader and point reader
- if (hasNextBatch && hasNextPoint) {
- long timeInPointReader = unseqResourceMergeReader.current().getTimestamp();
- long timeInBatchData = batchData.currentTime();
- if (timeInPointReader > timeInBatchData) {
- TimeValuePair timeValuePair = TimeValuePairUtils.getCurrentTimeValuePair(batchData);
- batchData.next();
- return timeValuePair;
- } else if (timeInPointReader == timeInBatchData) {
- // Note that batchData here still moves next even though the current data to be read is
- // overwritten by unsequence data source. Only in this way can hasNext() work correctly.
- batchData.next();
- return unseqResourceMergeReader.next();
- } else {
- return unseqResourceMergeReader.next();
- }
- }
-
- // only has next in batch reader
- if (hasNextBatch) {
- TimeValuePair timeValuePair = TimeValuePairUtils.getCurrentTimeValuePair(batchData);
- batchData.next();
- return timeValuePair;
- }
-
- // only has next in point reader
- if (hasNextPoint) {
- return unseqResourceMergeReader.next();
- }
- return null;
- }
+ /**
+ * methods in IPointReader
+ */
+// @Override
+// public boolean hasNext() throws IOException {
+// if (hasNextInBatchDataOrBatchReader()) {
+// return true;
+// }
+// return unseqResourceMergeReader != null && unseqResourceMergeReader.hasNext();
+// }
+//
+// @Override
+// public TimeValuePair next() throws IOException {
+// boolean hasNextBatch = hasNextInBatchDataOrBatchReader();
+// boolean hasNextPoint = unseqResourceMergeReader != null && unseqResourceMergeReader.hasNext();
+//
+// // has next in both batch reader and point reader
+// if (hasNextBatch && hasNextPoint) {
+// long timeInPointReader = unseqResourceMergeReader.current().getTimestamp();
+// long timeInBatchData = batchData.currentTime();
+// if (timeInPointReader > timeInBatchData) {
+// TimeValuePair timeValuePair = TimeValuePairUtils.getCurrentTimeValuePair(batchData);
+// batchData.next();
+// return timeValuePair;
+// } else if (timeInPointReader == timeInBatchData) {
+// // Note that batchData here still moves next even though the current data to be read is
+// // overwritten by unsequence data source. Only in this way can hasNext() work correctly.
+// batchData.next();
+// return unseqResourceMergeReader.next();
+// } else {
+// return unseqResourceMergeReader.next();
+// }
+// }
+//
+// // only has next in batch reader
+// if (hasNextBatch) {
+// TimeValuePair timeValuePair = TimeValuePairUtils.getCurrentTimeValuePair(batchData);
+// batchData.next();
+// return timeValuePair;
+// }
+//
+// // only has next in point reader
+// if (hasNextPoint) {
+// return unseqResourceMergeReader.next();
+// }
+//
+// return null;
+// }
private boolean hasNextInBatchDataOrBatchReader() throws IOException {
// has value in batchData
@@ -144,7 +156,7 @@ public class SeriesReaderWithoutValueFilter implements IPointReader {
}
// has value in batchReader
- while (seqResourceIterateReader != null && seqResourceIterateReader.hasNext()) {
+ while (seqResourceIterateReader != null && seqResourceIterateReader.hasNextBatch()) {
batchData = seqResourceIterateReader.nextBatch();
if (batchData.hasNext()) {
hasCachedBatchData = true;
@@ -154,9 +166,15 @@ public class SeriesReaderWithoutValueFilter implements IPointReader {
return false;
}
+
+ @Override
+ public boolean hasNextBatch() throws IOException {
+ return false;
+ }
+
@Override
- public TimeValuePair current() throws IOException {
- throw new IOException("current() in SeriesReaderWithoutValueFilter is an empty method.");
+ public BatchData nextBatch() throws IOException {
+ return null;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java
index 125fa0c..0c555aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
/**
* CachedPriorityMergeReader use a cache to reduce unnecessary heap updates and increase locality.
@@ -92,4 +93,5 @@ public class CachedPriorityMergeReader extends PriorityMergeReader {
return timeValuePairCache[cacheIdx];
}
}
+
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/IterateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/IterateReader.java
index 58ce739..fdece3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/IterateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/IterateReader.java
@@ -19,16 +19,15 @@
package org.apache.iotdb.db.query.reader.universal;
import java.io.IOException;
-import org.apache.iotdb.db.query.reader.IAggregateReader;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
/**
- * This class implements {@link IAggregateReader} for sequential data sources.
+ * This class is for sequential data sources.
*/
-public abstract class IterateReader implements IAggregateReader {
+public abstract class IterateReader implements IBatchReader {
- protected IAggregateReader currentSeriesReader;
+ protected IBatchReader currentSeriesReader;
private boolean curReaderInitialized;
private int nextSeriesReaderIndex;
private int readerSize;
@@ -41,9 +40,9 @@ public abstract class IterateReader implements IAggregateReader {
}
@Override
- public boolean hasNext() throws IOException {
+ public boolean hasNextBatch() throws IOException {
- if (curReaderInitialized && currentSeriesReader.hasNext()) {
+ if (curReaderInitialized && currentSeriesReader.hasNextBatch()) {
return true;
} else {
curReaderInitialized = false;
@@ -51,7 +50,7 @@ public abstract class IterateReader implements IAggregateReader {
while (nextSeriesReaderIndex < readerSize) {
boolean isConstructed = constructNextReader(nextSeriesReaderIndex++);
- if (isConstructed && currentSeriesReader.hasNext()) {
+ if (isConstructed && currentSeriesReader.hasNextBatch()) {
curReaderInitialized = true;
return true;
}
@@ -60,7 +59,7 @@ public abstract class IterateReader implements IAggregateReader {
}
/**
- * If the idx-th data source in order needs reading, construct <code>IAggregateReader</code> for
+ * If the idx-th data source in order needs reading, construct <code>IBatchReader</code> for
* it, assign to <code>currentSeriesReader</code> and return true. Otherwise, return false.
*
* @param idx the index of the data source
@@ -74,16 +73,6 @@ public abstract class IterateReader implements IAggregateReader {
}
@Override
- public PageHeader nextPageHeader() throws IOException {
- return currentSeriesReader.nextPageHeader();
- }
-
- @Override
- public void skipPageData() throws IOException {
- currentSeriesReader.skipPageData();
- }
-
- @Override
public void close() {
// file stream is managed in QueryResourceManager.
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
index 1e1e5d1..742a40f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
@@ -23,11 +23,13 @@ import java.util.List;
import java.util.PriorityQueue;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
/**
* This class implements {@link IPointReader} for data sources with different priorities.
*/
-public class PriorityMergeReader implements IPointReader {
+public class PriorityMergeReader implements IBatchReader {
PriorityQueue<Element> heap = new PriorityQueue<>((o1, o2) -> {
int timeCompare = Long.compare(o1.timeValuePair.getTimestamp(),
@@ -52,56 +54,66 @@ public class PriorityMergeReader implements IPointReader {
}
}
- @Override
- public boolean hasNext() {
- return !heap.isEmpty();
- }
+// @Override
+// public boolean hasNext() {
+// return !heap.isEmpty();
+// }
+//
+// @Override
+// public TimeValuePair next() throws IOException {
+// Element top = heap.poll();
+// TimeValuePair ret = top.timeValuePair;
+// TimeValuePair topNext = null;
+// if (top.hasNext()) {
+// top.next();
+// topNext = top.currPair();
+// }
+// long topNextTime = topNext == null ? Long.MAX_VALUE : topNext.getTimestamp();
+// updateHeap(ret.getTimestamp(), topNextTime);
+// if (topNext != null) {
+// top.timeValuePair = topNext;
+// heap.add(top);
+// }
+// return ret;
+// }
+//
+// @Override
+// public TimeValuePair current() throws IOException {
+// return heap.peek().timeValuePair;
+// }
+//
+// private void updateHeap(long topTime, long topNextTime) throws IOException {
+// while (!heap.isEmpty() && heap.peek().currTime() == topTime) {
+// Element e = heap.poll();
+// if (!e.hasNext()) {
+// e.reader.close();
+// continue;
+// }
+//
+// e.next();
+// if (e.currTime() == topNextTime) {
+// // if the next value of the peek will be overwritten by the next of the top, skip it
+// if (e.hasNext()) {
+// e.next();
+// heap.add(e);
+// } else {
+// // the chunk is end
+// e.close();
+// }
+// } else {
+// heap.add(e);
+// }
+// }
+// }
@Override
- public TimeValuePair next() throws IOException {
- Element top = heap.poll();
- TimeValuePair ret = top.timeValuePair;
- TimeValuePair topNext = null;
- if (top.hasNext()) {
- top.next();
- topNext = top.currPair();
- }
- long topNextTime = topNext == null ? Long.MAX_VALUE : topNext.getTimestamp();
- updateHeap(ret.getTimestamp(), topNextTime);
- if (topNext != null) {
- top.timeValuePair = topNext;
- heap.add(top);
- }
- return ret;
+ public boolean hasNextBatch() throws IOException {
+ return false;
}
@Override
- public TimeValuePair current() throws IOException {
- return heap.peek().timeValuePair;
- }
-
- private void updateHeap(long topTime, long topNextTime) throws IOException {
- while (!heap.isEmpty() && heap.peek().currTime() == topTime) {
- Element e = heap.poll();
- if (!e.hasNext()) {
- e.reader.close();
- continue;
- }
-
- e.next();
- if (e.currTime() == topNextTime) {
- // if the next value of the peek will be overwritten by the next of the top, skip it
- if (e.hasNext()) {
- e.next();
- heap.add(e);
- } else {
- // the chunk is end
- e.close();
- }
- } else {
- heap.add(e);
- }
- }
+ public BatchData nextBatch() throws IOException {
+ return null;
}
@Override
@@ -118,6 +130,12 @@ public class PriorityMergeReader implements IPointReader {
TimeValuePair timeValuePair;
int priority;
+ /**
+ * Zesong Sun !!!
+ */
+ BatchData batchData;
+ int index;
+
Element(IPointReader reader, TimeValuePair timeValuePair, int priority) {
this.reader = reader;
this.timeValuePair = timeValuePair;
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
index ef9a49b..81651d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
@@ -37,8 +37,8 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,10 +101,10 @@ public class MergeUtils {
}
public static int writeChunkWithoutUnseq(Chunk chunk, IChunkWriter chunkWriter) throws IOException {
- ChunkReader chunkReader = new ChunkReaderWithoutFilter(chunk);
+ AbstractChunkReader AbstractChunkReader = new ChunkReader(chunk);
int ptWritten = 0;
- while (chunkReader.hasNextBatch()) {
- BatchData batchData = chunkReader.nextBatch();
+ while (AbstractChunkReader.hasNextBatch()) {
+ BatchData batchData = AbstractChunkReader.nextBatch();
for (int i = 0; i < batchData.length(); i++) {
writeBatchPoint(batchData, i, chunkWriter);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/externalsort/FakeChunkReaderWrap.java b/server/src/test/java/org/apache/iotdb/db/query/externalsort/FakeChunkReaderWrap.java
index f18bdc6..f08bfe4 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/externalsort/FakeChunkReaderWrap.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/externalsort/FakeChunkReaderWrap.java
@@ -22,6 +22,9 @@ import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.chunkRelated.ChunkReaderWrap;
+/**
+ * for test
+ */
public class FakeChunkReaderWrap extends ChunkReaderWrap {
private IPointReader pointReader;
public FakeChunkReaderWrap(IPointReader pointReader){
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/FakedIBatchPoint.java b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/FakedIBatchPoint.java
index a58509a..8b1182e 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/FakedIBatchPoint.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/FakedIBatchPoint.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
-import org.apache.iotdb.db.query.reader.IBatchReader;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -61,7 +61,7 @@ public class FakedIBatchPoint implements IBatchReader {
}
@Override
- public boolean hasNext() {
+ public boolean hasNextBatch() {
if (hasCachedBatchData) {
return true;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilterTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilterTest.java
index ac7296e..41e1cca 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilterTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilterTest.java
@@ -20,7 +20,8 @@
package org.apache.iotdb.db.query.reader.seriesRelated;
import java.io.IOException;
-import org.apache.iotdb.db.query.reader.IBatchReader;
+import org.apache.iotdb.db.query.reader.resourceRelated.UnseqResourceMergeReader;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.read.filter.ValueFilter;
@@ -35,7 +36,7 @@ public class SeriesReaderWithValueFilterTest {
// (100,0),(105,1),(110,0),(115,1),(120,0),...
IBatchReader batchReader = new FakedIBatchPoint(100, 1000, 5, 2);
// (100,0),(105,1),(110,2),(115,3),(120,0),...
- IPointReader pointReader = new FakedIPointReader(100, 500, 5, 4);
+ IBatchReader pointReader = new FakedIBatchPoint(100, 500, 5, 4);
reader = new SeriesReaderWithValueFilter(batchReader, pointReader, ValueFilter.eq(0L));
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilterTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilterTest.java
index d25298b..4a8ce01 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilterTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilterTest.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.query.reader.seriesRelated;
import java.io.IOException;
-import org.apache.iotdb.db.query.reader.IBatchReader;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.junit.Assert;
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index 15bd301..e8697c9 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.storageGroup.StorageGroupProcessorException;
-import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.reader.chunkRelated.DiskChunkReader;
import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
@@ -47,8 +46,8 @@ import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
import org.apache.iotdb.tsfile.read.controller.IMetadataQuerier;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
+import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
@@ -166,9 +165,8 @@ public class UnseqTsFileRecoverTest {
int priorityValue = 1;
for (ChunkMetaData chunkMetaData : metadataQuerier.getChunkMetaDataList(path)) {
Chunk chunk = chunkLoader.getChunk(chunkMetaData);
- ChunkReader chunkReader = new ChunkReaderWithoutFilter(chunk);
-
- unSeqMergeReader.addReaderWithPriority(new DiskChunkReader(chunkReader), priorityValue);
+ AbstractChunkReader AbstractChunkReader = new ChunkReader(chunk, null);
+ unSeqMergeReader.addReaderWithPriority(new DiskChunkReader(AbstractChunkReader), priorityValue);
priorityValue++;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithoutTimeGenerator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithoutTimeGenerator.java
index e8eedb4..ad11265 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithoutTimeGenerator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithoutTimeGenerator.java
@@ -30,14 +30,14 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
+import org.apache.iotdb.tsfile.read.reader.series.AbstractFileSeriesReader;
/**
* multi-way merging data set, no need to use TimeGenerator.
*/
public class DataSetWithoutTimeGenerator extends QueryDataSet {
- private List<FileSeriesReader> readers;
+ private List<AbstractFileSeriesReader> readers;
private List<BatchData> batchDataList;
@@ -59,7 +59,7 @@ public class DataSetWithoutTimeGenerator extends QueryDataSet {
* @throws IOException IOException
*/
public DataSetWithoutTimeGenerator(List<Path> paths, List<TSDataType> dataTypes,
- List<FileSeriesReader> readers)
+ List<AbstractFileSeriesReader> readers)
throws IOException {
super(paths, dataTypes);
this.readers = readers;
@@ -73,7 +73,7 @@ public class DataSetWithoutTimeGenerator extends QueryDataSet {
timeSet = new HashSet<>();
for (int i = 0; i < paths.size(); i++) {
- FileSeriesReader reader = readers.get(i);
+ AbstractFileSeriesReader reader = readers.get(i);
if (!reader.hasNextBatch()) {
batchDataList.add(new BatchData());
hasDataRemaining.add(false);
@@ -117,7 +117,7 @@ public class DataSetWithoutTimeGenerator extends QueryDataSet {
data.next();
if (!data.hasNext()) {
- FileSeriesReader reader = readers.get(i);
+ AbstractFileSeriesReader reader = readers.get(i);
if (reader.hasNextBatch()) {
data = reader.nextBatch();
if (data.hasNext()) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/TsFileExecutor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/TsFileExecutor.java
index 9b2b205..b490338 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/TsFileExecutor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/TsFileExecutor.java
@@ -38,9 +38,8 @@ import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
import org.apache.iotdb.tsfile.read.query.dataset.DataSetWithoutTimeGenerator;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.reader.series.EmptyFileSeriesReader;
+import org.apache.iotdb.tsfile.read.reader.series.AbstractFileSeriesReader;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
-import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithFilter;
-import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithoutFilter;
import org.apache.iotdb.tsfile.utils.BloomFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -167,24 +166,18 @@ public class TsFileExecutor implements QueryExecutor {
* @return DataSetWithoutTimeGenerator
*/
private QueryDataSet executeMayAttachTimeFiler(List<Path> selectedPathList,
- GlobalTimeExpression timeFilter)
- throws IOException, NoMeasurementException {
- List<FileSeriesReader> readersOfSelectedSeries = new ArrayList<>();
+ GlobalTimeExpression timeFilter) throws IOException, NoMeasurementException {
+ List<AbstractFileSeriesReader> readersOfSelectedSeries = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
for (Path path : selectedPathList) {
List<ChunkMetaData> chunkMetaDataList = metadataQuerier.getChunkMetaDataList(path);
- FileSeriesReader seriesReader;
+ AbstractFileSeriesReader seriesReader;
if (chunkMetaDataList.isEmpty()) {
seriesReader = new EmptyFileSeriesReader();
dataTypes.add(metadataQuerier.getDataType(path.getMeasurement()));
} else {
- if (timeFilter == null) {
- seriesReader = new FileSeriesReaderWithoutFilter(chunkLoader, chunkMetaDataList);
- } else {
- seriesReader = new FileSeriesReaderWithFilter(chunkLoader, chunkMetaDataList,
- timeFilter.getFilter());
- }
+ seriesReader = new FileSeriesReader(chunkLoader, chunkMetaDataList, timeFilter.getFilter());
dataTypes.add(chunkMetaDataList.get(0).getDataType());
}
readersOfSelectedSeries.add(seriesReader);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGeneratorImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGeneratorImpl.java
index 496cff5..5ac4842 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGeneratorImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGeneratorImpl.java
@@ -35,8 +35,8 @@ import org.apache.iotdb.tsfile.read.query.timegenerator.node.AndNode;
import org.apache.iotdb.tsfile.read.query.timegenerator.node.LeafNode;
import org.apache.iotdb.tsfile.read.query.timegenerator.node.Node;
import org.apache.iotdb.tsfile.read.query.timegenerator.node.OrNode;
+import org.apache.iotdb.tsfile.read.reader.series.AbstractFileSeriesReader;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
-import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithFilter;
public class TimeGeneratorImpl implements TimeGenerator {
@@ -93,7 +93,7 @@ public class TimeGeneratorImpl implements TimeGenerator {
if (expression.getType() == ExpressionType.SERIES) {
SingleSeriesExpression singleSeriesExp = (SingleSeriesExpression) expression;
- FileSeriesReader seriesReader = generateSeriesReader(singleSeriesExp);
+ AbstractFileSeriesReader seriesReader = generateSeriesReader(singleSeriesExp);
Path path = singleSeriesExp.getSeriesPath();
if (!leafCache.containsKey(path)) {
@@ -120,11 +120,11 @@ public class TimeGeneratorImpl implements TimeGenerator {
"Unsupported ExpressionType when construct OperatorNode: " + expression.getType());
}
- private FileSeriesReader generateSeriesReader(SingleSeriesExpression singleSeriesExp)
+ private AbstractFileSeriesReader generateSeriesReader(SingleSeriesExpression singleSeriesExp)
throws IOException {
List<ChunkMetaData> chunkMetaDataList = metadataQuerier
.getChunkMetaDataList(singleSeriesExp.getSeriesPath());
- return new FileSeriesReaderWithFilter(chunkLoader, chunkMetaDataList,
+ return new FileSeriesReader(chunkLoader, chunkMetaDataList,
singleSeriesExp.getFilter());
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/LeafNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/LeafNode.java
index 5137a41..e1221f7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/LeafNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/LeafNode.java
@@ -20,17 +20,17 @@ package org.apache.iotdb.tsfile.read.query.timegenerator.node;
import java.io.IOException;
import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
+import org.apache.iotdb.tsfile.read.reader.series.AbstractFileSeriesReader;
public class LeafNode implements Node {
- private FileSeriesReader reader;
+ private AbstractFileSeriesReader reader;
private BatchData data = null;
private boolean gotData = false;
- public LeafNode(FileSeriesReader reader) {
+ public LeafNode(AbstractFileSeriesReader reader) {
this.reader = reader;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/IBatchReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IBatchReader.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/query/reader/IBatchReader.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IBatchReader.java
index bbf1a15..5644d72 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/IBatchReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IBatchReader.java
@@ -16,14 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.query.reader;
+package org.apache.iotdb.tsfile.read.reader;
import java.io.IOException;
import org.apache.iotdb.tsfile.read.common.BatchData;
public interface IBatchReader {
- boolean hasNext() throws IOException;
+ boolean hasNextBatch() throws IOException;
BatchData nextBatch() throws IOException;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AbstractChunkReader.java
similarity index 92%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AbstractChunkReader.java
index 479b98a..1758ec7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AbstractChunkReader.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.tsfile.read.reader.chunk;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
import java.util.Arrays;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.compress.IUnCompressor;
@@ -34,21 +33,21 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
-public abstract class ChunkReader {
+public abstract class AbstractChunkReader implements IBatchReader {
- ChunkHeader chunkHeader;
+ private ChunkHeader chunkHeader;
private ByteBuffer chunkDataBuffer;
private IUnCompressor unCompressor;
- private EndianType endianType;
private Decoder valueDecoder;
private Decoder timeDecoder = Decoder.getDecoderByType(
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
TSDataType.INT64);
- private Filter filter;
+ protected Filter filter;
private BatchData data;
@@ -60,21 +59,17 @@ public abstract class ChunkReader {
*/
protected long deletedAt;
- public ChunkReader(Chunk chunk) {
- this(chunk, null);
- }
-
/**
* constructor of ChunkReader.
*
* @param chunk input Chunk object
* @param filter filter
*/
- public ChunkReader(Chunk chunk, Filter filter) {
+ public AbstractChunkReader(Chunk chunk, Filter filter) {
this.filter = filter;
this.chunkDataBuffer = chunk.getData();
this.deletedAt = chunk.getDeletedAt();
- this.endianType = chunk.getEndianType();
+ EndianType endianType = chunk.getEndianType();
chunkHeader = chunk.getHeader();
this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
valueDecoder = Decoder
@@ -123,11 +118,8 @@ public abstract class ChunkReader {
return data;
}
- public BatchData currentBatch() {
- return data;
- }
- public PageHeader nextPageHeader() throws IOException {
+ public PageHeader nextPageHeader() {
return pageHeader;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
index 479b98a..cf831fc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
@@ -16,156 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.iotdb.tsfile.read.reader.chunk;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.Arrays;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.compress.IUnCompressor;
-import org.apache.iotdb.tsfile.encoding.common.EndianType;
-import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.reader.page.PageReader;
-
-public abstract class ChunkReader {
-
- ChunkHeader chunkHeader;
- private ByteBuffer chunkDataBuffer;
-
- private IUnCompressor unCompressor;
- private EndianType endianType;
- private Decoder valueDecoder;
- private Decoder timeDecoder = Decoder.getDecoderByType(
- TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
- TSDataType.INT64);
-
- private Filter filter;
-
- private BatchData data;
-
- private PageHeader pageHeader;
- private boolean hasCachedPageHeader;
-
- /**
- * Data whose timestamp <= deletedAt should be considered deleted(not be returned).
- */
- protected long deletedAt;
- public ChunkReader(Chunk chunk) {
- this(chunk, null);
- }
+public class ChunkReader extends AbstractChunkReader {
- /**
- * constructor of ChunkReader.
- *
- * @param chunk input Chunk object
- * @param filter filter
- */
public ChunkReader(Chunk chunk, Filter filter) {
- this.filter = filter;
- this.chunkDataBuffer = chunk.getData();
- this.deletedAt = chunk.getDeletedAt();
- this.endianType = chunk.getEndianType();
- chunkHeader = chunk.getHeader();
- this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
- valueDecoder = Decoder
- .getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
- valueDecoder.setEndianType(endianType);
- data = new BatchData(chunkHeader.getDataType());
- hasCachedPageHeader = false;
- }
-
- /**
- * judge if has nextBatch.
- */
- public boolean hasNextBatch() {
- if (hasCachedPageHeader) {
- return true;
- }
- // construct next satisfied page header
- while (chunkDataBuffer.remaining() > 0) {
- // deserialize a PageHeader from chunkDataBuffer
- pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
-
- // if the current page satisfies
- if (pageSatisfied(pageHeader)) {
- hasCachedPageHeader = true;
- return true;
- } else {
- skipBytesInStreamByLength(pageHeader.getCompressedSize());
- }
- }
- return false;
+ super(chunk, filter);
}
- /**
- * get next data batch.
- *
- * @return next data batch
- * @throws IOException IOException
- */
- public BatchData nextBatch() throws IOException {
- PageReader pageReader = constructPageReaderForNextPage(pageHeader.getCompressedSize());
- hasCachedPageHeader = false;
- if (pageReader.hasNextBatch()) {
- data = pageReader.nextBatch();
- return data;
+ @Override
+ public boolean pageSatisfied(PageHeader pageHeader) {
+ if (pageHeader.getEndTime() < deletedAt) {
+ return false;
}
- return data;
- }
-
- public BatchData currentBatch() {
- return data;
- }
-
- public PageHeader nextPageHeader() throws IOException {
- return pageHeader;
- }
-
- public void skipPageData() {
- skipBytesInStreamByLength(pageHeader.getCompressedSize());
- hasCachedPageHeader = false;
- }
-
- private void skipBytesInStreamByLength(long length) {
- chunkDataBuffer.position(chunkDataBuffer.position() + (int) length);
- }
-
- public abstract boolean pageSatisfied(PageHeader pageHeader);
-
- private PageReader constructPageReaderForNextPage(int compressedPageBodyLength)
- throws IOException {
- byte[] compressedPageBody = new byte[compressedPageBodyLength];
-
- // already in memory
- if (compressedPageBodyLength > chunkDataBuffer.remaining()) {
- throw new IOException(
- "unexpected byte read length when read compressedPageBody. Expected:"
- + Arrays.toString(compressedPageBody) + ". Actual:" + chunkDataBuffer
- .remaining());
+ if (filter == null ) {
+ return true;
+ } else {
+ return filter.satisfy(pageHeader.getStatistics());
}
- chunkDataBuffer.get(compressedPageBody, 0, compressedPageBodyLength);
- valueDecoder.reset();
- ByteBuffer pageData = ByteBuffer.wrap(unCompressor.uncompress(compressedPageBody));
- PageReader reader = new PageReader(pageData, chunkHeader.getDataType(),
- valueDecoder, timeDecoder, filter);
- reader.setDeletedAt(deletedAt);
- return reader;
}
- public void close() {
- }
-
- public ChunkHeader getChunkHeader() {
- return chunkHeader;
- }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
index 951c327..1630a63 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
@@ -21,12 +21,12 @@ package org.apache.iotdb.tsfile.read.reader.chunk;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.read.common.Chunk;
-public class ChunkReaderByTimestamp extends ChunkReader {
+public class ChunkReaderByTimestamp extends AbstractChunkReader {
private long currentTimestamp;
public ChunkReaderByTimestamp(Chunk chunk) {
- super(chunk);
+ super(chunk, null);
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithFilter.java
deleted file mode 100644
index 62c9cb2..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithFilter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.read.reader.chunk;
-
-import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
-public class ChunkReaderWithFilter extends ChunkReader {
-
- private Filter filter;
-
- public ChunkReaderWithFilter(Chunk chunk, Filter filter) {
- super(chunk, filter);
- this.filter = filter;
- }
-
- @Override
- public boolean pageSatisfied(PageHeader pageHeader) {
- if (pageHeader.getEndTime() < deletedAt) {
- return false;
- }
- return filter.satisfy(pageHeader.getStatistics());
- }
-
-}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithoutFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithoutFilter.java
deleted file mode 100644
index c2a09c6..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithoutFilter.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.read.reader.chunk;
-
-import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.read.common.Chunk;
-
-public class ChunkReaderWithoutFilter extends ChunkReader {
-
- public ChunkReaderWithoutFilter(Chunk chunk) {
- super(chunk);
- }
-
- @Override
- public boolean pageSatisfied(PageHeader pageHeader) {
- return pageHeader.getEndTime() > deletedAt;
- }
-
-}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java
similarity index 77%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java
index ca43763..041c34e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java
@@ -25,26 +25,32 @@ import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
/**
* Series reader is used to query one series of one tsfile.
*/
-public abstract class FileSeriesReader {
+public abstract class AbstractFileSeriesReader implements IBatchReader {
protected IChunkLoader chunkLoader;
protected List<ChunkMetaData> chunkMetaDataList;
- protected ChunkReader chunkReader;
+ protected AbstractChunkReader AbstractChunkReader;
private int chunkToRead;
private BatchData data;
+ protected Filter filter;
+
/**
* constructor of FileSeriesReader.
*/
- public FileSeriesReader(IChunkLoader chunkLoader, List<ChunkMetaData> chunkMetaDataList) {
+ public AbstractFileSeriesReader(IChunkLoader chunkLoader, List<ChunkMetaData> chunkMetaDataList,
+ Filter filter) {
this.chunkLoader = chunkLoader;
this.chunkMetaDataList = chunkMetaDataList;
+ this.filter = filter;
this.chunkToRead = 0;
}
@@ -56,7 +62,7 @@ public abstract class FileSeriesReader {
public boolean hasNextBatch() throws IOException {
// current chunk has additional batch
- if (chunkReader != null && chunkReader.hasNextBatch()) {
+ if (AbstractChunkReader != null && AbstractChunkReader.hasNextBatch()) {
return true;
}
@@ -68,7 +74,7 @@ public abstract class FileSeriesReader {
// chunk metadata satisfy the condition
initChunkReader(chunkMetaData);
- if (chunkReader.hasNextBatch()) {
+ if (AbstractChunkReader.hasNextBatch()) {
return true;
}
}
@@ -80,7 +86,7 @@ public abstract class FileSeriesReader {
* get next batch data.
*/
public BatchData nextBatch() throws IOException {
- data = chunkReader.nextBatch();
+ data = AbstractChunkReader.nextBatch();
return data;
}
@@ -88,12 +94,12 @@ public abstract class FileSeriesReader {
return data;
}
- public PageHeader nextPageHeader() throws IOException {
- return chunkReader.nextPageHeader();
+ public PageHeader nextPageHeader() {
+ return AbstractChunkReader.nextPageHeader();
}
public void skipPageData() {
- chunkReader.skipPageData();
+ AbstractChunkReader.skipPageData();
}
protected abstract void initChunkReader(ChunkMetaData chunkMetaData) throws IOException;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/EmptyFileSeriesReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/EmptyFileSeriesReader.java
index 4f63840..1c43a6c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/EmptyFileSeriesReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/EmptyFileSeriesReader.java
@@ -25,11 +25,11 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
/**
* this is for those series which has no data points
*/
-public class EmptyFileSeriesReader extends FileSeriesReader {
+public class EmptyFileSeriesReader extends AbstractFileSeriesReader {
BatchData data = new BatchData();
public EmptyFileSeriesReader() {
- super(null, null);
+ super(null, null, null);
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
index ca43763..9ececb6 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
@@ -16,95 +16,40 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.iotdb.tsfile.read.reader.series;
import java.io.IOException;
import java.util.List;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
/**
- * Series reader is used to query one series of one tsfile.
+ * Series reader is used to query one series of one TsFile,
+ * and this reader has a filter operating on the same series.
*/
-public abstract class FileSeriesReader {
+public class FileSeriesReader extends AbstractFileSeriesReader {
- protected IChunkLoader chunkLoader;
- protected List<ChunkMetaData> chunkMetaDataList;
- protected ChunkReader chunkReader;
- private int chunkToRead;
+ public FileSeriesReader(IChunkLoader chunkLoader,
+ List<ChunkMetaData> chunkMetaDataList, Filter filter) {
+ super(chunkLoader, chunkMetaDataList, filter);
+ }
- private BatchData data;
- /**
- * constructor of FileSeriesReader.
- */
- public FileSeriesReader(IChunkLoader chunkLoader, List<ChunkMetaData> chunkMetaDataList) {
- this.chunkLoader = chunkLoader;
- this.chunkMetaDataList = chunkMetaDataList;
- this.chunkToRead = 0;
+ @Override
+ protected void initChunkReader(ChunkMetaData chunkMetaData) throws IOException {
+ Chunk chunk = chunkLoader.getChunk(chunkMetaData);
+ this.AbstractChunkReader = new ChunkReader(chunk, filter);
}
- /**
- * check if current chunk has next batch data.
- *
- * @return True if current chunk has next batch data
- */
- public boolean hasNextBatch() throws IOException {
-
- // current chunk has additional batch
- if (chunkReader != null && chunkReader.hasNextBatch()) {
+ @Override
+ protected boolean chunkSatisfied(ChunkMetaData chunkMetaData) {
+ if (filter == null ) {
return true;
}
-
- // current chunk does not have additional batch, init new chunk reader
- while (chunkToRead < chunkMetaDataList.size()) {
-
- ChunkMetaData chunkMetaData = nextChunkMeta();
- if (chunkSatisfied(chunkMetaData)) {
- // chunk metadata satisfy the condition
- initChunkReader(chunkMetaData);
-
- if (chunkReader.hasNextBatch()) {
- return true;
- }
- }
- }
- return false;
- }
-
- /**
- * get next batch data.
- */
- public BatchData nextBatch() throws IOException {
- data = chunkReader.nextBatch();
- return data;
- }
-
- public BatchData currentBatch() {
- return data;
- }
-
- public PageHeader nextPageHeader() throws IOException {
- return chunkReader.nextPageHeader();
- }
-
- public void skipPageData() {
- chunkReader.skipPageData();
+ return filter.satisfy(chunkMetaData.getStatistics());
}
- protected abstract void initChunkReader(ChunkMetaData chunkMetaData) throws IOException;
-
- protected abstract boolean chunkSatisfied(ChunkMetaData chunkMetaData);
-
- public void close() throws IOException {
- chunkLoader.close();
- }
-
- private ChunkMetaData nextChunkMeta() {
- return chunkMetaDataList.get(chunkToRead++);
- }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java
index d9d7a51..384ab5f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
/**
@@ -40,7 +40,7 @@ public class FileSeriesReaderByTimestamp {
protected List<ChunkMetaData> chunkMetaDataList;
private int currentChunkIndex = 0;
- private ChunkReader chunkReader;
+ private AbstractChunkReader AbstractChunkReader;
private long currentTimestamp;
private BatchData data = null; // current batch data
@@ -64,13 +64,13 @@ public class FileSeriesReaderByTimestamp {
this.currentTimestamp = timestamp;
// first initialization, only invoked in the first time
- if (chunkReader == null) {
+ if (AbstractChunkReader == null) {
if (!constructNextSatisfiedChunkReader()) {
return null;
}
- if (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ if (AbstractChunkReader.hasNextBatch()) {
+ data = AbstractChunkReader.nextBatch();
} else {
return null;
}
@@ -93,8 +93,8 @@ public class FileSeriesReaderByTimestamp {
}
return null;
} else {
- if (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ if (AbstractChunkReader.hasNextBatch()) {
+ data = AbstractChunkReader.nextBatch();
} else if (!constructNextSatisfiedChunkReader()) {
return null;
}
@@ -111,20 +111,20 @@ public class FileSeriesReaderByTimestamp {
*/
public boolean hasNext() throws IOException {
- if (chunkReader != null) {
+ if (AbstractChunkReader != null) {
if (data != null && data.hasNext()) {
return true;
}
- while (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ while (AbstractChunkReader.hasNextBatch()) {
+ data = AbstractChunkReader.nextBatch();
if (data != null && data.hasNext()) {
return true;
}
}
}
while (constructNextSatisfiedChunkReader()) {
- while (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ while (AbstractChunkReader.hasNextBatch()) {
+ data = AbstractChunkReader.nextBatch();
if (data != null && data.hasNext()) {
return true;
}
@@ -138,7 +138,7 @@ public class FileSeriesReaderByTimestamp {
ChunkMetaData chunkMetaData = chunkMetaDataList.get(currentChunkIndex++);
if (chunkSatisfied(chunkMetaData)) {
initChunkReader(chunkMetaData);
- ((ChunkReaderByTimestamp) chunkReader).setCurrentTimestamp(currentTimestamp);
+ ((ChunkReaderByTimestamp) AbstractChunkReader).setCurrentTimestamp(currentTimestamp);
return true;
}
}
@@ -147,7 +147,7 @@ public class FileSeriesReaderByTimestamp {
private void initChunkReader(ChunkMetaData chunkMetaData) throws IOException {
Chunk chunk = chunkLoader.getChunk(chunkMetaData);
- this.chunkReader = new ChunkReaderByTimestamp(chunk);
+ this.AbstractChunkReader = new ChunkReaderByTimestamp(chunk);
}
private boolean chunkSatisfied(ChunkMetaData chunkMetaData) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithFilter.java
deleted file mode 100644
index 0a2fc53..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithFilter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.read.reader.series;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithFilter;
-
-/**
- * Series reader is used to query one series of one TsFile,
- * and this reader has a filter operating on the same series.
- */
-public class FileSeriesReaderWithFilter extends FileSeriesReader {
-
- private Filter filter;
-
- public FileSeriesReaderWithFilter(IChunkLoader chunkLoader,
- List<ChunkMetaData> chunkMetaDataList, Filter filter) {
- super(chunkLoader, chunkMetaDataList);
- this.filter = filter;
- }
-
- @Override
- protected void initChunkReader(ChunkMetaData chunkMetaData) throws IOException {
- Chunk chunk = chunkLoader.getChunk(chunkMetaData);
- this.chunkReader = new ChunkReaderWithFilter(chunk, filter);
- }
-
- @Override
- protected boolean chunkSatisfied(ChunkMetaData chunkMetaData) {
- return filter.satisfy(chunkMetaData.getStatistics());
- }
-
-}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithoutFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithoutFilter.java
deleted file mode 100644
index 00f2f3b..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithoutFilter.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.read.reader.series;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
-
-/**
- * Series reader is used to query one series of one tsfile, this reader has no filter.
- */
-public class FileSeriesReaderWithoutFilter extends FileSeriesReader {
-
- public FileSeriesReaderWithoutFilter(IChunkLoader chunkLoader,
- List<ChunkMetaData> chunkMetaDataList) {
- super(chunkLoader, chunkMetaDataList);
- }
-
- @Override
- protected void initChunkReader(ChunkMetaData chunkMetaData) throws IOException {
- Chunk chunk = chunkLoader.getChunk(chunkMetaData);
- this.chunkReader = new ChunkReaderWithoutFilter(chunk);
- }
-
- @Override
- protected boolean chunkSatisfied(ChunkMetaData chunkMetaData) {
- return true;
- }
-
-}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/NodeTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/NodeTest.java
index 5b2c6d2..3beabfe 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/NodeTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/NodeTest.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.tsfile.read.query.timegenerator.node.AndNode;
import org.apache.iotdb.tsfile.read.query.timegenerator.node.LeafNode;
import org.apache.iotdb.tsfile.read.query.timegenerator.node.Node;
import org.apache.iotdb.tsfile.read.query.timegenerator.node.OrNode;
-import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
+import org.apache.iotdb.tsfile.read.reader.series.AbstractFileSeriesReader;
import org.junit.Assert;
import org.junit.Test;
@@ -36,7 +36,7 @@ public class NodeTest {
public void testLeafNode() throws IOException {
int index = 0;
long[] timestamps = new long[]{1, 2, 3, 4, 5, 6, 7};
- FileSeriesReader seriesReader = new FakedFileSeriesReader(timestamps);
+ AbstractFileSeriesReader seriesReader = new FakedFileSeriesReader(timestamps);
Node leafNode = new LeafNode(seriesReader);
while (leafNode.hasNext()) {
Assert.assertEquals(timestamps[index++], leafNode.next());
@@ -88,7 +88,7 @@ public class NodeTest {
Assert.assertEquals(ret.length, index);
}
- private static class FakedFileSeriesReader extends FileSeriesReader {
+ private static class FakedFileSeriesReader extends AbstractFileSeriesReader {
BatchData data;
boolean hasCachedData;
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/ReaderByTimestampTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/ReaderByTimestampTest.java
index 6e2df7d..6899917 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/ReaderByTimestampTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/ReaderByTimestampTest.java
@@ -29,8 +29,8 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
+import org.apache.iotdb.tsfile.read.reader.series.AbstractFileSeriesReader;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
-import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithoutFilter;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderByTimestamp;
import org.junit.After;
import org.junit.Assert;
@@ -64,8 +64,8 @@ public class ReaderByTimestampTest {
ChunkLoaderImpl seriesChunkLoader = new ChunkLoaderImpl(fileReader);
List<ChunkMetaData> chunkMetaDataList = metadataQuerierByFile
.getChunkMetaDataList(new Path("d1.s1"));
- FileSeriesReader seriesReader = new FileSeriesReaderWithoutFilter(seriesChunkLoader,
- chunkMetaDataList);
+ AbstractFileSeriesReader seriesReader = new FileSeriesReader(seriesChunkLoader,
+ chunkMetaDataList, null);
List<Long> timeList = new ArrayList<>();
List<Object> valueList = new ArrayList<>();
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/ReaderTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/ReaderTest.java
index 551e63b..396e65d 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/ReaderTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/ReaderTest.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.tsfile.read.reader;
import java.io.IOException;
import java.util.List;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -33,9 +32,8 @@ import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.ValueFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.read.reader.series.AbstractFileSeriesReader;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
-import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithFilter;
-import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithoutFilter;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorForTest;
import org.junit.After;
import org.junit.Assert;
@@ -70,8 +68,8 @@ public class ReaderTest {
List<ChunkMetaData> chunkMetaDataList = metadataQuerierByFile
.getChunkMetaDataList(new Path("d1.s1"));
- FileSeriesReader seriesReader = new FileSeriesReaderWithoutFilter(seriesChunkLoader,
- chunkMetaDataList);
+ AbstractFileSeriesReader seriesReader = new FileSeriesReader(seriesChunkLoader,
+ chunkMetaDataList, null);
long startTime = TsFileGeneratorForTest.START_TIMESTAMP;
BatchData data = null;
@@ -87,7 +85,7 @@ public class ReaderTest {
Assert.assertEquals(rowCount, count);
chunkMetaDataList = metadataQuerierByFile.getChunkMetaDataList(new Path("d1.s4"));
- seriesReader = new FileSeriesReaderWithoutFilter(seriesChunkLoader, chunkMetaDataList);
+ seriesReader = new FileSeriesReader(seriesChunkLoader, chunkMetaDataList, null);
count = 0;
while (seriesReader.hasNextBatch()) {
@@ -110,7 +108,7 @@ public class ReaderTest {
FilterFactory.and(TimeFilter.gt(1480563570029L), TimeFilter.lt(1480563570033L)),
FilterFactory.and(ValueFilter.gtEq(9520331), ValueFilter.ltEq(9520361)));
SingleSeriesExpression singleSeriesExp = new SingleSeriesExpression(new Path("d1.s1"), filter);
- FileSeriesReader seriesReader = new FileSeriesReaderWithFilter(seriesChunkLoader,
+ AbstractFileSeriesReader seriesReader = new FileSeriesReader(seriesChunkLoader,
chunkMetaDataList,
singleSeriesExp.getFilter());