You are viewing a plain-text version of this content. The canonical link to it ("here" in the original page) was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by su...@apache.org on 2019/10/29 13:08:49 UTC
[incubator-iotdb] 01/01: read chunk data before chunk header
This is an automated email from the ASF dual-hosted git repository.
suyue pushed a commit to branch rel/0.8_sy_test
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 2fba6fa1b572b9173d389a46873408353bfb4c66
Author: suyue <23...@qq.com>
AuthorDate: Tue Oct 29 21:05:00 2019 +0800
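Read chunk data before the chunk header: record each chunk's data size in ChunkMetaData so that readMemChunk can read the chunk data directly, without first deserializing the chunk header to learn its size.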
---
.../iotdb/tsfile/file/header/ChunkHeader.java | 42 ++++++++++++++--------
.../iotdb/tsfile/file/metadata/ChunkMetaData.java | 19 ++++++++++
.../iotdb/tsfile/read/TsFileSequenceReader.java | 30 ++++++++++------
.../iotdb/tsfile/write/chunk/ChunkBuffer.java | 2 +-
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 3 +-
.../iotdb/tsfile/write/TsFileIOWriterTest.java | 2 +-
6 files changed, 70 insertions(+), 28 deletions(-)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
index 0dc5ef0..232ec5f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
@@ -59,19 +59,30 @@ public class ChunkHeader {
public ChunkHeader(String measurementID, int dataSize, TSDataType dataType,
CompressionType compressionType,
TSEncoding encoding, int numOfPages) {
- this(measurementID, dataSize, dataType, compressionType, encoding, numOfPages, 0);
+ this(measurementID, dataSize, getSerializedSize(measurementID), dataType, compressionType,
+ encoding, numOfPages, 0);
+ }
+
+ public ChunkHeader(String measurementID, int dataSize, int headerSize, TSDataType dataType,
+ CompressionType compressionType, TSEncoding encoding, int numOfPages) {
+ this(measurementID, dataSize, headerSize, dataType, compressionType, encoding, numOfPages, 0);
}
private ChunkHeader(String measurementID, int dataSize, TSDataType dataType,
- CompressionType compressionType,
- TSEncoding encoding, int numOfPages, long maxTombstoneTime) {
+ CompressionType compressionType, TSEncoding encoding, int numOfPages, long maxTombstoneTime) {
+ this(measurementID, dataSize, getSerializedSize(measurementID), dataType, compressionType,
+ encoding, numOfPages, maxTombstoneTime);
+ }
+
+ private ChunkHeader(String measurementID, int dataSize, int headerSize, TSDataType dataType,
+ CompressionType compressionType, TSEncoding encoding, int numOfPages, long maxTombstoneTime) {
this.measurementID = measurementID;
this.dataSize = dataSize;
this.dataType = dataType;
this.compressionType = compressionType;
this.numOfPages = numOfPages;
this.encodingType = encoding;
- this.serializedSize = getSerializedSize(measurementID);
+ this.serializedSize = headerSize;
this.maxTombstoneTime = maxTombstoneTime;
}
@@ -132,7 +143,8 @@ public class ChunkHeader {
}
String measurementID = ReadWriteIOUtils.readString(byteBuffer);
- return deserializePartFrom(measurementID, byteBuffer);
+ return deserializePartFrom(measurementID, ChunkHeader.getSerializedSize(measurementID),
+ byteBuffer);
}
/**
@@ -140,39 +152,39 @@ public class ChunkHeader {
*
* @param input TsFileInput
* @param offset offset
+ * @param chunkHeaderSize the size of chunk's header
* @param markerRead read marker (boolean type)
* @return CHUNK_HEADER object
* @throws IOException IOException
*/
- public static ChunkHeader deserializeFrom(TsFileInput input, long offset, boolean markerRead)
+ public static ChunkHeader deserializeFrom(TsFileInput input, long offset, int chunkHeaderSize,
+ boolean markerRead)
throws IOException {
long offsetVar = offset;
if (!markerRead) {
offsetVar++;
}
- ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
+ ByteBuffer buffer = ByteBuffer.allocate(chunkHeaderSize);
input.read(buffer, offsetVar);
buffer.flip();
+
int size = buffer.getInt();
- offsetVar += Integer.BYTES;
- buffer = ByteBuffer.allocate(getSerializedSize(size));
- ReadWriteIOUtils.readAsPossible(input, offsetVar, buffer);
- buffer.flip();
String measurementID = ReadWriteIOUtils.readStringWithoutLength(buffer, size);
- return deserializePartFrom(measurementID, buffer);
+ return deserializePartFrom(measurementID, chunkHeaderSize, buffer);
}
- private static ChunkHeader deserializePartFrom(String measurementID, ByteBuffer buffer) throws UnsupportedEncodingException {
+ private static ChunkHeader deserializePartFrom(String measurementID, int chunkHeaderSize,
+ ByteBuffer buffer) {
int dataSize = ReadWriteIOUtils.readInt(buffer);
TSDataType dataType = TSDataType.deserialize(ReadWriteIOUtils.readShort(buffer));
int numOfPages = ReadWriteIOUtils.readInt(buffer);
CompressionType type = ReadWriteIOUtils.readCompressionType(buffer);
TSEncoding encoding = ReadWriteIOUtils.readEncoding(buffer);
long maxTombstoneTime = ReadWriteIOUtils.readLong(buffer);
- return new ChunkHeader(measurementID, dataSize, dataType, type, encoding, numOfPages,
+ return new ChunkHeader(measurementID, dataSize, chunkHeaderSize, dataType, type, encoding,
+ numOfPages,
maxTombstoneTime);
}
-
public int getSerializedSize() {
return serializedSize;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
index c63152e..9c04144 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
@@ -68,6 +68,11 @@ public class ChunkMetaData {
private TsDigest valuesStatistics;
+ /**
+ * chunk data size.
+ */
+ private long dataSize;
+
private ChunkMetaData() {
}
@@ -108,6 +113,7 @@ public class ChunkMetaData {
chunkMetaData.endTime = ReadWriteIOUtils.readLong(inputStream);
chunkMetaData.tsDataType = ReadWriteIOUtils.readDataType(inputStream);
+ chunkMetaData.dataSize = ReadWriteIOUtils.readLong(inputStream);
chunkMetaData.valuesStatistics = TsDigest.deserializeFrom(inputStream);
@@ -129,6 +135,7 @@ public class ChunkMetaData {
chunkMetaData.startTime = ReadWriteIOUtils.readLong(buffer);
chunkMetaData.endTime = ReadWriteIOUtils.readLong(buffer);
chunkMetaData.tsDataType = ReadWriteIOUtils.readDataType(buffer);
+ chunkMetaData.dataSize = ReadWriteIOUtils.readLong(buffer);
chunkMetaData.valuesStatistics = TsDigest.deserializeFrom(buffer);
@@ -144,6 +151,7 @@ public class ChunkMetaData {
int serializedSize = (Integer.BYTES +
4 * Long.BYTES + // 4 long: offsetOfChunkHeader, numOfPoints, startTime, endTime
TSDataType.getSerializedSize() + // TSDataType
+ Long.BYTES + // data size
(valuesStatistics == null ? TsDigest.getNullDigestSize()
: valuesStatistics.getSerializedSize()));
try {
@@ -231,6 +239,7 @@ public class ChunkMetaData {
byteLen += ReadWriteIOUtils.write(startTime, outputStream);
byteLen += ReadWriteIOUtils.write(endTime, outputStream);
byteLen += ReadWriteIOUtils.write(tsDataType, outputStream);
+ byteLen += ReadWriteIOUtils.write(dataSize, outputStream);
if (valuesStatistics == null) {
byteLen += TsDigest.serializeNullTo(outputStream);
@@ -255,6 +264,7 @@ public class ChunkMetaData {
byteLen += ReadWriteIOUtils.write(startTime, buffer);
byteLen += ReadWriteIOUtils.write(endTime, buffer);
byteLen += ReadWriteIOUtils.write(tsDataType, buffer);
+ byteLen += ReadWriteIOUtils.write(dataSize, buffer);
if (valuesStatistics == null) {
byteLen += TsDigest.serializeNullTo(buffer);
@@ -280,6 +290,14 @@ public class ChunkMetaData {
this.deletedAt = deletedAt;
}
+ public long getDataSize() {
+ return dataSize;
+ }
+
+ public void setDataSize(long dataSize) {
+ this.dataSize = dataSize;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -297,6 +315,7 @@ public class ChunkMetaData {
deletedAt == that.deletedAt &&
Objects.equals(measurementUid, that.measurementUid) &&
tsDataType == that.tsDataType &&
+ dataSize == that.dataSize &&
Objects.equals(valuesStatistics, that.valuesStatistics);
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index eec73e0..b5a0fd5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -291,8 +291,9 @@ public class TsFileSequenceReader implements AutoCloseable {
* @param position the file offset of this chunk's header
* @param markerRead true if the offset does not contains the marker , otherwise false
*/
- private ChunkHeader readChunkHeader(long position, boolean markerRead) throws IOException {
- return ChunkHeader.deserializeFrom(tsFileInput, position, markerRead);
+ private ChunkHeader readChunkHeader(long position, int chunkHeaderSize, boolean markerRead)
+ throws IOException {
+ return ChunkHeader.deserializeFrom(tsFileInput, position, chunkHeaderSize, markerRead);
}
/**
@@ -333,9 +334,13 @@ public class TsFileSequenceReader implements AutoCloseable {
* @return -chunk
*/
public Chunk readMemChunk(ChunkMetaData metaData) throws IOException {
- ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader(), false);
- ByteBuffer buffer = readChunk(metaData.getOffsetOfChunkHeader() + header.getSerializedSize(),
- header.getDataSize());
+ int chunkHeadSize = ChunkHeader.getSerializedSize(metaData.getMeasurementUid());
+ long chunkHeaderPos = metaData.getOffsetOfChunkHeader();
+ long chunkDataPos = metaData.getOffsetOfChunkHeader() + chunkHeadSize;
+ int chunkDataSize = (int) metaData.getDataSize();
+
+ ByteBuffer buffer = readChunk(chunkDataPos, chunkDataSize);
+ ChunkHeader header = readChunkHeader(chunkHeaderPos, chunkHeadSize, false);
return new Chunk(header, buffer);
}
@@ -569,11 +574,16 @@ public class TsFileSequenceReader implements AutoCloseable {
startTimeOfChunk, endTimeOfChunk);
currentChunk.setNumOfPoints(numOfPoints);
Map<String, ByteBuffer> statisticsMap = new HashMap<>();
- statisticsMap.put(StatisticConstant.MAX_VALUE, ByteBuffer.wrap(chunkStatistics.getMaxBytes()));
- statisticsMap.put(StatisticConstant.MIN_VALUE, ByteBuffer.wrap(chunkStatistics.getMinBytes()));
- statisticsMap.put(StatisticConstant.FIRST, ByteBuffer.wrap(chunkStatistics.getFirstBytes()));
- statisticsMap.put(StatisticConstant.SUM, ByteBuffer.wrap(chunkStatistics.getSumBytes()));
- statisticsMap.put(StatisticConstant.LAST, ByteBuffer.wrap(chunkStatistics.getLastBytes()));
+ statisticsMap
+ .put(StatisticConstant.MAX_VALUE, ByteBuffer.wrap(chunkStatistics.getMaxBytes()));
+ statisticsMap
+ .put(StatisticConstant.MIN_VALUE, ByteBuffer.wrap(chunkStatistics.getMinBytes()));
+ statisticsMap
+ .put(StatisticConstant.FIRST, ByteBuffer.wrap(chunkStatistics.getFirstBytes()));
+ statisticsMap
+ .put(StatisticConstant.SUM, ByteBuffer.wrap(chunkStatistics.getSumBytes()));
+ statisticsMap
+ .put(StatisticConstant.LAST, ByteBuffer.wrap(chunkStatistics.getLastBytes()));
TsDigest tsDigest = new TsDigest();
tsDigest.setStatistics(statisticsMap);
currentChunk.setDigest(tsDigest);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
index 290c516..e4120a0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
@@ -195,7 +195,7 @@ public class ChunkBuffer {
long size = writer.getPos() - totalByteSize;
assert size == pageBuffer.size();
- writer.endChunk(totalValueCount);
+ writer.endChunk(totalValueCount, size);
return headerSize + size;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 206fce5..2c0c64a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -210,8 +210,9 @@ public class TsFileIOWriter {
*
* @param totalValueCount -set the number of points to the currentChunkMetaData
*/
- public void endChunk(long totalValueCount) {
+ public void endChunk(long totalValueCount, long size) {
currentChunkMetaData.setNumOfPoints(totalValueCount);
+ currentChunkMetaData.setDataSize(size);
currentChunkGroupMetaData.addTimeSeriesChunkMetaData(currentChunkMetaData);
LOG.debug("end series chunk:{},totalvalue:{}", currentChunkMetaData, totalValueCount);
currentChunkMetaData = null;
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
index 911c3c0..b83e0d9 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
@@ -59,7 +59,7 @@ public class TsFileIOWriterTest {
writer.startChunkGroup(deviceId);
writer.startFlushChunk(measurementSchema, measurementSchema.getCompressor(),
measurementSchema.getType(), measurementSchema.getEncodingType(), statistics, 0, 0, 0, 0);
- writer.endChunk(0);
+ writer.endChunk(0,0);
writer.endChunkGroup(0);
// end file