You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2019/02/27 06:21:31 UTC

[incubator-iotdb] branch east_fixbug updated: update read logic of TsFileSequenceReader

This is an automated email from the ASF dual-hosted git repository.

east pushed a commit to branch east_fixbug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/east_fixbug by this push:
     new edebd4c  update read logic of TsFileSequenceReader
edebd4c is described below

commit edebd4cbf5ee8fb9ea0303680ee56e6e0ba5c1d9
Author: mdf369 <95...@qq.com>
AuthorDate: Wed Feb 27 14:21:12 2019 +0800

    update read logic of TsFileSequenceReader
---
 .../iotdb/tsfile/file/header/PageHeader.java       |  50 +++++
 .../file/metadata/statistics/BinaryStatistics.java |  10 +
 .../file/metadata/statistics/Statistics.java       | 132 +++++++++++++
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |  14 +-
 .../file/metadata/ChunkGroupMetaDataTest.java      |  15 +-
 .../org/apache/iotdb/tsfile/read/ReadBugTest.java  | 208 ---------------------
 6 files changed, 208 insertions(+), 221 deletions(-)

diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
index aa38ecf..192d8ca 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.NoStatistics;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -60,6 +61,10 @@ public class PageHeader {
     return 3 * Integer.BYTES + 2 * Long.BYTES + Statistics.getStatsByType(type).getSerializedSize();
   }
 
+  public static int calculatePageHeaderSizeWithoutStatistics() {
+    return 3 * Integer.BYTES + 2 * Long.BYTES;
+  }
+
   public static PageHeader deserializeFrom(InputStream inputStream, TSDataType dataType)
       throws IOException {
     int uncompressedSize = ReadWriteIOUtils.readInt(inputStream);
@@ -84,6 +89,51 @@ public class PageHeader {
         minTimestamp);
   }
 
+  /**
+   * deserialize from FileChannel.
+   *
+   * @param channel FileChannel
+   * @param offset offset
+   * @param markerRead true if the marker byte has already been read (offset points past it), otherwise false
+   * @param dataType data type
+   * @return PageHeader object
+   * @throws IOException IOException
+   */
+  public static PageHeader deserializeFrom(FileChannel channel, long offset, boolean markerRead, TSDataType dataType)
+      throws IOException {
+    long offsetVar = offset;
+    if (!markerRead) {
+      offsetVar++;
+    }
+
+    if (dataType == TSDataType.TEXT) {
+      int sizeWithoutStatistics = calculatePageHeaderSizeWithoutStatistics();
+      ByteBuffer bufferWithoutStatistics = ByteBuffer.allocate(sizeWithoutStatistics);
+      ReadWriteIOUtils.readAsPossible(channel, offsetVar, bufferWithoutStatistics);
+      bufferWithoutStatistics.flip();
+      offsetVar += sizeWithoutStatistics;
+
+      Statistics statistics = Statistics.deserialize(channel, offsetVar, dataType);
+      return deserializePartFrom(statistics, bufferWithoutStatistics);
+    } else {
+      int size = calculatePageHeaderSize(dataType);
+      ByteBuffer buffer = ByteBuffer.allocate(size);
+      ReadWriteIOUtils.readAsPossible(channel, offsetVar, buffer);
+      buffer.flip();
+      return deserializeFrom(buffer, dataType);
+    }
+  }
+
+  private static PageHeader deserializePartFrom(Statistics statistics, ByteBuffer buffer) {
+    int uncompressedSize = ReadWriteIOUtils.readInt(buffer);
+    int compressedSize = ReadWriteIOUtils.readInt(buffer);
+    int numOfValues = ReadWriteIOUtils.readInt(buffer);
+    long maxTimestamp = ReadWriteIOUtils.readLong(buffer);
+    long minTimestamp = ReadWriteIOUtils.readLong(buffer);
+    return new PageHeader(uncompressedSize, compressedSize, numOfValues, statistics, maxTimestamp,
+        minTimestamp);
+  }
+
   public int calculatePageHeaderSize() {
     return 3 * Integer.BYTES + 2 * Long.BYTES + statistics.getSerializedSize();
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
index b4b20db..51ea983 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.tsfile.file.metadata.statistics;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -205,4 +206,13 @@ public class BinaryStatistics extends Statistics<Binary> {
         ReadWriteIOUtils.readByteBufferWithSelfDescriptionLength(byteBuffer).array());
     this.sum = ReadWriteIOUtils.readDouble(byteBuffer);
   }
+
+  @Override
+  protected void fill(FileChannel channel, long offset) throws IOException {
+    int size = getSerializedSize();
+    ByteBuffer buffer = ByteBuffer.allocate(size);
+    ReadWriteIOUtils.readAsPossible(channel, offset, buffer);
+    buffer.flip();
+    fill(buffer);
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index 052222f..dc73637 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -23,6 +23,11 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
 import org.apache.iotdb.tsfile.exception.write.UnknownColumnTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
@@ -130,6 +135,34 @@ public abstract class Statistics<T> {
     return statistics;
   }
 
+  public static Statistics deserialize(FileChannel channel, long offset, TSDataType dataType) throws IOException {
+    Statistics statistics = null;
+    switch (dataType) {
+      case INT32:
+        statistics = new IntegerStatistics();
+        break;
+      case INT64:
+        statistics = new LongStatistics();
+        break;
+      case TEXT:
+        statistics = new BinaryStatistics();
+        break;
+      case BOOLEAN:
+        statistics = new BooleanStatistics();
+        break;
+      case DOUBLE:
+        statistics = new DoubleStatistics();
+        break;
+      case FLOAT:
+        statistics = new FloatStatistics();
+        break;
+      default:
+        throw new UnknownColumnTypeException(dataType.toString());
+    }
+    statistics.fill(channel, offset);
+    return statistics;
+  }
+
   public abstract void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes);
 
   public abstract T getMin();
@@ -249,6 +282,14 @@ public abstract class Statistics<T> {
 
   abstract void fill(ByteBuffer byteBuffer) throws IOException;
 
+  protected void fill(FileChannel channel, long offset) throws IOException {
+    int size = getSerializedSize();
+    ByteBuffer buffer = ByteBuffer.allocate(size);
+    ReadWriteIOUtils.readAsPossible(channel, offset, buffer);
+    buffer.flip();
+    fill(buffer);
+  }
+
   public int getSerializedSize() {
     if (sizeOfDatum() == 0) {
       return 0;
@@ -295,4 +336,95 @@ public abstract class Statistics<T> {
     return length;
   }
 
+  public static void main(String[] args) throws IOException {
+    Statistics statistics = new BinaryStatistics();
+    statistics.fill(new FileChannel() {
+      @Override
+      public int read(ByteBuffer dst) throws IOException {
+        return 0;
+      }
+
+      @Override
+      public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
+        return 0;
+      }
+
+      @Override
+      public int write(ByteBuffer src) throws IOException {
+        return 0;
+      }
+
+      @Override
+      public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
+        return 0;
+      }
+
+      @Override
+      public long position() throws IOException {
+        return 0;
+      }
+
+      @Override
+      public FileChannel position(long newPosition) throws IOException {
+        return null;
+      }
+
+      @Override
+      public long size() throws IOException {
+        return 0;
+      }
+
+      @Override
+      public FileChannel truncate(long size) throws IOException {
+        return null;
+      }
+
+      @Override
+      public void force(boolean metaData) throws IOException {
+
+      }
+
+      @Override
+      public long transferTo(long position, long count, WritableByteChannel target)
+          throws IOException {
+        return 0;
+      }
+
+      @Override
+      public long transferFrom(ReadableByteChannel src, long position, long count)
+          throws IOException {
+        return 0;
+      }
+
+      @Override
+      public int read(ByteBuffer dst, long position) throws IOException {
+        return 0;
+      }
+
+      @Override
+      public int write(ByteBuffer src, long position) throws IOException {
+        return 0;
+      }
+
+      @Override
+      public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
+        return null;
+      }
+
+      @Override
+      public FileLock lock(long position, long size, boolean shared) throws IOException {
+        return null;
+      }
+
+      @Override
+      public FileLock tryLock(long position, long size, boolean shared) throws IOException {
+        return null;
+      }
+
+      @Override
+      protected void implCloseChannel() throws IOException {
+
+      }
+    }, 0);
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 5afaa98..fa673ba 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -246,19 +246,21 @@ public class TsFileSequenceReader {
   /**
    * not thread safe.
    *
-   * @param type -given tsfile data type
+   * @param type given tsfile data type
    */
   public PageHeader readPageHeader(TSDataType type) throws IOException {
-    return readPageHeader(type, -1);
+    return PageHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), type);
   }
 
   /**
-   * notice, this function will modify channel's position.
+   * read the page's header.
    *
-   * @param position the file offset of this page header's header
+   * @param position the file offset of this page header
+   * @param markerRead true if the offset does not contain the marker, otherwise false
+   * @param dataType given tsfile data type
    */
-  public PageHeader readPageHeader(TSDataType type, long position) throws IOException {
-    return PageHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), type);
+  private PageHeader readPageHeader(long position, boolean markerRead, TSDataType dataType) throws IOException {
+    return PageHeader.deserializeFrom(tsFileInput.wrapAsFileChannel(), position, markerRead, dataType);
   }
 
   public long position() throws IOException {
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaDataTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaDataTest.java
index e8b43e9..e466d6e 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaDataTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaDataTest.java
@@ -50,8 +50,10 @@ import org.json.JSONArray;
 import org.json.JSONObject;
 import org.json.JSONTokener;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -60,19 +62,18 @@ import static org.junit.Assert.fail;
 public class ChunkGroupMetaDataTest {
 
   public static final String DELTA_OBJECT_UID = "delta-3312";
-  final String PATH = "target/outputChunkGroup.tsfile";
-  private String testDataFile;
+  final static String PATH = "target/outputChunkGroup.tsfile";
+  private static String testDataFile;
 
-  @Before
-  public void setUp() throws WriteProcessException, IOException, InterruptedException {
+  @BeforeClass
+  public static void setUp() throws WriteProcessException, IOException, InterruptedException {
     testDataFile = TsFileGeneratorForTest.outputDataFile;
-    TSFileDescriptor.getInstance().getConfig().timeSeriesEncoder = "TS_2DIFF";
 
     TsFileGeneratorForTest.generateFile(1000, 16 * 1024 * 1024, 10000);
   }
 
-  @After
-  public void tearDown() {
+  @AfterClass
+  public static void tearDown() {
     File file = new File(PATH);
     if (file.exists()) {
       file.delete();
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/ReadBugTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/ReadBugTest.java
deleted file mode 100644
index aa9cc2f..0000000
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/ReadBugTest.java
+++ /dev/null
@@ -1,208 +0,0 @@
-package org.apache.iotdb.tsfile.read;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Scanner;
-import java.util.stream.Collectors;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.MetaMarker;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
-import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.expression.QueryExpression;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.utils.RecordUtils;
-import org.apache.iotdb.tsfile.write.TsFileWriter;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.schema.SchemaBuilder;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ReadBugTest {
-  public static final String DELTA_OBJECT_UID = "delta-3312";
-  private String outputDataFile;
-  private String inputDataFile;
-  private static ReadOnlyTsFile roTsFile = null;
-
-  private static int rowCount;
-  private static int chunkGroupSize;
-  private static int pageSize;
-  public final long START_TIMESTAMP = 1480562618000L;
-
-  @Before
-  public void setUp() throws WriteProcessException, IOException, InterruptedException {
-    outputDataFile = "src/test/resources/testTsFile.tsfile";
-    TSFileDescriptor.getInstance().getConfig().timeSeriesEncoder = "TS_2DIFF";
-    inputDataFile = "src/test/resources/perTestInputData";
-
-    rowCount = 1000;
-    chunkGroupSize = 16 * 1024 * 1024;
-    pageSize = 10000;
-    generateSampleInputDataFile();
-    writeToFile();
-
-    TsFileSequenceReader reader = new TsFileSequenceReader(outputDataFile);
-    roTsFile = new ReadOnlyTsFile(reader);
-  }
-
-  private void generateSampleInputDataFile() throws IOException {
-    File file = new File(inputDataFile);
-    if (file.exists()) {
-      file.delete();
-    }
-    file.getParentFile().mkdirs();
-    FileWriter fw = new FileWriter(file);
-
-    long startTime = START_TIMESTAMP;
-    for (int i = 0; i < rowCount; i++) {
-      // write d1
-      String d1 = "d1," + (startTime + i);
-      if (i % 8 == 0) {
-        d1 += ",s4," + "dog" + i;
-      }
-      fw.write(d1 + "\r\n");
-    }
-    fw.close();
-  }
-
-  private void writeToFile() throws IOException, WriteProcessException {
-    File file = new File(outputDataFile);
-    if (file.exists()) {
-      file.delete();
-    }
-
-    FileSchema schema = generateTestSchema();
-
-    TSFileDescriptor.getInstance().getConfig().groupSizeInByte = chunkGroupSize;
-    TSFileDescriptor.getInstance().getConfig().maxNumberOfPointsInPage = pageSize;
-    TsFileWriter innerWriter = new TsFileWriter(file, schema, TSFileDescriptor.getInstance().getConfig());
-    Scanner in = getDataFile(inputDataFile);
-    assert in != null;
-    while (in.hasNextLine()) {
-      String str = in.nextLine();
-      TSRecord record = RecordUtils.parseSimpleTupleRecord(str, schema);
-      innerWriter.write(record);
-    }
-    innerWriter.close();
-    in.close();
-  }
-
-  private Scanner getDataFile(String path) {
-    File file = new File(path);
-    try {
-      Scanner in = new Scanner(file);
-      return in;
-    } catch (FileNotFoundException e) {
-      e.printStackTrace();
-      return null;
-    }
-  }
-
-  private FileSchema generateTestSchema() {
-    SchemaBuilder schemaBuilder = new SchemaBuilder();
-//    schemaBuilder.addSeries("s1", TSDataType.INT32, TSEncoding.RLE);
-//    schemaBuilder.addSeries("s2", TSDataType.INT64, TSEncoding.PLAIN);
-//    schemaBuilder.addSeries("s3", TSDataType.INT64, TSEncoding.TS_2DIFF);
-    schemaBuilder.addSeries("s4", TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED,
-        Collections.singletonMap(Encoder.MAX_STRING_LENGTH, "20"));
-//    schemaBuilder.addSeries("s5", TSDataType.BOOLEAN, TSEncoding.RLE);
-//    schemaBuilder.addSeries("s6", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY,
-//        Collections.singletonMap(Encoder.MAX_POINT_NUMBER, "5"));
-//    schemaBuilder.addSeries("s7", TSDataType.DOUBLE, TSEncoding.GORILLA);
-    return schemaBuilder.build();
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    if (roTsFile != null) {
-      roTsFile.close();
-    }
-
-    File file = new File(inputDataFile);
-    if (file.exists()) {
-      file.delete();
-    }
-    file = new File(outputDataFile);
-    if (file.exists()) {
-      file.delete();
-    }
-  }
-
-  @Test
-  public void testRead() throws IOException {
-    List<Path> pathList = new ArrayList<>();
-    pathList.add(new Path("d1.s4"));
-    QueryExpression queryExpression = QueryExpression.create(pathList, null);
-    QueryDataSet dataSet = roTsFile.query(queryExpression);
-
-    int count = 0;
-    while (dataSet.hasNext()) {
-      RowRecord r = dataSet.next();
-      System.out.println(count);
-      System.out.println(r);
-      count++;
-    }
-  }
-
-  @Test
-  public void fixBug() throws IOException {
-    TsFileSequenceReader reader = new TsFileSequenceReader(outputDataFile);
-    TsFileMetaData metaData = reader.readFileMetadata();
-    List<Pair<Long, Long>> offsetList = new ArrayList<>();
-    long startOffset = reader.position();
-    byte marker;
-    while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
-      switch (marker) {
-        case MetaMarker.CHUNK_HEADER:
-          ChunkHeader header = reader.readChunkHeader();
-          for (int j = 0; j < header.getNumOfPages(); j++) {
-            PageHeader pageHeader = reader.readPageHeader(header.getDataType());
-            reader.readPage(pageHeader, header.getCompressionType());
-          }
-          break;
-        case MetaMarker.CHUNK_GROUP_FOOTER:
-          reader.readChunkGroupFooter();
-          long endOffset = reader.position();
-          offsetList.add(new Pair<>(startOffset, endOffset));
-          startOffset = endOffset;
-          break;
-        default:
-          MetaMarker.handleUnexpectedMarker(marker);
-      }
-    }
-    int offsetListIndex = 0;
-    List<TsDeviceMetadataIndex> deviceMetadataIndexList = metaData.getDeviceMap().values().stream()
-        .sorted((x, y) -> (int) (x.getOffset() - y.getOffset())).collect(Collectors.toList());
-    for (TsDeviceMetadataIndex index : deviceMetadataIndexList) {
-      TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
-      List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata.getChunkGroupMetaDataList();
-      for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
-        Pair<Long, Long> pair = offsetList.get(offsetListIndex++);
-        Assert.assertEquals(chunkGroupMetaData.getStartOffsetOfChunkGroup(), (long) pair.left);
-        Assert.assertEquals(chunkGroupMetaData.getEndOffsetOfChunkGroup(), (long) pair.right);
-      }
-    }
-    reader.close();
-  }
-}