Posted to commits@iotdb.apache.org by su...@apache.org on 2019/10/29 13:08:49 UTC

[incubator-iotdb] 01/01: read chunk data before chunk header

This is an automated email from the ASF dual-hosted git repository.

suyue pushed a commit to branch rel/0.8_sy_test
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 2fba6fa1b572b9173d389a46873408353bfb4c66
Author: suyue <23...@qq.com>
AuthorDate: Tue Oct 29 21:05:00 2019 +0800

    read chunk data before chunk header
---
 .../iotdb/tsfile/file/header/ChunkHeader.java      | 42 ++++++++++++++--------
 .../iotdb/tsfile/file/metadata/ChunkMetaData.java  | 19 ++++++++++
 .../iotdb/tsfile/read/TsFileSequenceReader.java    | 30 ++++++++++------
 .../iotdb/tsfile/write/chunk/ChunkBuffer.java      |  2 +-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  3 +-
 .../iotdb/tsfile/write/TsFileIOWriterTest.java     |  2 +-
 6 files changed, 70 insertions(+), 28 deletions(-)
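
The gist of the patch: TsFileSequenceReader.readMemChunk used to parse the
ChunkHeader first, because only the header carried the size of the chunk body.
This commit persists the body size in ChunkMetaData at write time and computes
the header size from the measurement ID alone, so the two reads become
independent and the (typically larger) data read can be issued before the
header read. A minimal sketch of the inverted dependency, using only names
visible in the diff below:

    // Before: the data read depended on parsing the header first.
    //   ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader(), false);
    //   ByteBuffer buffer = readChunk(
    //       metaData.getOffsetOfChunkHeader() + header.getSerializedSize(),
    //       header.getDataSize());
    // After: both sizes are known before anything is parsed.
    int headerSize = ChunkHeader.getSerializedSize(metaData.getMeasurementUid());
    long headerPos = metaData.getOffsetOfChunkHeader();
    ByteBuffer data = readChunk(headerPos + headerSize, (int) metaData.getDataSize());
    ChunkHeader header = readChunkHeader(headerPos, headerSize, false);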

diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
index 0dc5ef0..232ec5f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
@@ -59,19 +59,30 @@ public class ChunkHeader {
   public ChunkHeader(String measurementID, int dataSize, TSDataType dataType,
       CompressionType compressionType,
       TSEncoding encoding, int numOfPages) {
-    this(measurementID, dataSize, dataType, compressionType, encoding, numOfPages, 0);
+    this(measurementID, dataSize, getSerializedSize(measurementID), dataType, compressionType,
+        encoding, numOfPages, 0);
+  }
+
+  public ChunkHeader(String measurementID, int dataSize, int headerSize, TSDataType dataType,
+      CompressionType compressionType, TSEncoding encoding, int numOfPages) {
+    this(measurementID, dataSize, headerSize, dataType, compressionType, encoding, numOfPages, 0);
   }
 
   private ChunkHeader(String measurementID, int dataSize, TSDataType dataType,
-      CompressionType compressionType,
-      TSEncoding encoding, int numOfPages, long maxTombstoneTime) {
+      CompressionType compressionType, TSEncoding encoding, int numOfPages, long maxTombstoneTime) {
+    this(measurementID, dataSize, getSerializedSize(measurementID), dataType, compressionType,
+        encoding, numOfPages, maxTombstoneTime);
+  }
+
+  private ChunkHeader(String measurementID, int dataSize, int headerSize, TSDataType dataType,
+      CompressionType compressionType, TSEncoding encoding, int numOfPages, long maxTombstoneTime) {
     this.measurementID = measurementID;
     this.dataSize = dataSize;
     this.dataType = dataType;
     this.compressionType = compressionType;
     this.numOfPages = numOfPages;
     this.encodingType = encoding;
-    this.serializedSize = getSerializedSize(measurementID);
+    this.serializedSize = headerSize;
     this.maxTombstoneTime = maxTombstoneTime;
   }
 
@@ -132,7 +143,8 @@ public class ChunkHeader {
     }
 
     String measurementID = ReadWriteIOUtils.readString(byteBuffer);
-    return deserializePartFrom(measurementID, byteBuffer);
+    return deserializePartFrom(measurementID, ChunkHeader.getSerializedSize(measurementID),
+        byteBuffer);
   }
 
   /**
@@ -140,39 +152,39 @@ public class ChunkHeader {
    *
    * @param input TsFileInput
    * @param offset offset
+   * @param chunkHeaderSize the size of chunk's header
    * @param markerRead read marker (boolean type)
    * @return CHUNK_HEADER object
    * @throws IOException IOException
    */
-  public static ChunkHeader deserializeFrom(TsFileInput input, long offset, boolean markerRead)
+  public static ChunkHeader deserializeFrom(TsFileInput input, long offset, int chunkHeaderSize,
+      boolean markerRead)
       throws IOException {
     long offsetVar = offset;
     if (!markerRead) {
       offsetVar++;
     }
-    ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
+    ByteBuffer buffer = ByteBuffer.allocate(chunkHeaderSize);
     input.read(buffer, offsetVar);
     buffer.flip();
+
     int size = buffer.getInt();
-    offsetVar += Integer.BYTES;
-    buffer = ByteBuffer.allocate(getSerializedSize(size));
-    ReadWriteIOUtils.readAsPossible(input, offsetVar, buffer);
-    buffer.flip();
     String measurementID = ReadWriteIOUtils.readStringWithoutLength(buffer, size);
-    return deserializePartFrom(measurementID, buffer);
+    return deserializePartFrom(measurementID, chunkHeaderSize, buffer);
   }
 
-  private static ChunkHeader deserializePartFrom(String measurementID, ByteBuffer buffer) throws UnsupportedEncodingException {
+  private static ChunkHeader deserializePartFrom(String measurementID, int chunkHeaderSize,
+      ByteBuffer buffer) {
     int dataSize = ReadWriteIOUtils.readInt(buffer);
     TSDataType dataType = TSDataType.deserialize(ReadWriteIOUtils.readShort(buffer));
     int numOfPages = ReadWriteIOUtils.readInt(buffer);
     CompressionType type = ReadWriteIOUtils.readCompressionType(buffer);
     TSEncoding encoding = ReadWriteIOUtils.readEncoding(buffer);
     long maxTombstoneTime = ReadWriteIOUtils.readLong(buffer);
-    return new ChunkHeader(measurementID, dataSize, dataType, type, encoding, numOfPages,
+    return new ChunkHeader(measurementID, dataSize, chunkHeaderSize, dataType, type, encoding,
+        numOfPages,
         maxTombstoneTime);
   }
-
   public int getSerializedSize() {
     return serializedSize;
   }
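
Note on the ChunkHeader change: the TsFileInput overload of deserializeFrom
previously needed two reads (a 4-byte probe for the measurement-ID length,
then a second read sized by getSerializedSize(size)); it now takes the exact
header size from the caller and issues a single positional read. The
ByteBuffer overload is unchanged in behavior: it still derives the size from
the measurement ID after reading it. A caller-side sketch, assuming the
caller already knows the measurement ID (variable names are illustrative):

    // e.g. measurementId from ChunkMetaData.getMeasurementUid():
    int headerSize = ChunkHeader.getSerializedSize(measurementId);
    ChunkHeader header =
        ChunkHeader.deserializeFrom(tsFileInput, headerPos, headerSize, false);
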
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
index c63152e..9c04144 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
@@ -68,6 +68,11 @@ public class ChunkMetaData {
 
   private TsDigest valuesStatistics;
 
+  /**
+   * chunk data size.
+   */
+  private long dataSize;
+
   private ChunkMetaData() {
   }
 
@@ -108,6 +113,7 @@ public class ChunkMetaData {
     chunkMetaData.endTime = ReadWriteIOUtils.readLong(inputStream);
 
     chunkMetaData.tsDataType = ReadWriteIOUtils.readDataType(inputStream);
+    chunkMetaData.dataSize = ReadWriteIOUtils.readLong(inputStream);
 
     chunkMetaData.valuesStatistics = TsDigest.deserializeFrom(inputStream);
 
@@ -129,6 +135,7 @@ public class ChunkMetaData {
     chunkMetaData.startTime = ReadWriteIOUtils.readLong(buffer);
     chunkMetaData.endTime = ReadWriteIOUtils.readLong(buffer);
     chunkMetaData.tsDataType = ReadWriteIOUtils.readDataType(buffer);
+    chunkMetaData.dataSize = ReadWriteIOUtils.readLong(buffer);
 
     chunkMetaData.valuesStatistics = TsDigest.deserializeFrom(buffer);
 
@@ -144,6 +151,7 @@ public class ChunkMetaData {
     int serializedSize = (Integer.BYTES  +
             4 * Long.BYTES + // 4 long: offsetOfChunkHeader, numOfPoints, startTime, endTime
             TSDataType.getSerializedSize() + // TSDataType
+         Long.BYTES + // data size
             (valuesStatistics == null ? TsDigest.getNullDigestSize()
                     : valuesStatistics.getSerializedSize()));
     try {
@@ -231,6 +239,7 @@ public class ChunkMetaData {
     byteLen += ReadWriteIOUtils.write(startTime, outputStream);
     byteLen += ReadWriteIOUtils.write(endTime, outputStream);
     byteLen += ReadWriteIOUtils.write(tsDataType, outputStream);
+    byteLen += ReadWriteIOUtils.write(dataSize, outputStream);
 
     if (valuesStatistics == null) {
       byteLen += TsDigest.serializeNullTo(outputStream);
@@ -255,6 +264,7 @@ public class ChunkMetaData {
     byteLen += ReadWriteIOUtils.write(startTime, buffer);
     byteLen += ReadWriteIOUtils.write(endTime, buffer);
     byteLen += ReadWriteIOUtils.write(tsDataType, buffer);
+    byteLen += ReadWriteIOUtils.write(dataSize, buffer);
 
     if (valuesStatistics == null) {
       byteLen += TsDigest.serializeNullTo(buffer);
@@ -280,6 +290,14 @@ public class ChunkMetaData {
     this.deletedAt = deletedAt;
   }
 
+  public long getDataSize() {
+    return dataSize;
+  }
+
+  public void setDataSize(long dataSize) {
+    this.dataSize = dataSize;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -297,6 +315,7 @@ public class ChunkMetaData {
         deletedAt == that.deletedAt &&
         Objects.equals(measurementUid, that.measurementUid) &&
         tsDataType == that.tsDataType &&
+        dataSize == that.dataSize &&
         Objects.equals(valuesStatistics, that.valuesStatistics);
   }
 }
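
Note on the ChunkMetaData change: the chunk body size is persisted as one
extra long, written and read between tsDataType and the digest. That is an
on-disk format change (8 more bytes per ChunkMetaData), so TsFiles written
before this commit presumably cannot be deserialized on this branch. The new
size computation, assembled from the hunk above:

    int serializedSize = Integer.BYTES
        + 4 * Long.BYTES                 // offsetOfChunkHeader, numOfPoints, startTime, endTime
        + TSDataType.getSerializedSize() // tsDataType
        + Long.BYTES                     // dataSize (new)
        + (valuesStatistics == null ? TsDigest.getNullDigestSize()
            : valuesStatistics.getSerializedSize());
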
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index eec73e0..b5a0fd5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -291,8 +291,9 @@ public class TsFileSequenceReader implements AutoCloseable {
    * @param position the file offset of this chunk's header
    * @param markerRead true if the offset does not contains the marker , otherwise false
    */
-  private ChunkHeader readChunkHeader(long position, boolean markerRead) throws IOException {
-    return ChunkHeader.deserializeFrom(tsFileInput, position, markerRead);
+  private ChunkHeader readChunkHeader(long position, int chunkHeaderSize, boolean markerRead)
+      throws IOException {
+    return ChunkHeader.deserializeFrom(tsFileInput, position, chunkHeaderSize, markerRead);
   }
 
   /**
@@ -333,9 +334,13 @@ public class TsFileSequenceReader implements AutoCloseable {
    * @return -chunk
    */
   public Chunk readMemChunk(ChunkMetaData metaData) throws IOException {
-    ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader(), false);
-    ByteBuffer buffer = readChunk(metaData.getOffsetOfChunkHeader() + header.getSerializedSize(),
-        header.getDataSize());
+    int chunkHeadSize = ChunkHeader.getSerializedSize(metaData.getMeasurementUid());
+    long chunkHeaderPos = metaData.getOffsetOfChunkHeader();
+    long chunkDataPos = metaData.getOffsetOfChunkHeader() + chunkHeadSize;
+    int chunkDataSize = (int) metaData.getDataSize();
+
+    ByteBuffer buffer = readChunk(chunkDataPos, chunkDataSize);
+    ChunkHeader header = readChunkHeader(chunkHeaderPos, chunkHeadSize, false);
     return new Chunk(header, buffer);
   }
 
@@ -569,11 +574,16 @@ public class TsFileSequenceReader implements AutoCloseable {
                 startTimeOfChunk, endTimeOfChunk);
             currentChunk.setNumOfPoints(numOfPoints);
             Map<String, ByteBuffer> statisticsMap = new HashMap<>();
-            statisticsMap.put(StatisticConstant.MAX_VALUE, ByteBuffer.wrap(chunkStatistics.getMaxBytes()));
-            statisticsMap.put(StatisticConstant.MIN_VALUE, ByteBuffer.wrap(chunkStatistics.getMinBytes()));
-            statisticsMap.put(StatisticConstant.FIRST, ByteBuffer.wrap(chunkStatistics.getFirstBytes()));
-            statisticsMap.put(StatisticConstant.SUM, ByteBuffer.wrap(chunkStatistics.getSumBytes()));
-            statisticsMap.put(StatisticConstant.LAST, ByteBuffer.wrap(chunkStatistics.getLastBytes()));
+            statisticsMap
+                .put(StatisticConstant.MAX_VALUE, ByteBuffer.wrap(chunkStatistics.getMaxBytes()));
+            statisticsMap
+                .put(StatisticConstant.MIN_VALUE, ByteBuffer.wrap(chunkStatistics.getMinBytes()));
+            statisticsMap
+                .put(StatisticConstant.FIRST, ByteBuffer.wrap(chunkStatistics.getFirstBytes()));
+            statisticsMap
+                .put(StatisticConstant.SUM, ByteBuffer.wrap(chunkStatistics.getSumBytes()));
+            statisticsMap
+                .put(StatisticConstant.LAST, ByteBuffer.wrap(chunkStatistics.getLastBytes()));
             TsDigest tsDigest = new TsDigest();
             tsDigest.setStatistics(statisticsMap);
             currentChunk.setDigest(tsDigest);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
index 290c516..e4120a0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
@@ -195,7 +195,7 @@ public class ChunkBuffer {
     long size = writer.getPos() - totalByteSize;
     assert size == pageBuffer.size();
 
-    writer.endChunk(totalValueCount);
+    writer.endChunk(totalValueCount, size);
     return headerSize + size;
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 206fce5..2c0c64a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -210,8 +210,9 @@ public class TsFileIOWriter {
    *
    * @param totalValueCount -set the number of points to the currentChunkMetaData
    */
-  public void endChunk(long totalValueCount) {
+  public void endChunk(long totalValueCount, long size) {
     currentChunkMetaData.setNumOfPoints(totalValueCount);
+    currentChunkMetaData.setDataSize(size);
     currentChunkGroupMetaData.addTimeSeriesChunkMetaData(currentChunkMetaData);
     LOG.debug("end series chunk:{},totalvalue:{}", currentChunkMetaData, totalValueCount);
     currentChunkMetaData = null;
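
Note on the writer side: ChunkBuffer measures the flushed chunk body as
writer.getPos() - totalByteSize and passes it to endChunk, which records it
on the pending ChunkMetaData, where readMemChunk later picks it up. The flow,
stitched together from the hunks above:

    // In ChunkBuffer, after all pages of the chunk are flushed:
    long size = writer.getPos() - totalByteSize;   // measured chunk body size
    writer.endChunk(totalValueCount, size);

    // In TsFileIOWriter.endChunk:
    currentChunkMetaData.setNumOfPoints(totalValueCount);
    currentChunkMetaData.setDataSize(size);        // read back via getDataSize()
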
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
index 911c3c0..b83e0d9 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
@@ -59,7 +59,7 @@ public class TsFileIOWriterTest {
     writer.startChunkGroup(deviceId);
     writer.startFlushChunk(measurementSchema, measurementSchema.getCompressor(),
         measurementSchema.getType(), measurementSchema.getEncodingType(), statistics, 0, 0, 0, 0);
-    writer.endChunk(0);
+    writer.endChunk(0,0);
     writer.endChunkGroup(0);
 
     // end file