You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2019/02/27 06:21:31 UTC
[incubator-iotdb] branch east_fixbug updated: update read logic of
TsFileSequenceReader
This is an automated email from the ASF dual-hosted git repository.
east pushed a commit to branch east_fixbug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/east_fixbug by this push:
new edebd4c update read logic of TsFileSequenceReader
edebd4c is described below
commit edebd4cbf5ee8fb9ea0303680ee56e6e0ba5c1d9
Author: mdf369 <95...@qq.com>
AuthorDate: Wed Feb 27 14:21:12 2019 +0800
update read logic of TsFileSequenceReader
---
.../iotdb/tsfile/file/header/PageHeader.java | 50 +++++
.../file/metadata/statistics/BinaryStatistics.java | 10 +
.../file/metadata/statistics/Statistics.java | 132 +++++++++++++
.../iotdb/tsfile/read/TsFileSequenceReader.java | 14 +-
.../file/metadata/ChunkGroupMetaDataTest.java | 15 +-
.../org/apache/iotdb/tsfile/read/ReadBugTest.java | 208 ---------------------
6 files changed, 208 insertions(+), 221 deletions(-)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
index aa38ecf..192d8ca 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.NoStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -60,6 +61,10 @@ public class PageHeader {
return 3 * Integer.BYTES + 2 * Long.BYTES + Statistics.getStatsByType(type).getSerializedSize();
}
+ public static int calculatePageHeaderSizeWithoutStatistics() {
+ return 3 * Integer.BYTES + 2 * Long.BYTES;
+ }
+
public static PageHeader deserializeFrom(InputStream inputStream, TSDataType dataType)
throws IOException {
int uncompressedSize = ReadWriteIOUtils.readInt(inputStream);
@@ -84,6 +89,51 @@ public class PageHeader {
minTimestamp);
}
+ /**
+ * deserialize from FileChannel.
+ *
+ * @param channel FileChannel
+ * @param offset offset
+ * @param markerRead true if the marker at the given offset has already been read, otherwise false
+ * @param dataType data type
+ * @return PageHeader object
+ * @throws IOException IOException
+ */
+ public static PageHeader deserializeFrom(FileChannel channel, long offset, boolean markerRead, TSDataType dataType)
+ throws IOException {
+ long offsetVar = offset;
+ if (!markerRead) {
+ offsetVar++;
+ }
+
+ if (dataType == TSDataType.TEXT) {
+ int sizeWithoutStatistics = calculatePageHeaderSizeWithoutStatistics();
+ ByteBuffer bufferWithoutStatistics = ByteBuffer.allocate(sizeWithoutStatistics);
+ ReadWriteIOUtils.readAsPossible(channel, offsetVar, bufferWithoutStatistics);
+ bufferWithoutStatistics.flip();
+ offsetVar += sizeWithoutStatistics;
+
+ Statistics statistics = Statistics.deserialize(channel, offsetVar, dataType);
+ return deserializePartFrom(statistics, bufferWithoutStatistics);
+ } else {
+ int size = calculatePageHeaderSize(dataType);
+ ByteBuffer buffer = ByteBuffer.allocate(size);
+ ReadWriteIOUtils.readAsPossible(channel, offsetVar, buffer);
+ buffer.flip();
+ return deserializeFrom(buffer, dataType);
+ }
+ }
+
+ private static PageHeader deserializePartFrom(Statistics statistics, ByteBuffer buffer) {
+ int uncompressedSize = ReadWriteIOUtils.readInt(buffer);
+ int compressedSize = ReadWriteIOUtils.readInt(buffer);
+ int numOfValues = ReadWriteIOUtils.readInt(buffer);
+ long maxTimestamp = ReadWriteIOUtils.readLong(buffer);
+ long minTimestamp = ReadWriteIOUtils.readLong(buffer);
+ return new PageHeader(uncompressedSize, compressedSize, numOfValues, statistics, maxTimestamp,
+ minTimestamp);
+ }
+
public int calculatePageHeaderSize() {
return 3 * Integer.BYTES + 2 * Long.BYTES + statistics.getSerializedSize();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
index b4b20db..51ea983 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.tsfile.file.metadata.statistics;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -205,4 +206,13 @@ public class BinaryStatistics extends Statistics<Binary> {
ReadWriteIOUtils.readByteBufferWithSelfDescriptionLength(byteBuffer).array());
this.sum = ReadWriteIOUtils.readDouble(byteBuffer);
}
+
+ @Override
+ protected void fill(FileChannel channel, long offset) throws IOException {
+ int size = getSerializedSize();
+ ByteBuffer buffer = ByteBuffer.allocate(size);
+ ReadWriteIOUtils.readAsPossible(channel, offset, buffer);
+ buffer.flip();
+ fill(buffer);
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index 052222f..dc73637 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -23,6 +23,11 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
import org.apache.iotdb.tsfile.exception.write.UnknownColumnTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -130,6 +135,34 @@ public abstract class Statistics<T> {
return statistics;
}
+ public static Statistics deserialize(FileChannel channel, long offset, TSDataType dataType) throws IOException {
+ Statistics statistics = null;
+ switch (dataType) {
+ case INT32:
+ statistics = new IntegerStatistics();
+ break;
+ case INT64:
+ statistics = new LongStatistics();
+ break;
+ case TEXT:
+ statistics = new BinaryStatistics();
+ break;
+ case BOOLEAN:
+ statistics = new BooleanStatistics();
+ break;
+ case DOUBLE:
+ statistics = new DoubleStatistics();
+ break;
+ case FLOAT:
+ statistics = new FloatStatistics();
+ break;
+ default:
+ throw new UnknownColumnTypeException(dataType.toString());
+ }
+ statistics.fill(channel, offset);
+ return statistics;
+ }
+
public abstract void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes);
public abstract T getMin();
@@ -249,6 +282,14 @@ public abstract class Statistics<T> {
abstract void fill(ByteBuffer byteBuffer) throws IOException;
+ protected void fill(FileChannel channel, long offset) throws IOException {
+ int size = getSerializedSize();
+ ByteBuffer buffer = ByteBuffer.allocate(size);
+ ReadWriteIOUtils.readAsPossible(channel, offset, buffer);
+ buffer.flip();
+ fill(buffer);
+ }
+
public int getSerializedSize() {
if (sizeOfDatum() == 0) {
return 0;
@@ -295,4 +336,95 @@ public abstract class Statistics<T> {
return length;
}
+ public static void main(String[] args) throws IOException {
+ Statistics statistics = new BinaryStatistics();
+ statistics.fill(new FileChannel() {
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public long position() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public FileChannel position(long newPosition) throws IOException {
+ return null;
+ }
+
+ @Override
+ public long size() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public FileChannel truncate(long size) throws IOException {
+ return null;
+ }
+
+ @Override
+ public void force(boolean metaData) throws IOException {
+
+ }
+
+ @Override
+ public long transferTo(long position, long count, WritableByteChannel target)
+ throws IOException {
+ return 0;
+ }
+
+ @Override
+ public long transferFrom(ReadableByteChannel src, long position, long count)
+ throws IOException {
+ return 0;
+ }
+
+ @Override
+ public int read(ByteBuffer dst, long position) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public int write(ByteBuffer src, long position) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
+ return null;
+ }
+
+ @Override
+ public FileLock lock(long position, long size, boolean shared) throws IOException {
+ return null;
+ }
+
+ @Override
+ public FileLock tryLock(long position, long size, boolean shared) throws IOException {
+ return null;
+ }
+
+ @Override
+ protected void implCloseChannel() throws IOException {
+
+ }
+ }, 0);
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 5afaa98..fa673ba 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -246,19 +246,21 @@ public class TsFileSequenceReader {
/**
* not thread safe.
*
- * @param type -given tsfile data type
+ * @param type given tsfile data type
*/
public PageHeader readPageHeader(TSDataType type) throws IOException {
- return readPageHeader(type, -1);
+ return PageHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), type);
}
/**
- * notice, this function will modify channel's position.
+ * read the page's header.
*
- * @param position the file offset of this page header's header
+ * @param position the file offset of this page header
+ * @param markerRead true if the offset does not contain the marker, otherwise false
+ * @param dataType given tsfile data type
*/
- public PageHeader readPageHeader(TSDataType type, long position) throws IOException {
- return PageHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), type);
+ private PageHeader readPageHeader(long position, boolean markerRead, TSDataType dataType) throws IOException {
+ return PageHeader.deserializeFrom(tsFileInput.wrapAsFileChannel(), position, markerRead, dataType);
}
public long position() throws IOException {
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaDataTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaDataTest.java
index e8b43e9..e466d6e 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaDataTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaDataTest.java
@@ -50,8 +50,10 @@ import org.json.JSONArray;
import org.json.JSONObject;
import org.json.JSONTokener;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -60,19 +62,18 @@ import static org.junit.Assert.fail;
public class ChunkGroupMetaDataTest {
public static final String DELTA_OBJECT_UID = "delta-3312";
- final String PATH = "target/outputChunkGroup.tsfile";
- private String testDataFile;
+ final static String PATH = "target/outputChunkGroup.tsfile";
+ private static String testDataFile;
- @Before
- public void setUp() throws WriteProcessException, IOException, InterruptedException {
+ @BeforeClass
+ public static void setUp() throws WriteProcessException, IOException, InterruptedException {
testDataFile = TsFileGeneratorForTest.outputDataFile;
- TSFileDescriptor.getInstance().getConfig().timeSeriesEncoder = "TS_2DIFF";
TsFileGeneratorForTest.generateFile(1000, 16 * 1024 * 1024, 10000);
}
- @After
- public void tearDown() {
+ @AfterClass
+ public static void tearDown() {
File file = new File(PATH);
if (file.exists()) {
file.delete();
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/ReadBugTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/ReadBugTest.java
deleted file mode 100644
index aa9cc2f..0000000
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/ReadBugTest.java
+++ /dev/null
@@ -1,208 +0,0 @@
-package org.apache.iotdb.tsfile.read;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Scanner;
-import java.util.stream.Collectors;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.MetaMarker;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
-import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.expression.QueryExpression;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.utils.RecordUtils;
-import org.apache.iotdb.tsfile.write.TsFileWriter;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.schema.SchemaBuilder;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ReadBugTest {
- public static final String DELTA_OBJECT_UID = "delta-3312";
- private String outputDataFile;
- private String inputDataFile;
- private static ReadOnlyTsFile roTsFile = null;
-
- private static int rowCount;
- private static int chunkGroupSize;
- private static int pageSize;
- public final long START_TIMESTAMP = 1480562618000L;
-
- @Before
- public void setUp() throws WriteProcessException, IOException, InterruptedException {
- outputDataFile = "src/test/resources/testTsFile.tsfile";
- TSFileDescriptor.getInstance().getConfig().timeSeriesEncoder = "TS_2DIFF";
- inputDataFile = "src/test/resources/perTestInputData";
-
- rowCount = 1000;
- chunkGroupSize = 16 * 1024 * 1024;
- pageSize = 10000;
- generateSampleInputDataFile();
- writeToFile();
-
- TsFileSequenceReader reader = new TsFileSequenceReader(outputDataFile);
- roTsFile = new ReadOnlyTsFile(reader);
- }
-
- private void generateSampleInputDataFile() throws IOException {
- File file = new File(inputDataFile);
- if (file.exists()) {
- file.delete();
- }
- file.getParentFile().mkdirs();
- FileWriter fw = new FileWriter(file);
-
- long startTime = START_TIMESTAMP;
- for (int i = 0; i < rowCount; i++) {
- // write d1
- String d1 = "d1," + (startTime + i);
- if (i % 8 == 0) {
- d1 += ",s4," + "dog" + i;
- }
- fw.write(d1 + "\r\n");
- }
- fw.close();
- }
-
- private void writeToFile() throws IOException, WriteProcessException {
- File file = new File(outputDataFile);
- if (file.exists()) {
- file.delete();
- }
-
- FileSchema schema = generateTestSchema();
-
- TSFileDescriptor.getInstance().getConfig().groupSizeInByte = chunkGroupSize;
- TSFileDescriptor.getInstance().getConfig().maxNumberOfPointsInPage = pageSize;
- TsFileWriter innerWriter = new TsFileWriter(file, schema, TSFileDescriptor.getInstance().getConfig());
- Scanner in = getDataFile(inputDataFile);
- assert in != null;
- while (in.hasNextLine()) {
- String str = in.nextLine();
- TSRecord record = RecordUtils.parseSimpleTupleRecord(str, schema);
- innerWriter.write(record);
- }
- innerWriter.close();
- in.close();
- }
-
- private Scanner getDataFile(String path) {
- File file = new File(path);
- try {
- Scanner in = new Scanner(file);
- return in;
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- return null;
- }
- }
-
- private FileSchema generateTestSchema() {
- SchemaBuilder schemaBuilder = new SchemaBuilder();
-// schemaBuilder.addSeries("s1", TSDataType.INT32, TSEncoding.RLE);
-// schemaBuilder.addSeries("s2", TSDataType.INT64, TSEncoding.PLAIN);
-// schemaBuilder.addSeries("s3", TSDataType.INT64, TSEncoding.TS_2DIFF);
- schemaBuilder.addSeries("s4", TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED,
- Collections.singletonMap(Encoder.MAX_STRING_LENGTH, "20"));
-// schemaBuilder.addSeries("s5", TSDataType.BOOLEAN, TSEncoding.RLE);
-// schemaBuilder.addSeries("s6", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY,
-// Collections.singletonMap(Encoder.MAX_POINT_NUMBER, "5"));
-// schemaBuilder.addSeries("s7", TSDataType.DOUBLE, TSEncoding.GORILLA);
- return schemaBuilder.build();
- }
-
- @After
- public void tearDown() throws IOException {
- if (roTsFile != null) {
- roTsFile.close();
- }
-
- File file = new File(inputDataFile);
- if (file.exists()) {
- file.delete();
- }
- file = new File(outputDataFile);
- if (file.exists()) {
- file.delete();
- }
- }
-
- @Test
- public void testRead() throws IOException {
- List<Path> pathList = new ArrayList<>();
- pathList.add(new Path("d1.s4"));
- QueryExpression queryExpression = QueryExpression.create(pathList, null);
- QueryDataSet dataSet = roTsFile.query(queryExpression);
-
- int count = 0;
- while (dataSet.hasNext()) {
- RowRecord r = dataSet.next();
- System.out.println(count);
- System.out.println(r);
- count++;
- }
- }
-
- @Test
- public void fixBug() throws IOException {
- TsFileSequenceReader reader = new TsFileSequenceReader(outputDataFile);
- TsFileMetaData metaData = reader.readFileMetadata();
- List<Pair<Long, Long>> offsetList = new ArrayList<>();
- long startOffset = reader.position();
- byte marker;
- while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
- switch (marker) {
- case MetaMarker.CHUNK_HEADER:
- ChunkHeader header = reader.readChunkHeader();
- for (int j = 0; j < header.getNumOfPages(); j++) {
- PageHeader pageHeader = reader.readPageHeader(header.getDataType());
- reader.readPage(pageHeader, header.getCompressionType());
- }
- break;
- case MetaMarker.CHUNK_GROUP_FOOTER:
- reader.readChunkGroupFooter();
- long endOffset = reader.position();
- offsetList.add(new Pair<>(startOffset, endOffset));
- startOffset = endOffset;
- break;
- default:
- MetaMarker.handleUnexpectedMarker(marker);
- }
- }
- int offsetListIndex = 0;
- List<TsDeviceMetadataIndex> deviceMetadataIndexList = metaData.getDeviceMap().values().stream()
- .sorted((x, y) -> (int) (x.getOffset() - y.getOffset())).collect(Collectors.toList());
- for (TsDeviceMetadataIndex index : deviceMetadataIndexList) {
- TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
- List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata.getChunkGroupMetaDataList();
- for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
- Pair<Long, Long> pair = offsetList.get(offsetListIndex++);
- Assert.assertEquals(chunkGroupMetaData.getStartOffsetOfChunkGroup(), (long) pair.left);
- Assert.assertEquals(chunkGroupMetaData.getEndOffsetOfChunkGroup(), (long) pair.right);
- }
- }
- reader.close();
- }
-}