You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2019/10/29 13:08:48 UTC

[incubator-iotdb] branch rel/0.8_sy_test created (now 2fba6fa)

This is an automated email from the ASF dual-hosted git repository.

suyue pushed a change to branch rel/0.8_sy_test
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 2fba6fa  read chunk data before chunk header

This branch includes the following new commits:

     new 2fba6fa  read chunk data before chunk header

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: read chunk data before chunk header

Posted by su...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

suyue pushed a commit to branch rel/0.8_sy_test
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 2fba6fa1b572b9173d389a46873408353bfb4c66
Author: suyue <23...@qq.com>
AuthorDate: Tue Oct 29 21:05:00 2019 +0800

    read chunk data before chunk header
---
 .../iotdb/tsfile/file/header/ChunkHeader.java      | 42 ++++++++++++++--------
 .../iotdb/tsfile/file/metadata/ChunkMetaData.java  | 19 ++++++++++
 .../iotdb/tsfile/read/TsFileSequenceReader.java    | 30 ++++++++++------
 .../iotdb/tsfile/write/chunk/ChunkBuffer.java      |  2 +-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  3 +-
 .../iotdb/tsfile/write/TsFileIOWriterTest.java     |  2 +-
 6 files changed, 70 insertions(+), 28 deletions(-)

diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
index 0dc5ef0..232ec5f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
@@ -59,19 +59,30 @@ public class ChunkHeader {
   public ChunkHeader(String measurementID, int dataSize, TSDataType dataType,
       CompressionType compressionType,
       TSEncoding encoding, int numOfPages) {
-    this(measurementID, dataSize, dataType, compressionType, encoding, numOfPages, 0);
+    this(measurementID, dataSize, getSerializedSize(measurementID), dataType, compressionType,
+        encoding, numOfPages, 0);
+  }
+
+  public ChunkHeader(String measurementID, int dataSize, int headerSize, TSDataType dataType,
+      CompressionType compressionType, TSEncoding encoding, int numOfPages) {
+    this(measurementID, dataSize, headerSize, dataType, compressionType, encoding, numOfPages, 0);
   }
 
   private ChunkHeader(String measurementID, int dataSize, TSDataType dataType,
-      CompressionType compressionType,
-      TSEncoding encoding, int numOfPages, long maxTombstoneTime) {
+      CompressionType compressionType, TSEncoding encoding, int numOfPages, long maxTombstoneTime) {
+    this(measurementID, dataSize, getSerializedSize(measurementID), dataType, compressionType,
+        encoding, numOfPages, maxTombstoneTime);
+  }
+
+  private ChunkHeader(String measurementID, int dataSize, int headerSize, TSDataType dataType,
+      CompressionType compressionType, TSEncoding encoding, int numOfPages, long maxTombstoneTime) {
     this.measurementID = measurementID;
     this.dataSize = dataSize;
     this.dataType = dataType;
     this.compressionType = compressionType;
     this.numOfPages = numOfPages;
     this.encodingType = encoding;
-    this.serializedSize = getSerializedSize(measurementID);
+    this.serializedSize = headerSize;
     this.maxTombstoneTime = maxTombstoneTime;
   }
 
@@ -132,7 +143,8 @@ public class ChunkHeader {
     }
 
     String measurementID = ReadWriteIOUtils.readString(byteBuffer);
-    return deserializePartFrom(measurementID, byteBuffer);
+    return deserializePartFrom(measurementID, ChunkHeader.getSerializedSize(measurementID),
+        byteBuffer);
   }
 
   /**
@@ -140,39 +152,39 @@ public class ChunkHeader {
    *
    * @param input TsFileInput
    * @param offset offset
+   * @param chunkHeaderSize the size of chunk's header
    * @param markerRead read marker (boolean type)
    * @return CHUNK_HEADER object
    * @throws IOException IOException
    */
-  public static ChunkHeader deserializeFrom(TsFileInput input, long offset, boolean markerRead)
+  public static ChunkHeader deserializeFrom(TsFileInput input, long offset, int chunkHeaderSize,
+      boolean markerRead)
       throws IOException {
     long offsetVar = offset;
     if (!markerRead) {
       offsetVar++;
     }
-    ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
+    ByteBuffer buffer = ByteBuffer.allocate(chunkHeaderSize);
     input.read(buffer, offsetVar);
     buffer.flip();
+
     int size = buffer.getInt();
-    offsetVar += Integer.BYTES;
-    buffer = ByteBuffer.allocate(getSerializedSize(size));
-    ReadWriteIOUtils.readAsPossible(input, offsetVar, buffer);
-    buffer.flip();
     String measurementID = ReadWriteIOUtils.readStringWithoutLength(buffer, size);
-    return deserializePartFrom(measurementID, buffer);
+    return deserializePartFrom(measurementID, chunkHeaderSize, buffer);
   }
 
-  private static ChunkHeader deserializePartFrom(String measurementID, ByteBuffer buffer) throws UnsupportedEncodingException {
+  private static ChunkHeader deserializePartFrom(String measurementID, int chunkHeaderSize,
+      ByteBuffer buffer) {
     int dataSize = ReadWriteIOUtils.readInt(buffer);
     TSDataType dataType = TSDataType.deserialize(ReadWriteIOUtils.readShort(buffer));
     int numOfPages = ReadWriteIOUtils.readInt(buffer);
     CompressionType type = ReadWriteIOUtils.readCompressionType(buffer);
     TSEncoding encoding = ReadWriteIOUtils.readEncoding(buffer);
     long maxTombstoneTime = ReadWriteIOUtils.readLong(buffer);
-    return new ChunkHeader(measurementID, dataSize, dataType, type, encoding, numOfPages,
+    return new ChunkHeader(measurementID, dataSize, chunkHeaderSize, dataType, type, encoding,
+        numOfPages,
         maxTombstoneTime);
   }
-
   public int getSerializedSize() {
     return serializedSize;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
index c63152e..9c04144 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
@@ -68,6 +68,11 @@ public class ChunkMetaData {
 
   private TsDigest valuesStatistics;
 
+  /**
+   * chunk data size.
+   */
+  private long dataSize;
+
   private ChunkMetaData() {
   }
 
@@ -108,6 +113,7 @@ public class ChunkMetaData {
     chunkMetaData.endTime = ReadWriteIOUtils.readLong(inputStream);
 
     chunkMetaData.tsDataType = ReadWriteIOUtils.readDataType(inputStream);
+    chunkMetaData.dataSize = ReadWriteIOUtils.readLong(inputStream);
 
     chunkMetaData.valuesStatistics = TsDigest.deserializeFrom(inputStream);
 
@@ -129,6 +135,7 @@ public class ChunkMetaData {
     chunkMetaData.startTime = ReadWriteIOUtils.readLong(buffer);
     chunkMetaData.endTime = ReadWriteIOUtils.readLong(buffer);
     chunkMetaData.tsDataType = ReadWriteIOUtils.readDataType(buffer);
+    chunkMetaData.dataSize = ReadWriteIOUtils.readLong(buffer);
 
     chunkMetaData.valuesStatistics = TsDigest.deserializeFrom(buffer);
 
@@ -144,6 +151,7 @@ public class ChunkMetaData {
     int serializedSize = (Integer.BYTES  +
             4 * Long.BYTES + // 4 long: offsetOfChunkHeader, numOfPoints, startTime, endTime
             TSDataType.getSerializedSize() + // TSDataType
+         Long.BYTES + // data size
             (valuesStatistics == null ? TsDigest.getNullDigestSize()
                     : valuesStatistics.getSerializedSize()));
     try {
@@ -231,6 +239,7 @@ public class ChunkMetaData {
     byteLen += ReadWriteIOUtils.write(startTime, outputStream);
     byteLen += ReadWriteIOUtils.write(endTime, outputStream);
     byteLen += ReadWriteIOUtils.write(tsDataType, outputStream);
+    byteLen += ReadWriteIOUtils.write(dataSize, outputStream);
 
     if (valuesStatistics == null) {
       byteLen += TsDigest.serializeNullTo(outputStream);
@@ -255,6 +264,7 @@ public class ChunkMetaData {
     byteLen += ReadWriteIOUtils.write(startTime, buffer);
     byteLen += ReadWriteIOUtils.write(endTime, buffer);
     byteLen += ReadWriteIOUtils.write(tsDataType, buffer);
+    byteLen += ReadWriteIOUtils.write(dataSize, buffer);
 
     if (valuesStatistics == null) {
       byteLen += TsDigest.serializeNullTo(buffer);
@@ -280,6 +290,14 @@ public class ChunkMetaData {
     this.deletedAt = deletedAt;
   }
 
+  public long getDataSize() {
+    return dataSize;
+  }
+
+  public void setDataSize(long dataSize) {
+    this.dataSize = dataSize;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -297,6 +315,7 @@ public class ChunkMetaData {
         deletedAt == that.deletedAt &&
         Objects.equals(measurementUid, that.measurementUid) &&
         tsDataType == that.tsDataType &&
+        dataSize == that.dataSize &&
         Objects.equals(valuesStatistics, that.valuesStatistics);
   }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index eec73e0..b5a0fd5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -291,8 +291,9 @@ public class TsFileSequenceReader implements AutoCloseable {
    * @param position the file offset of this chunk's header
    * @param markerRead true if the offset does not contains the marker , otherwise false
    */
-  private ChunkHeader readChunkHeader(long position, boolean markerRead) throws IOException {
-    return ChunkHeader.deserializeFrom(tsFileInput, position, markerRead);
+  private ChunkHeader readChunkHeader(long position, int chunkHeaderSize, boolean markerRead)
+      throws IOException {
+    return ChunkHeader.deserializeFrom(tsFileInput, position, chunkHeaderSize, markerRead);
   }
 
   /**
@@ -333,9 +334,13 @@ public class TsFileSequenceReader implements AutoCloseable {
    * @return -chunk
    */
   public Chunk readMemChunk(ChunkMetaData metaData) throws IOException {
-    ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader(), false);
-    ByteBuffer buffer = readChunk(metaData.getOffsetOfChunkHeader() + header.getSerializedSize(),
-        header.getDataSize());
+    int chunkHeadSize = ChunkHeader.getSerializedSize(metaData.getMeasurementUid());
+    long chunkHeaderPos = metaData.getOffsetOfChunkHeader();
+    long chunkDataPos = metaData.getOffsetOfChunkHeader() + chunkHeadSize;
+    int chunkDataSize = (int) metaData.getDataSize();
+
+    ByteBuffer buffer = readChunk(chunkDataPos, chunkDataSize);
+    ChunkHeader header = readChunkHeader(chunkHeaderPos, chunkHeadSize, false);
     return new Chunk(header, buffer);
   }
 
@@ -569,11 +574,16 @@ public class TsFileSequenceReader implements AutoCloseable {
                 startTimeOfChunk, endTimeOfChunk);
             currentChunk.setNumOfPoints(numOfPoints);
             Map<String, ByteBuffer> statisticsMap = new HashMap<>();
-            statisticsMap.put(StatisticConstant.MAX_VALUE, ByteBuffer.wrap(chunkStatistics.getMaxBytes()));
-            statisticsMap.put(StatisticConstant.MIN_VALUE, ByteBuffer.wrap(chunkStatistics.getMinBytes()));
-            statisticsMap.put(StatisticConstant.FIRST, ByteBuffer.wrap(chunkStatistics.getFirstBytes()));
-            statisticsMap.put(StatisticConstant.SUM, ByteBuffer.wrap(chunkStatistics.getSumBytes()));
-            statisticsMap.put(StatisticConstant.LAST, ByteBuffer.wrap(chunkStatistics.getLastBytes()));
+            statisticsMap
+                .put(StatisticConstant.MAX_VALUE, ByteBuffer.wrap(chunkStatistics.getMaxBytes()));
+            statisticsMap
+                .put(StatisticConstant.MIN_VALUE, ByteBuffer.wrap(chunkStatistics.getMinBytes()));
+            statisticsMap
+                .put(StatisticConstant.FIRST, ByteBuffer.wrap(chunkStatistics.getFirstBytes()));
+            statisticsMap
+                .put(StatisticConstant.SUM, ByteBuffer.wrap(chunkStatistics.getSumBytes()));
+            statisticsMap
+                .put(StatisticConstant.LAST, ByteBuffer.wrap(chunkStatistics.getLastBytes()));
             TsDigest tsDigest = new TsDigest();
             tsDigest.setStatistics(statisticsMap);
             currentChunk.setDigest(tsDigest);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
index 290c516..e4120a0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
@@ -195,7 +195,7 @@ public class ChunkBuffer {
     long size = writer.getPos() - totalByteSize;
     assert size == pageBuffer.size();
 
-    writer.endChunk(totalValueCount);
+    writer.endChunk(totalValueCount, size);
     return headerSize + size;
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 206fce5..2c0c64a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -210,8 +210,9 @@ public class TsFileIOWriter {
    *
    * @param totalValueCount -set the number of points to the currentChunkMetaData
    */
-  public void endChunk(long totalValueCount) {
+  public void endChunk(long totalValueCount, long size) {
     currentChunkMetaData.setNumOfPoints(totalValueCount);
+    currentChunkMetaData.setDataSize(size);
     currentChunkGroupMetaData.addTimeSeriesChunkMetaData(currentChunkMetaData);
     LOG.debug("end series chunk:{},totalvalue:{}", currentChunkMetaData, totalValueCount);
     currentChunkMetaData = null;
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
index 911c3c0..b83e0d9 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
@@ -59,7 +59,7 @@ public class TsFileIOWriterTest {
     writer.startChunkGroup(deviceId);
     writer.startFlushChunk(measurementSchema, measurementSchema.getCompressor(),
         measurementSchema.getType(), measurementSchema.getEncodingType(), statistics, 0, 0, 0, 0);
-    writer.endChunk(0);
+    writer.endChunk(0,0);
     writer.endChunkGroup(0);
 
     // end file