You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/12/26 09:12:40 UTC
[incubator-iotdb] 01/01: optimize PageReader and ChunkReader
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch fix_chunk_reader
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit c0c85cd3bb60e186213373f64c2d6c9cee788d68
Author: qiaojialin <64...@qq.com>
AuthorDate: Thu Dec 26 17:12:13 2019 +0800
optimize PageReader and ChunkReader
---
.../apache/iotdb/tsfile/TsFileSequenceRead.java | 16 +-
.../db/engine/merge/task/MergeMultiChunkTask.java | 7 +-
.../reader/chunkRelated/CachedDiskChunkReader.java | 14 +-
.../query/reader/chunkRelated/ChunkReaderWrap.java | 3 +-
.../query/reader/chunkRelated/DiskChunkReader.java | 13 +-
.../chunkRelated/DiskChunkReaderByTimestamp.java | 8 +-
.../CachedUnseqResourceMergeReader.java | 3 +-
.../NewUnseqResourceMergeReader.java | 14 +-
.../java/org/apache/iotdb/db/utils/MergeUtils.java | 7 +-
.../writelog/recover/UnseqTsFileRecoverTest.java | 3 +-
.../read/reader/chunk/AbstractChunkReader.java | 163 ---------------------
.../tsfile/read/reader/chunk/ChunkReader.java | 135 ++++++++++++++++-
.../read/reader/chunk/ChunkReaderByTimestamp.java | 2 +-
.../iotdb/tsfile/read/reader/page/PageReader.java | 135 ++---------------
.../reader/series/AbstractFileSeriesReader.java | 10 +-
.../reader/series/FileSeriesReaderByTimestamp.java | 20 +--
.../iotdb/tsfile/read/reader/PageReaderTest.java | 7 +-
17 files changed, 200 insertions(+), 360 deletions(-)
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
index 9eba9fb..ddf11d5 100644
--- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
@@ -85,15 +85,13 @@ public class TsFileSequenceRead {
System.out
.println("\t\tUncompressed page data size: " + pageHeader.getUncompressedSize());
PageReader reader1 = new PageReader(pageData, header.getDataType(), valueDecoder,
- defaultTimeDecoder);
- while (reader1.hasNextBatch()) {
- BatchData batchData = reader1.nextBatch();
- while (batchData.hasCurrent()) {
- System.out.println(
- "\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData
- .currentValue());
- batchData.next();
- }
+ defaultTimeDecoder, null);
+ BatchData batchData = reader1.getAllSatisfiedPageData();
+ while (batchData.hasCurrent()) {
+ System.out.println(
+ "\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData
+ .currentValue());
+ batchData.next();
}
}
break;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index cecf6ee..87f29c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -49,7 +49,6 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -401,9 +400,9 @@ class MergeMultiChunkTask {
private int writeChunkWithUnseq(Chunk chunk, IChunkWriter chunkWriter, IPointReader unseqReader,
long chunkLimitTime, int pathIdx) throws IOException {
int cnt = 0;
- AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
- while (chunkReader.hasNextBatch()) {
- BatchData batchData = chunkReader.nextBatch();
+ ChunkReader chunkReader = new ChunkReader(chunk, null);
+ while (chunkReader.hasNextSatisfiedPage()) {
+ BatchData batchData = chunkReader.nextPageData();
cnt += mergeWriteBatch(batchData, chunkWriter, unseqReader, pathIdx);
}
cnt += writeRemainingUnseq(chunkWriter, unseqReader, chunkLimitTime, pathIdx);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java
index bdca2e5..822815d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java
@@ -24,16 +24,16 @@ import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
public class CachedDiskChunkReader implements IPointReader {
- private AbstractChunkReader chunkReader;
+ private ChunkReader chunkReader;
private BatchData data;
private TimeValuePair prev;
private TimeValuePair current;
- public CachedDiskChunkReader(AbstractChunkReader chunkReader) {
+ public CachedDiskChunkReader(ChunkReader chunkReader) {
this.chunkReader = chunkReader;
this.prev =
TimeValuePairUtils.getEmptyTimeValuePair(chunkReader.getChunkHeader().getDataType());
@@ -44,8 +44,8 @@ public class CachedDiskChunkReader implements IPointReader {
if (data != null && data.hasCurrent()) {
return true;
}
- while (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ while (chunkReader.hasNextSatisfiedPage()) {
+ data = chunkReader.nextPageData();
if (data.hasCurrent()) {
return true;
}
@@ -60,8 +60,8 @@ public class CachedDiskChunkReader implements IPointReader {
if (data.hasCurrent()) {
TimeValuePairUtils.setCurrentTimeValuePair(data, current());
} else {
- while (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ while (chunkReader.hasNextSatisfiedPage()) {
+ data = chunkReader.nextPageData();
if (data.hasCurrent()) {
TimeValuePairUtils.setCurrentTimeValuePair(data, current());
break;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java
index 2bb2bb6..4dd57bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
@@ -71,7 +70,7 @@ public class ChunkReaderWrap {
public IPointReader getIPointReader() throws IOException {
if (type.equals(ChunkReaderType.DISK_CHUNK)) {
Chunk chunk = chunkLoader.getChunk(chunkMetaData);
- AbstractChunkReader chunkReader = new ChunkReader(chunk, filter);
+ ChunkReader chunkReader = new ChunkReader(chunk, filter);
return new DiskChunkReader(chunkReader);
} else {
return new MemChunkReader(readOnlyMemChunk, filter);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java
index 1e00642..240867e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java
@@ -20,16 +20,15 @@ package org.apache.iotdb.db.query.reader.chunkRelated;
import java.io.IOException;
import org.apache.iotdb.db.query.reader.IPointReader;
-import org.apache.iotdb.db.query.reader.resourceRelated.NewUnseqResourceMergeReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
/**
* To read chunk data on disk, this class implements an interface {@link IPointReader} based on the
- * data reader {@link AbstractChunkReader}.
+ * data reader {@link ChunkReader}.
* <p>
* Note that <code>ChunkReader</code> is an abstract class with three concrete classes, two of which
* are used here: <code>ChunkReaderWithoutFilter</code> and <code>ChunkReaderWithFilter</code>.
@@ -37,10 +36,10 @@ import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
*/
public class DiskChunkReader implements IPointReader, IBatchReader {
- private AbstractChunkReader chunkReader;
+ private ChunkReader chunkReader;
private BatchData data;
- public DiskChunkReader(AbstractChunkReader chunkReader) {
+ public DiskChunkReader(ChunkReader chunkReader) {
this.chunkReader = chunkReader;
}
@@ -49,8 +48,8 @@ public class DiskChunkReader implements IPointReader, IBatchReader {
if (data != null && data.hasCurrent()) {
return true;
}
- while (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ while (chunkReader.hasNextSatisfiedPage()) {
+ data = chunkReader.nextPageData();
if (data.hasCurrent()) {
return true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java
index 81d9866..521a4db 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java
@@ -54,8 +54,8 @@ public class DiskChunkReaderByTimestamp implements IReaderByTimestamp {
return null;
} else {
chunkReaderByTimestamp.setCurrentTimestamp(timestamp);
- if (chunkReaderByTimestamp.hasNextBatch()) {
- data = chunkReaderByTimestamp.nextBatch();
+ if (chunkReaderByTimestamp.hasNextSatisfiedPage()) {
+ data = chunkReaderByTimestamp.nextPageData();
} else {
return null;
}
@@ -70,8 +70,8 @@ public class DiskChunkReaderByTimestamp implements IReaderByTimestamp {
if (data != null && data.hasCurrent()) {
return true;
}
- if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextBatch()) {
- data = chunkReaderByTimestamp.nextBatch();
+ if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextSatisfiedPage()) {
+ data = chunkReaderByTimestamp.nextPageData();
return true;
}
return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java
index ee0fdd8..33c3462 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.query.reader.chunkRelated.CachedDiskChunkReader;
import org.apache.iotdb.db.query.reader.universal.CachedPriorityMergeReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
public class CachedUnseqResourceMergeReader extends CachedPriorityMergeReader {
@@ -35,7 +34,7 @@ public class CachedUnseqResourceMergeReader extends CachedPriorityMergeReader {
super(dataType);
int priorityValue = 1;
for (Chunk chunk : chunks) {
- AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
+ ChunkReader chunkReader = new ChunkReader(chunk, null);
addReaderWithPriority(new CachedDiskChunkReader(chunkReader), priorityValue++);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java
index 61e8763..ce2d379 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java
@@ -88,16 +88,18 @@ public class NewUnseqResourceMergeReader implements IBatchReader {
if (tsFileResource.isClosed()) {
// get chunk metadata list of current closed tsfile
currentChunkMetaDataList = DeviceMetaDataCache.getInstance().get(tsFileResource, seriesPath);
- List<Modification> pathModifications = context
- .getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
- if (!pathModifications.isEmpty()) {
- QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications);
- }
} else {
- // metadata list of already flushed chunk groups
+ // metadata list of already flushed chunks
currentChunkMetaDataList = tsFileResource.getChunkMetaDataList();
}
+ // get modifications and apply them to the chunk metadata
+ List<Modification> pathModifications = context
+ .getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
+ if (!pathModifications.isEmpty()) {
+ QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications);
+ }
+
if (!currentChunkMetaDataList.isEmpty()) {
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
.get(tsFileResource, tsFileResource.isClosed());
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
index a0d1253..eb065d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.slf4j.Logger;
@@ -101,10 +100,10 @@ public class MergeUtils {
}
public static int writeChunkWithoutUnseq(Chunk chunk, IChunkWriter chunkWriter) throws IOException {
- AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
+ ChunkReader chunkReader = new ChunkReader(chunk, null);
int ptWritten = 0;
- while (chunkReader.hasNextBatch()) {
- BatchData batchData = chunkReader.nextBatch();
+ while (chunkReader.hasNextSatisfiedPage()) {
+ BatchData batchData = chunkReader.nextPageData();
for (int i = 0; i < batchData.length(); i++) {
writeBatchPoint(batchData, i, chunkWriter);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index 45fa54c..8b88e8f 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -46,7 +46,6 @@ import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
import org.apache.iotdb.tsfile.read.controller.IMetadataQuerier;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
@@ -165,7 +164,7 @@ public class UnseqTsFileRecoverTest {
int priorityValue = 1;
for (ChunkMetaData chunkMetaData : metadataQuerier.getChunkMetaDataList(path)) {
Chunk chunk = chunkLoader.getChunk(chunkMetaData);
- AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
+ ChunkReader chunkReader = new ChunkReader(chunk, null);
unSeqMergeReader
.addReaderWithPriority(new DiskChunkReader(chunkReader), priorityValue);
priorityValue++;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AbstractChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AbstractChunkReader.java
deleted file mode 100644
index 1758ec7..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AbstractChunkReader.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.tsfile.read.reader.chunk;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.compress.IUnCompressor;
-import org.apache.iotdb.tsfile.encoding.common.EndianType;
-import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.reader.IBatchReader;
-import org.apache.iotdb.tsfile.read.reader.page.PageReader;
-
-public abstract class AbstractChunkReader implements IBatchReader {
-
- private ChunkHeader chunkHeader;
- private ByteBuffer chunkDataBuffer;
-
- private IUnCompressor unCompressor;
- private Decoder valueDecoder;
- private Decoder timeDecoder = Decoder.getDecoderByType(
- TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
- TSDataType.INT64);
-
- protected Filter filter;
-
- private BatchData data;
-
- private PageHeader pageHeader;
- private boolean hasCachedPageHeader;
-
- /**
- * Data whose timestamp <= deletedAt should be considered deleted(not be returned).
- */
- protected long deletedAt;
-
- /**
- * constructor of ChunkReader.
- *
- * @param chunk input Chunk object
- * @param filter filter
- */
- public AbstractChunkReader(Chunk chunk, Filter filter) {
- this.filter = filter;
- this.chunkDataBuffer = chunk.getData();
- this.deletedAt = chunk.getDeletedAt();
- EndianType endianType = chunk.getEndianType();
- chunkHeader = chunk.getHeader();
- this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
- valueDecoder = Decoder
- .getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
- valueDecoder.setEndianType(endianType);
- data = new BatchData(chunkHeader.getDataType());
- hasCachedPageHeader = false;
- }
-
- /**
- * judge if has nextBatch.
- */
- public boolean hasNextBatch() {
- if (hasCachedPageHeader) {
- return true;
- }
- // construct next satisfied page header
- while (chunkDataBuffer.remaining() > 0) {
- // deserialize a PageHeader from chunkDataBuffer
- pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
-
- // if the current page satisfies
- if (pageSatisfied(pageHeader)) {
- hasCachedPageHeader = true;
- return true;
- } else {
- skipBytesInStreamByLength(pageHeader.getCompressedSize());
- }
- }
- return false;
- }
-
- /**
- * get next data batch.
- *
- * @return next data batch
- * @throws IOException IOException
- */
- public BatchData nextBatch() throws IOException {
- PageReader pageReader = constructPageReaderForNextPage(pageHeader.getCompressedSize());
- hasCachedPageHeader = false;
- if (pageReader.hasNextBatch()) {
- data = pageReader.nextBatch();
- return data;
- }
- return data;
- }
-
-
- public PageHeader nextPageHeader() {
- return pageHeader;
- }
-
- public void skipPageData() {
- skipBytesInStreamByLength(pageHeader.getCompressedSize());
- hasCachedPageHeader = false;
- }
-
- private void skipBytesInStreamByLength(long length) {
- chunkDataBuffer.position(chunkDataBuffer.position() + (int) length);
- }
-
- public abstract boolean pageSatisfied(PageHeader pageHeader);
-
- private PageReader constructPageReaderForNextPage(int compressedPageBodyLength)
- throws IOException {
- byte[] compressedPageBody = new byte[compressedPageBodyLength];
-
- // already in memory
- if (compressedPageBodyLength > chunkDataBuffer.remaining()) {
- throw new IOException(
- "unexpected byte read length when read compressedPageBody. Expected:"
- + Arrays.toString(compressedPageBody) + ". Actual:" + chunkDataBuffer
- .remaining());
- }
- chunkDataBuffer.get(compressedPageBody, 0, compressedPageBodyLength);
- valueDecoder.reset();
- ByteBuffer pageData = ByteBuffer.wrap(unCompressor.uncompress(compressedPageBody));
- PageReader reader = new PageReader(pageData, chunkHeader.getDataType(),
- valueDecoder, timeDecoder, filter);
- reader.setDeletedAt(deletedAt);
- return reader;
- }
-
- public void close() {
- }
-
- public ChunkHeader getChunkHeader() {
- return chunkHeader;
- }
-}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
index cf831fc..1d8abdf 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
@@ -16,28 +16,147 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.tsfile.read.reader.chunk;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.encoding.common.EndianType;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+
+public class ChunkReader {
+
+ private ChunkHeader chunkHeader;
+ private ByteBuffer chunkDataBuffer;
+
+ private IUnCompressor unCompressor;
+ private Decoder valueDecoder;
+ private Decoder timeDecoder = Decoder.getDecoderByType(
+ TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+ TSDataType.INT64);
-public class ChunkReader extends AbstractChunkReader {
+ protected Filter filter;
+ private PageHeader pageHeader;
+ private boolean hasCachedPageHeader;
+
+ /**
+ * Data whose timestamp <= deletedAt should be considered deleted (and not returned).
+ */
+ protected long deletedAt;
+
+ /**
+ * constructor of ChunkReader.
+ *
+ * @param chunk input Chunk object
+ * @param filter filter
+ */
public ChunkReader(Chunk chunk, Filter filter) {
- super(chunk, filter);
+ this.filter = filter;
+ this.chunkDataBuffer = chunk.getData();
+ this.deletedAt = chunk.getDeletedAt();
+ EndianType endianType = chunk.getEndianType();
+ chunkHeader = chunk.getHeader();
+ this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
+ valueDecoder = Decoder
+ .getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
+ valueDecoder.setEndianType(endianType);
+ hasCachedPageHeader = false;
+ }
+
+ /**
+ * judge if has next page whose page header satisfies the filter.
+ */
+ public boolean hasNextSatisfiedPage() {
+ if (hasCachedPageHeader) {
+ return true;
+ }
+ // construct next satisfied page header
+ while (chunkDataBuffer.remaining() > 0) {
+ // deserialize a PageHeader from chunkDataBuffer
+ pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
+
+ // if the current page header satisfies the filter
+ if (pageSatisfied(pageHeader)) {
+ hasCachedPageHeader = true;
+ return true;
+ } else {
+ skipBytesInStreamByLength(pageHeader.getCompressedSize());
+ }
+ }
+ return false;
+ }
+
+ /**
+ * get next data batch.
+ *
+ * @return next data batch
+ * @throws IOException IOException
+ */
+ public BatchData nextPageData() throws IOException {
+ if(hasCachedPageHeader || hasNextSatisfiedPage()) {
+ PageReader pageReader = constructPageReaderForNextPage(pageHeader);
+ hasCachedPageHeader = false;
+ return pageReader.getAllSatisfiedPageData();
+ } else {
+ throw new IOException("no next page data");
+ }
+ }
+
+ public PageHeader nextPageHeader() {
+ return pageHeader;
+ }
+
+ public void skipPageData() {
+ skipBytesInStreamByLength(pageHeader.getCompressedSize());
+ hasCachedPageHeader = false;
+ }
+
+ private void skipBytesInStreamByLength(long length) {
+ chunkDataBuffer.position(chunkDataBuffer.position() + (int) length);
}
- @Override
public boolean pageSatisfied(PageHeader pageHeader) {
- if (pageHeader.getEndTime() < deletedAt) {
+ if (pageHeader.getEndTime() <= deletedAt) {
return false;
}
- if (filter == null ) {
- return true;
- } else {
- return filter.satisfy(pageHeader.getStatistics());
+ return filter == null || filter.satisfy(pageHeader.getStatistics());
+ }
+
+ private PageReader constructPageReaderForNextPage(PageHeader pageHeader)
+ throws IOException {
+ int compressedPageBodyLength = pageHeader.getCompressedSize();
+ byte[] compressedPageBody = new byte[compressedPageBodyLength];
+
+ // doesn't have a complete page body
+ if (compressedPageBodyLength > chunkDataBuffer.remaining()) {
+ throw new IOException("does not have a complete page body. Expected:" + compressedPageBodyLength
+ + ". Actual:" + chunkDataBuffer.remaining());
}
+
+ chunkDataBuffer.get(compressedPageBody);
+ valueDecoder.reset();
+ ByteBuffer pageData = ByteBuffer.wrap(unCompressor.uncompress(compressedPageBody));
+ PageReader reader = new PageReader(pageData, chunkHeader.getDataType(),
+ valueDecoder, timeDecoder, filter);
+ reader.setDeletedAt(deletedAt);
+ return reader;
}
+ public void close() {
+ }
+
+ public ChunkHeader getChunkHeader() {
+ return chunkHeader;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
index 1630a63..1f487d0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.tsfile.read.reader.chunk;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.read.common.Chunk;
-public class ChunkReaderByTimestamp extends AbstractChunkReader {
+public class ChunkReaderByTimestamp extends ChunkReader {
private long currentTimestamp;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index 7d113e9..2e43f77 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -25,7 +25,9 @@ import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
@@ -45,32 +47,24 @@ public class PageReader {
/** value column in memory */
private ByteBuffer valueBuffer;
- private BatchData data = null;
-
- private Filter filter = null;
+ private Filter filter;
+ /** Data whose timestamp <= deletedAt should be considered deleted (and not returned). */
private long deletedAt = Long.MIN_VALUE;
public PageReader(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder,
- Decoder timeDecoder,
- Filter filter) {
- this(pageData, dataType, valueDecoder, timeDecoder);
- this.filter = filter;
- }
-
- public PageReader(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder,
- Decoder timeDecoder) {
+ Decoder timeDecoder, Filter filter) {
this.dataType = dataType;
this.valueDecoder = valueDecoder;
this.timeDecoder = timeDecoder;
+ this.filter = filter;
splitDataToTimeStampAndValue(pageData);
}
/**
* split pageContent into two stream: time and value
*
- * @param pageData
- * uncompressed bytes size of time column, time column, value column
+ * @param pageData uncompressed bytes size of time column, time column, value column
*/
private void splitDataToTimeStampAndValue(ByteBuffer pageData) {
int timeBufferLength = ReadWriteForEncodingUtils.readUnsignedVarInt(pageData);
@@ -82,28 +76,7 @@ public class PageReader {
valueBuffer.position(timeBufferLength);
}
- public boolean hasNextBatch() throws IOException {
- return timeDecoder.hasNext(timeBuffer);
- }
-
- /**
- * may return an empty BatchData
- */
- public BatchData nextBatch() throws IOException {
- if (filter == null) {
- data = getAllPageData();
- } else {
- data = getAllPageDataWithFilter();
- }
-
- return data;
- }
-
- public BatchData currentBatch() {
- return data;
- }
-
- private BatchData getAllPageData() throws IOException {
+ public BatchData getAllSatisfiedPageData() throws IOException {
BatchData pageData = new BatchData(dataType, true);
@@ -113,42 +86,42 @@ public class PageReader {
switch (dataType) {
case BOOLEAN:
boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
- if (timestamp > deletedAt) {
+ if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aBoolean))) {
pageData.putTime(timestamp);
pageData.putBoolean(aBoolean);
}
break;
case INT32:
int anInt = valueDecoder.readInt(valueBuffer);
- if (timestamp > deletedAt) {
+ if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, anInt))) {
pageData.putTime(timestamp);
pageData.putInt(anInt);
}
break;
case INT64:
long aLong = valueDecoder.readLong(valueBuffer);
- if (timestamp > deletedAt) {
+ if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aLong))) {
pageData.putTime(timestamp);
pageData.putLong(aLong);
}
break;
case FLOAT:
float aFloat = valueDecoder.readFloat(valueBuffer);
- if (timestamp > deletedAt) {
+ if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aFloat))) {
pageData.putTime(timestamp);
pageData.putFloat(aFloat);
}
break;
case DOUBLE:
double aDouble = valueDecoder.readDouble(valueBuffer);
- if (timestamp > deletedAt) {
+ if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aDouble))) {
pageData.putTime(timestamp);
pageData.putDouble(aDouble);
}
break;
case TEXT:
Binary aBinary = valueDecoder.readBinary(valueBuffer);
- if (timestamp > deletedAt) {
+ if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aBinary))) {
pageData.putTime(timestamp);
pageData.putBinary(aBinary);
}
@@ -160,86 +133,6 @@ public class PageReader {
return pageData;
}
- private BatchData getAllPageDataWithFilter() throws IOException {
- BatchData pageData = new BatchData(dataType, true);
-
- while (timeDecoder.hasNext(timeBuffer)) {
- long timestamp = timeDecoder.readLong(timeBuffer);
-
- switch (dataType) {
- case BOOLEAN:
- readBoolean(pageData, timestamp);
- break;
- case INT32:
- readInt(pageData, timestamp);
- break;
- case INT64:
- readLong(pageData, timestamp);
- break;
- case FLOAT:
- readFloat(pageData, timestamp);
- break;
- case DOUBLE:
- readDouble(pageData, timestamp);
- break;
- case TEXT:
- readText(pageData, timestamp);
- break;
- default:
- throw new UnSupportedDataTypeException(String.valueOf(dataType));
- }
- }
-
- return pageData;
- }
-
- private void readBoolean(BatchData pageData, long timestamp) {
- boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
- if (timestamp > deletedAt && filter.satisfy(timestamp, aBoolean)) {
- pageData.putTime(timestamp);
- pageData.putBoolean(aBoolean);
- }
- }
-
- private void readInt(BatchData pageData, long timestamp) {
- int anInt = valueDecoder.readInt(valueBuffer);
- if (timestamp > deletedAt && filter.satisfy(timestamp, anInt)) {
- pageData.putTime(timestamp);
- pageData.putInt(anInt);
- }
- }
-
- private void readLong(BatchData pageData, long timestamp) {
- long aLong = valueDecoder.readLong(valueBuffer);
- if (timestamp > deletedAt && filter.satisfy(timestamp, aLong)) {
- pageData.putTime(timestamp);
- pageData.putLong(aLong);
- }
- }
-
- private void readFloat(BatchData pageData, long timestamp) {
- float aFloat = valueDecoder.readFloat(valueBuffer);
- if (timestamp > deletedAt && filter.satisfy(timestamp, aFloat)) {
- pageData.putTime(timestamp);
- pageData.putFloat(aFloat);
- }
- }
-
- private void readDouble(BatchData pageData, long timestamp) {
- double aDouble = valueDecoder.readDouble(valueBuffer);
- if (timestamp > deletedAt && filter.satisfy(timestamp, aDouble)) {
- pageData.putTime(timestamp);
- pageData.putDouble(aDouble);
- }
- }
-
- private void readText(BatchData pageData, long timestamp) {
- Binary aBinary = valueDecoder.readBinary(valueBuffer);
- if (timestamp > deletedAt && filter.satisfy(timestamp, aBinary)) {
- pageData.putTime(timestamp);
- pageData.putBinary(aBinary);
- }
- }
public void close() {
timeBuffer = null;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java
index 069c435..14bb4be 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IAggregateReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import java.io.IOException;
import java.util.List;
@@ -37,7 +37,7 @@ public abstract class AbstractFileSeriesReader implements IAggregateReader {
protected IChunkLoader chunkLoader;
protected List<ChunkMetaData> chunkMetaDataList;
- protected AbstractChunkReader chunkReader;
+ protected ChunkReader chunkReader;
private int chunkToRead;
private BatchData data;
@@ -63,7 +63,7 @@ public abstract class AbstractFileSeriesReader implements IAggregateReader {
public boolean hasNextBatch() throws IOException {
// current chunk has additional batch
- if (chunkReader != null && chunkReader.hasNextBatch()) {
+ if (chunkReader != null && chunkReader.hasNextSatisfiedPage()) {
return true;
}
@@ -75,7 +75,7 @@ public abstract class AbstractFileSeriesReader implements IAggregateReader {
// chunk metadata satisfy the condition
initChunkReader(chunkMetaData);
- if (chunkReader.hasNextBatch()) {
+ if (chunkReader.hasNextSatisfiedPage()) {
return true;
}
}
@@ -87,7 +87,7 @@ public abstract class AbstractFileSeriesReader implements IAggregateReader {
* get next batch data.
*/
public BatchData nextBatch() throws IOException {
- data = chunkReader.nextBatch();
+ data = chunkReader.nextPageData();
return data;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java
index 13be71b..1843229 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
/**
@@ -40,7 +40,7 @@ public class FileSeriesReaderByTimestamp {
protected List<ChunkMetaData> chunkMetaDataList;
private int currentChunkIndex = 0;
- private AbstractChunkReader chunkReader;
+ private ChunkReader chunkReader;
private long currentTimestamp;
private BatchData data = null; // current batch data
@@ -69,8 +69,8 @@ public class FileSeriesReaderByTimestamp {
return null;
}
- if (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ if (chunkReader.hasNextSatisfiedPage()) {
+ data = chunkReader.nextPageData();
} else {
return null;
}
@@ -93,8 +93,8 @@ public class FileSeriesReaderByTimestamp {
}
return null;
} else {
- if (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ if (chunkReader.hasNextSatisfiedPage()) {
+ data = chunkReader.nextPageData();
} else if (!constructNextSatisfiedChunkReader()) {
return null;
}
@@ -115,16 +115,16 @@ public class FileSeriesReaderByTimestamp {
if (data != null && data.hasCurrent()) {
return true;
}
- while (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ while (chunkReader.hasNextSatisfiedPage()) {
+ data = chunkReader.nextPageData();
if (data != null && data.hasCurrent()) {
return true;
}
}
}
while (constructNextSatisfiedChunkReader()) {
- while (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ while (chunkReader.hasNextSatisfiedPage()) {
+ data = chunkReader.nextPageData();
if (data != null && data.hasCurrent()) {
return true;
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java
index bcce85c..6a55bd1 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java
@@ -171,14 +171,11 @@ public class PageReaderTest {
ByteBuffer page = ByteBuffer.wrap(pageWriter.getUncompressedBytes().array());
PageReader pageReader = new PageReader(page, dataType, decoder,
- new DeltaBinaryDecoder.LongDeltaDecoder());
+ new DeltaBinaryDecoder.LongDeltaDecoder(), null);
int index = 0;
long startTimestamp = System.currentTimeMillis();
- BatchData data = null;
- if (pageReader.hasNextBatch()) {
- data = pageReader.nextBatch();
- }
+ BatchData data = pageReader.getAllSatisfiedPageData();
assert data != null;
while (data.hasCurrent()) {