You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/12/05 06:04:34 UTC
[iotdb] 01/01: compatible with v2 tsfile
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch UpgradeToolV3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9d0de83a001a72efe54040efd48aec9c3caf6de5
Author: HTHou <hh...@outlook.com>
AuthorDate: Sat Dec 5 14:03:34 2020 +0800
compatible with v2 tsfile
---
.../engine/storagegroup/StorageGroupProcessor.java | 2 +-
.../db/engine/storagegroup/TsFileResource.java | 13 +-
.../iotdb/db/query/control/FileReaderManager.java | 11 +-
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 3 +-
.../iotdb/tsfile/file/header/ChunkHeader.java | 2 +-
.../iotdb/tsfile/file/metadata/TsFileMetadata.java | 4 +
.../file/metadata/statistics/BinaryStatistics.java | 2 +
.../metadata/statistics/BooleanStatistics.java | 2 +
.../file/metadata/statistics/DoubleStatistics.java | 2 +
.../file/metadata/statistics/FloatStatistics.java | 2 +
.../metadata/statistics/IntegerStatistics.java | 2 +
.../file/metadata/statistics/LongStatistics.java | 2 +
.../file/metadata/statistics/Statistics.java | 4 +-
.../org/apache/iotdb/tsfile/read/common/Chunk.java | 9 +
.../tsfile/read/reader/chunk/ChunkReader.java | 64 +-
.../iotdb/tsfile/read/reader/page/PageReader.java | 14 +-
.../tsfile/v2/file/footer/ChunkGroupFooterV2.java | 84 ++
.../iotdb/tsfile/v2/file/header/ChunkHeaderV2.java | 107 ++
.../iotdb/tsfile/v2/file/header/PageHeaderV2.java | 48 +
.../tsfile/v2/file/metadata/ChunkMetadataV2.java | 50 +
.../v2/file/metadata/MetadataIndexEntryV2.java | 33 +
.../v2/file/metadata/MetadataIndexNodeV2.java | 43 +
.../v2/file/metadata/TimeseriesMetadataV2.java | 39 +
.../tsfile/v2/file/metadata/TsFileMetadataV2.java | 75 ++
.../v2/file/metadata/enums/CompressionTypeV2.java | 58 ++
.../v2/file/metadata/enums/TSDataTypeV2.java | 71 ++
.../v2/file/metadata/enums/TSEncodingV2.java | 60 ++
.../v2/file/metadata/statistics/StatisticsV2.java | 100 ++
.../tsfile/v2/read/TsFileSequenceReaderForV2.java | 1045 ++++++++++++++++++++
.../read/reader/page/PageReaderV2.java} | 120 +--
30 files changed, 1932 insertions(+), 139 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index f4605ea..cf2037d 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -535,7 +535,7 @@ public class StorageGroupProcessor {
// the process was interrupted before the merged files could be named
continueFailedRenames(partitionFolder, MERGE_SUFFIX);
- Collections.addAll(tsFiles,
+ Collections.addAll(upgradeFiles,
fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), TSFILE_SUFFIX));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 5f338ae..d2d2c57 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -335,11 +335,18 @@ public class TsFileResource {
historicalVersions = Collections.singleton(version);
}
- maxPlanIndex = ReadWriteIOUtils.readLong(inputStream);
- minPlanIndex = ReadWriteIOUtils.readLong(inputStream);
+ if (inputStream.available() >= Long.BYTES * 2) {
+ maxPlanIndex = ReadWriteIOUtils.readLong(inputStream);
+ minPlanIndex = ReadWriteIOUtils.readLong(inputStream);
+ }
if (inputStream.available() > 0) {
- String modFileName = ReadWriteIOUtils.readString(inputStream);
+ String modFileName = null;
+ try {
+ modFileName = ReadWriteIOUtils.readString(inputStream);
+ } catch (IOException e) {
+ // ignore if the tail is corrupted
+ }
File modF = new File(file.getParentFile(), modFileName);
modFile = new ModificationFile(modF.getPath());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index 4e99292..f2a1617 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.UnClosedTsFileReader;
+import org.apache.iotdb.tsfile.v2.read.TsFileSequenceReaderForV2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -166,11 +167,13 @@ public class FileReaderManager implements IService {
}
else {
tsFileReader = new TsFileSequenceReader(filePath);
- switch (tsFileReader.readVersionNumber()) {
- case TSFileConfig.VERSION_NUMBER:
- break;
- default:
+ if (tsFileReader.readVersionNumber() != TSFileConfig.VERSION_NUMBER) {
+ tsFileReader.close();
+ tsFileReader = new TsFileSequenceReaderForV2(filePath);
+ if (!((TsFileSequenceReaderForV2) tsFileReader).readVersionNumberV2()
+ .equals(TSFileConfig.VERSION_NUMBER_V2)) {
throw new IOException("The version of this TsFile is not corrent. ");
+ }
}
}
readerMap.put(filePath, tsFileReader);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index a42d80f..69b38b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -153,7 +153,8 @@ public class FileLoaderUtils {
chunkReader = new MemChunkReader(memChunkLoader.getChunk(), timeFilter);
} else {
Chunk chunk = chunkLoader.loadChunk(chunkMetaData);
- chunkReader = new ChunkReader(chunk, timeFilter, chunkMetaData.isFromOldTsFile());
+ chunk.setFromOldFile(chunkMetaData.isFromOldTsFile());
+ chunkReader = new ChunkReader(chunk, timeFilter);
chunkReader.hasNextSatisfiedPage();
}
return chunkReader.loadPageReaderList();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
index 082a4c7..a3841bb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
@@ -65,7 +65,7 @@ public class ChunkHeader {
compressionType, encoding);
}
- private ChunkHeader(byte chunkType, String measurementID, int dataSize, int headerSize,
+ public ChunkHeader(byte chunkType, String measurementID, int dataSize, int headerSize,
TSDataType dataType, CompressionType compressionType, TSEncoding encoding) {
this.chunkType = chunkType;
this.measurementID = measurementID;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
index 2831aa5..09a7286 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
@@ -90,6 +90,10 @@ public class TsFileMetadata {
return bloomFilter;
}
+ public void setBloomFilter(BloomFilter bloomFilter) { // replaces the bloom filter held by this metadata object
+ this.bloomFilter = bloomFilter;
+ }
+
/**
* use the given outputStream to serialize.
*
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
index 6934d17..a632da4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
@@ -202,12 +202,14 @@ public class BinaryStatistics extends Statistics<Binary> {
}
@Override
+ public
void deserialize(InputStream inputStream) throws IOException {
this.firstValue = ReadWriteIOUtils.readBinary(inputStream);
this.lastValue = ReadWriteIOUtils.readBinary(inputStream);
}
@Override
+ public
void deserialize(ByteBuffer byteBuffer) {
this.firstValue = ReadWriteIOUtils.readBinary(byteBuffer);
this.lastValue = ReadWriteIOUtils.readBinary(byteBuffer);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java
index 201052f..388292a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java
@@ -205,6 +205,7 @@ public class BooleanStatistics extends Statistics<Boolean> {
}
@Override
+ public
void deserialize(InputStream inputStream) throws IOException {
this.firstValue = ReadWriteIOUtils.readBool(inputStream);
this.lastValue = ReadWriteIOUtils.readBool(inputStream);
@@ -212,6 +213,7 @@ public class BooleanStatistics extends Statistics<Boolean> {
}
@Override
+ public
void deserialize(ByteBuffer byteBuffer) {
this.firstValue = ReadWriteIOUtils.readBool(byteBuffer);
this.lastValue = ReadWriteIOUtils.readBool(byteBuffer);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java
index aba11ba..3ea8ef6 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java
@@ -227,6 +227,7 @@ public class DoubleStatistics extends Statistics<Double> {
}
@Override
+ public
void deserialize(InputStream inputStream) throws IOException {
this.minValue = ReadWriteIOUtils.readDouble(inputStream);
this.maxValue = ReadWriteIOUtils.readDouble(inputStream);
@@ -236,6 +237,7 @@ public class DoubleStatistics extends Statistics<Double> {
}
@Override
+ public
void deserialize(ByteBuffer byteBuffer) {
this.minValue = ReadWriteIOUtils.readDouble(byteBuffer);
this.maxValue = ReadWriteIOUtils.readDouble(byteBuffer);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java
index 106ff32..848bc7d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java
@@ -222,6 +222,7 @@ public class FloatStatistics extends Statistics<Float> {
}
@Override
+ public
void deserialize(InputStream inputStream) throws IOException {
this.minValue = ReadWriteIOUtils.readFloat(inputStream);
this.maxValue = ReadWriteIOUtils.readFloat(inputStream);
@@ -231,6 +232,7 @@ public class FloatStatistics extends Statistics<Float> {
}
@Override
+ public
void deserialize(ByteBuffer byteBuffer) {
this.minValue = ReadWriteIOUtils.readFloat(byteBuffer);
this.maxValue = ReadWriteIOUtils.readFloat(byteBuffer);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java
index dec73a1..38263ad 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java
@@ -223,6 +223,7 @@ public class IntegerStatistics extends Statistics<Integer> {
}
@Override
+ public
void deserialize(InputStream inputStream) throws IOException {
this.minValue = ReadWriteIOUtils.readInt(inputStream);
this.maxValue = ReadWriteIOUtils.readInt(inputStream);
@@ -232,6 +233,7 @@ public class IntegerStatistics extends Statistics<Integer> {
}
@Override
+ public
void deserialize(ByteBuffer byteBuffer) {
this.minValue = ReadWriteIOUtils.readInt(byteBuffer);
this.maxValue = ReadWriteIOUtils.readInt(byteBuffer);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java
index 60cc881..e86087b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java
@@ -231,6 +231,7 @@ public class LongStatistics extends Statistics<Long> {
}
@Override
+ public
void deserialize(InputStream inputStream) throws IOException {
this.minValue = ReadWriteIOUtils.readLong(inputStream);
this.maxValue = ReadWriteIOUtils.readLong(inputStream);
@@ -240,6 +241,7 @@ public class LongStatistics extends Statistics<Long> {
}
@Override
+ public
void deserialize(ByteBuffer byteBuffer) {
this.minValue = ReadWriteIOUtils.readLong(byteBuffer);
this.maxValue = ReadWriteIOUtils.readLong(byteBuffer);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index acbc339..a62fca6 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -125,9 +125,9 @@ public abstract class Statistics<T> {
/**
* read data from the inputStream.
*/
- abstract void deserialize(InputStream inputStream) throws IOException;
+ public abstract void deserialize(InputStream inputStream) throws IOException;
- abstract void deserialize(ByteBuffer byteBuffer);
+ public abstract void deserialize(ByteBuffer byteBuffer);
public abstract void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
index 58d15f8..84efd9d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
@@ -36,6 +36,7 @@ public class Chunk implements Accountable {
private ChunkHeader chunkHeader;
private Statistics chunkStatistic;
private ByteBuffer chunkData;
+ private boolean isFromOldFile = false;
/**
* A list of deleted intervals.
*/
@@ -127,4 +128,12 @@ public class Chunk implements Accountable {
public Statistics getChunkStatistic() {
return chunkStatistic;
}
+ // Returns the isFromOldFile flag (false unless set); ChunkReader checks it to pick initAllPageReadersV2 over initAllPageReaders.
+ public boolean isFromOldFile() {
+ return isFromOldFile;
+ }
+ // Sets the isFromOldFile flag; FileLoaderUtils passes chunkMetaData.isFromOldTsFile() here before building a ChunkReader.
+ public void setFromOldFile(boolean isFromOldFile) {
+ this.isFromOldFile = isFromOldFile;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
index 27a2518..059d2e7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
@@ -39,6 +39,8 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IChunkReader;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.v2.file.header.PageHeaderV2;
+import org.apache.iotdb.tsfile.v2.read.reader.page.PageReaderV2;
public class ChunkReader implements IChunkReader {
@@ -53,8 +55,6 @@ public class ChunkReader implements IChunkReader {
private List<IPageReader> pageReaderList = new LinkedList<>();
- private boolean isFromOldTsFile = false;
-
/**
* A list of deleted intervals.
*/
@@ -72,20 +72,12 @@ public class ChunkReader implements IChunkReader {
this.deleteIntervalList = chunk.getDeleteIntervalList();
chunkHeader = chunk.getHeader();
this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
-
- initAllPageReaders(chunk.getChunkStatistic());
- }
-
- public ChunkReader(Chunk chunk, Filter filter, boolean isFromOldFile)
- throws IOException {
- this.filter = filter;
- this.chunkDataBuffer = chunk.getData();
- this.deleteIntervalList = chunk.getDeleteIntervalList();
- chunkHeader = chunk.getHeader();
- this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
- this.isFromOldTsFile = isFromOldFile;
-
- initAllPageReaders(chunk.getChunkStatistic());
+ if (chunk.isFromOldFile()) {
+ initAllPageReadersV2();
+ }
+ else {
+ initAllPageReaders(chunk.getChunkStatistic());
+ }
}
private void initAllPageReaders(Statistics chunkStatistic) throws IOException {
@@ -193,4 +185,44 @@ public class ChunkReader implements IChunkReader {
public List<IPageReader> loadPageReaderList() {
return pageReaderList;
}
+
+ // TsFile V2 path: walks chunkDataBuffer page by page and adds a reader to pageReaderList for every page that passes pageSatisfied.
+ private void initAllPageReadersV2() throws IOException {
+ // loop until every byte of the chunk body has been consumed
+ while (chunkDataBuffer.remaining() > 0) {
+ // parse the next page header with the V2 deserializer; this advances chunkDataBuffer past the header
+ PageHeader pageHeader = PageHeaderV2.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
+ // a satisfied page gets a reader; otherwise skipBytesInStreamByLength is called with the page's compressed size
+ if (pageSatisfied(pageHeader)) {
+ pageReaderList.add(constructPageReaderForNextPageV2(pageHeader));
+ } else {
+ skipBytesInStreamByLength(pageHeader.getCompressedSize());
+ }
+ }
+ }
+
+ // TsFile V2 path: copies one compressed page body out of chunkDataBuffer, uncompresses it and wraps it in a PageReaderV2.
+ private PageReader constructPageReaderForNextPageV2(PageHeader pageHeader)
+ throws IOException {
+ int compressedPageBodyLength = pageHeader.getCompressedSize();
+ byte[] compressedPageBody = new byte[compressedPageBodyLength];
+
+ // fail when the buffer holds fewer bytes than the compressed size announced by the page header
+ if (compressedPageBodyLength > chunkDataBuffer.remaining()) {
+ throw new IOException("do not has a complete page body. Expected:" + compressedPageBodyLength
+ + ". Actual:" + chunkDataBuffer.remaining());
+ }
+ // consume the compressed body, then uncompress it into an array sized by the header's uncompressed size
+ chunkDataBuffer.get(compressedPageBody);
+ Decoder valueDecoder = Decoder
+ .getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
+ byte[] uncompressedPageData = new byte[pageHeader.getUncompressedSize()];
+ unCompressor.uncompress(compressedPageBody,0, compressedPageBodyLength,
+ uncompressedPageData, 0);
+ ByteBuffer pageData = ByteBuffer.wrap(uncompressedPageData);
+ PageReader reader = new PageReaderV2(pageHeader, pageData, chunkHeader.getDataType(),
+ valueDecoder, timeDecoder, filter);
+ reader.setDeleteIntervalList(deleteIntervalList);
+ return reader;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index efc3b01..b6eb60f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -39,29 +39,29 @@ public class PageReader implements IPageReader {
private PageHeader pageHeader;
- private TSDataType dataType;
+ protected TSDataType dataType;
/**
* decoder for value column
*/
- private Decoder valueDecoder;
+ protected Decoder valueDecoder;
/**
* decoder for time column
*/
- private Decoder timeDecoder;
+ protected Decoder timeDecoder;
/**
* time column in memory
*/
- private ByteBuffer timeBuffer;
+ protected ByteBuffer timeBuffer;
/**
* value column in memory
*/
- private ByteBuffer valueBuffer;
+ protected ByteBuffer valueBuffer;
- private Filter filter;
+ protected Filter filter;
/**
* A list of deleted intervals.
@@ -178,7 +178,7 @@ public class PageReader implements IPageReader {
return pageHeader.isModified();
}
- private boolean isDeleted(long timestamp) {
+ protected boolean isDeleted(long timestamp) {
while (deleteIntervalList != null && deleteCursor < deleteIntervalList.size()) {
if (deleteIntervalList.get(deleteCursor).contains(timestamp)) {
return true;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/footer/ChunkGroupFooterV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/footer/ChunkGroupFooterV2.java
new file mode 100644
index 0000000..438f83b
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/footer/ChunkGroupFooterV2.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.v2.file.footer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+public class ChunkGroupFooterV2 {
+
+ /**
+ * Reads a V2 chunk group footer from a stream and returns its device ID wrapped in a ChunkGroupHeader.
+ * The dataSize (long) and numOfChunks (int) fields that follow the device ID are read and discarded.
+ * @param markerRead true if the marker byte was already consumed; if false it is read here and handed to MetaMarker.handleUnexpectedMarker unless it equals MetaMarker.CHUNK_GROUP_HEADER.
+ */
+ public static ChunkGroupHeader deserializeFrom(InputStream inputStream, boolean markerRead) throws IOException {
+ if (!markerRead) {
+ byte marker = (byte) inputStream.read();
+ if (marker != MetaMarker.CHUNK_GROUP_HEADER) {
+ MetaMarker.handleUnexpectedMarker(marker);
+ }
+ }
+
+ String deviceID = ReadWriteIOUtils.readString(inputStream);
+ // dataSize: read only to advance the stream, value unused
+ ReadWriteIOUtils.readLong(inputStream);
+ // numOfChunks: read only to advance the stream, value unused
+ ReadWriteIOUtils.readInt(inputStream);
+ return new ChunkGroupHeader(deviceID);
+ }
+
+ /**
+ * Positional variant: reads the footer from {@code input} starting at {@code offset} and returns its device ID as a ChunkGroupHeader.
+ * Layout read: 4-byte device ID length, device ID bytes, dataSize (long, discarded), numOfChunks (int, discarded).
+ * @param markerRead true if offset already points past the marker byte; if false the byte at offset is skipped without being checked.
+ */
+ public static ChunkGroupHeader deserializeFrom(TsFileInput input, long offset, boolean markerRead)
+ throws IOException {
+ long offsetVar = offset;
+ if (!markerRead) {
+ offsetVar++;
+ }
+ ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
+ input.read(buffer, offsetVar);
+ buffer.flip();
+ int size = buffer.getInt();
+ offsetVar += Integer.BYTES;
+ buffer = ByteBuffer.allocate(getSerializedSize(size));
+ ReadWriteIOUtils.readAsPossible(input, offsetVar, buffer);
+ buffer.flip();
+ String deviceID = ReadWriteIOUtils.readStringWithLength(buffer, size);
+ // dataSize: read only to advance the buffer, value unused
+ ReadWriteIOUtils.readLong(buffer);
+ // numOfChunks: read only to advance the buffer, value unused
+ ReadWriteIOUtils.readInt(buffer);
+ return new ChunkGroupHeader(deviceID);
+ }
+ // Byte count of the footer after its 4-byte length prefix: device ID bytes + dataSize long + numOfChunks int.
+ private static int getSerializedSize(int deviceIdLength) {
+ return deviceIdLength + Long.BYTES + Integer.BYTES;
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/header/ChunkHeaderV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/header/ChunkHeaderV2.java
new file mode 100644
index 0000000..7574d31
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/header/ChunkHeaderV2.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.v2.file.header;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.v2.file.metadata.enums.CompressionTypeV2;
+import org.apache.iotdb.tsfile.v2.file.metadata.enums.TSDataTypeV2;
+import org.apache.iotdb.tsfile.v2.file.metadata.enums.TSEncodingV2;
+
+public class ChunkHeaderV2 {
+
+ /**
+ * Reads a V2 chunk header from a stream: measurementID, dataSize, dataType (short), numOfPages, compressionType (short), encoding (short).
+ * The V2 short codes are converted to TSDataType, CompressionType and TSEncoding via TSDataTypeV2, CompressionTypeV2 and TSEncodingV2.
+ * @param markerRead true if the marker byte was already consumed; if false it is read here and handed to MetaMarker.handleUnexpectedMarker unless it equals MetaMarker.CHUNK_HEADER.
+ */
+ public static ChunkHeader deserializeFrom(InputStream inputStream, boolean markerRead)
+ throws IOException {
+ if (!markerRead) {
+ byte marker = (byte) inputStream.read();
+ if (marker != MetaMarker.CHUNK_HEADER) {
+ MetaMarker.handleUnexpectedMarker(marker);
+ }
+ }
+
+ String measurementID = ReadWriteIOUtils.readString(inputStream);
+ int dataSize = ReadWriteIOUtils.readInt(inputStream);
+ TSDataType dataType = TSDataTypeV2.deserialize(ReadWriteIOUtils.readShort(inputStream));
+ int numOfPages = ReadWriteIOUtils.readInt(inputStream);
+ CompressionType type = CompressionTypeV2.deserialize(ReadWriteIOUtils.readShort(inputStream));
+ TSEncoding encoding = TSEncodingV2.deserialize(ReadWriteIOUtils.readShort(inputStream));
+ return new ChunkHeader(measurementID, dataSize, dataType, type, encoding, numOfPages);
+ }
+
+ /**
+ * Reads a V2 chunk header from a TsFileInput at an absolute position.
+ *
+ * @param input source to read from
+ * @param offset position of the header; when markerRead is false the byte at this position is skipped unchecked
+ * @param chunkHeaderSize number of bytes buffered for parsing; also stored in the returned ChunkHeader
+ * @param markerRead true if offset already points past the marker byte
+ * @return a ChunkHeader typed ONLY_ONE_PAGE_CHUNK_HEADER when numOfPages is at most 1, otherwise CHUNK_HEADER
+ * @throws IOException propagated from reading input
+ */
+ public static ChunkHeader deserializeFrom(TsFileInput input, long offset, int chunkHeaderSize,
+ boolean markerRead)
+ throws IOException {
+ long offsetVar = offset;
+ if (!markerRead) {
+ offsetVar++;
+ }
+
+ // NOTE(review): one read() call fills the buffer here, while ChunkGroupFooterV2 uses ReadWriteIOUtils.readAsPossible - confirm read() cannot return short
+ ByteBuffer buffer = ByteBuffer.allocate(chunkHeaderSize);
+ input.read(buffer, offsetVar);
+ buffer.flip();
+
+ // measurementID is length-prefixed: a 4-byte length, then that many bytes
+ int size = buffer.getInt();
+ String measurementID = ReadWriteIOUtils.readStringWithLength(buffer, size);
+ int dataSize = ReadWriteIOUtils.readInt(buffer);
+ TSDataType dataType = TSDataTypeV2.deserialize(ReadWriteIOUtils.readShort(buffer));
+ int numOfPages = ReadWriteIOUtils.readInt(buffer);
+ CompressionType type = CompressionTypeV2.deserialize(ReadWriteIOUtils.readShort(buffer));
+ TSEncoding encoding = TSEncodingV2.deserialize(ReadWriteIOUtils.readShort(buffer));
+ return new ChunkHeader(numOfPages <= 1 ? MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER : MetaMarker.CHUNK_HEADER,
+ measurementID, dataSize, chunkHeaderSize, dataType, type, encoding);
+ }
+ // Sums the byte widths of all V2 chunk header fields plus the marker; the terms are listed in a different order than they are read above, which does not affect the total.
+ public static int getSerializedSize(String measurementID) {
+ return Byte.BYTES // marker
+ + Integer.BYTES // measurementID length
+ + measurementID.getBytes(TSFileConfig.STRING_CHARSET).length // measurementID
+ + Integer.BYTES // dataSize
+ + Short.BYTES // dataType
+ + Short.BYTES // compressionType
+ + Short.BYTES // encodingType
+ + Integer.BYTES; // numOfPages
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/header/PageHeaderV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/header/PageHeaderV2.java
new file mode 100644
index 0000000..52a5bb7
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/header/PageHeaderV2.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.v2.file.header;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.v2.file.metadata.statistics.StatisticsV2;
+
+public class PageHeaderV2 {
+ // Stream variant: reads uncompressedSize and compressedSize (two ints), then the statistics via StatisticsV2 for dataType.
+ public static PageHeader deserializeFrom(InputStream inputStream, TSDataType dataType)
+ throws IOException {
+ int uncompressedSize = ReadWriteIOUtils.readInt(inputStream);
+ int compressedSize = ReadWriteIOUtils.readInt(inputStream);
+ Statistics<?> statistics = StatisticsV2.deserialize(inputStream, dataType);
+ return new PageHeader(uncompressedSize, compressedSize, statistics);
+ }
+ // ByteBuffer variant: same field order as the stream variant, read from buffer.
+ public static PageHeader deserializeFrom(ByteBuffer buffer, TSDataType dataType) {
+ int uncompressedSize = ReadWriteIOUtils.readInt(buffer);
+ int compressedSize = ReadWriteIOUtils.readInt(buffer);
+ Statistics<?> statistics = StatisticsV2.deserialize(buffer, dataType);
+ return new PageHeader(uncompressedSize, compressedSize, statistics);
+ }
+
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/ChunkMetadataV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/ChunkMetadataV2.java
new file mode 100644
index 0000000..4eac6df
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/ChunkMetadataV2.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.v2.file.metadata;
+
+import java.nio.ByteBuffer;
+
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.v2.file.metadata.enums.TSDataTypeV2;
+import org.apache.iotdb.tsfile.v2.file.metadata.statistics.StatisticsV2;
+
+public class ChunkMetadataV2 {
+ /**
+ * Reads one V2 chunk metadata record: measurement name, chunk header offset (long), data type (short), then statistics via StatisticsV2.
+ * The returned object is flagged with setFromOldTsFile(true).
+ * @param buffer buffer to read the record from
+ * @return the ChunkMetadata built from the fields read
+ */
+ public static ChunkMetadata deserializeFrom(ByteBuffer buffer) {
+
+ String measurementUid = ReadWriteIOUtils.readString(buffer);
+ long offsetOfChunkHeader = ReadWriteIOUtils.readLong(buffer);
+ TSDataType tsDataType = TSDataTypeV2.deserialize(ReadWriteIOUtils.readShort(buffer));
+ // the statistics are deserialized according to the data type just read
+ Statistics<?> statistics = StatisticsV2.deserialize(buffer, tsDataType);
+ ChunkMetadata chunkMetaData = new ChunkMetadata(measurementUid, tsDataType,
+ offsetOfChunkHeader, statistics);
+ chunkMetaData.setFromOldTsFile(true);
+ return chunkMetaData;
+ }
+
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexEntryV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexEntryV2.java
new file mode 100644
index 0000000..40e5abc
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexEntryV2.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.v2.file.metadata;
+
+import java.nio.ByteBuffer;
+
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+public class MetadataIndexEntryV2 {
+
+  /**
+   * Reads one metadata index entry in the v2 layout: a name (string) followed by an offset (long).
+   *
+   * @param buffer buffer positioned at the first byte of a serialized v2 index entry
+   * @return the entry built from the name and offset that were read
+   */
+  public static MetadataIndexEntry deserializeFrom(ByteBuffer buffer) {
+    // Constructor arguments are evaluated left to right, so the name is read before the offset.
+    return new MetadataIndexEntry(
+        ReadWriteIOUtils.readString(buffer),
+        ReadWriteIOUtils.readLong(buffer));
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexNodeV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexNodeV2.java
new file mode 100644
index 0000000..ac3a4fd
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexNodeV2.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.v2.file.metadata;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+public class MetadataIndexNodeV2 {
+
+  /**
+   * Reads one metadata index node in the v2 layout.
+   *
+   * <p>The buffer is consumed in this order: child count (int), that many entries (each decoded
+   * by {@link MetadataIndexEntryV2}), an offset (long) and the node type code (byte).
+   *
+   * @param buffer buffer positioned at the first byte of a serialized v2 index node
+   * @return the node built from the children, offset and node type that were read
+   */
+  public static MetadataIndexNode deserializeFrom(ByteBuffer buffer) {
+    int childCount = ReadWriteIOUtils.readInt(buffer);
+    List<MetadataIndexEntry> entries = new ArrayList<>();
+    for (int remaining = childCount; remaining > 0; remaining--) {
+      entries.add(MetadataIndexEntryV2.deserializeFrom(buffer));
+    }
+
+    long nodeOffset = ReadWriteIOUtils.readLong(buffer);
+    byte rawNodeType = ReadWriteIOUtils.readByte(buffer);
+    return new MetadataIndexNode(entries, nodeOffset,
+        MetadataIndexNodeType.deserialize(rawNodeType));
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/TimeseriesMetadataV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/TimeseriesMetadataV2.java
new file mode 100644
index 0000000..d2409b5
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/TimeseriesMetadataV2.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.v2.file.metadata;
+
+import java.nio.ByteBuffer;
+
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.v2.file.metadata.enums.TSDataTypeV2;
+import org.apache.iotdb.tsfile.v2.file.metadata.statistics.StatisticsV2;
+
+public class TimeseriesMetadataV2 {
+
+ public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer) {
+ TimeseriesMetadata timeseriesMetaData = new TimeseriesMetadata();
+ timeseriesMetaData.setMeasurementId(ReadWriteIOUtils.readString(buffer));
+ timeseriesMetaData.setTSDataType(TSDataTypeV2.deserialize(ReadWriteIOUtils.readShort(buffer)));
+ timeseriesMetaData.setOffsetOfChunkMetaDataList(ReadWriteIOUtils.readLong(buffer));
+ timeseriesMetaData.setDataSizeOfChunkMetaDataList(ReadWriteIOUtils.readInt(buffer));
+ timeseriesMetaData.setStatistics(StatisticsV2.deserialize(buffer, timeseriesMetaData.getTSDataType()));
+ return timeseriesMetaData;
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/TsFileMetadataV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/TsFileMetadataV2.java
new file mode 100644
index 0000000..aa9cab0
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/TsFileMetadataV2.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.v2.file.metadata;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.utils.BloomFilter;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+public class TsFileMetadataV2 {
+
+  /**
+   * Reads the file-level metadata of a v2 TsFile.
+   *
+   * <p>The buffer is consumed in this order: metadata index node, two ints that are read and
+   * discarded (total chunk number and invalid chunk number), the version info list (an int count
+   * followed by that many pairs of longs), the meta offset (long) and, only if bytes remain, the
+   * bloom filter (byte length, bytes, filter size, hash function size).
+   *
+   * @param buffer buffer holding the serialized v2 file metadata
+   * @return the populated {@link TsFileMetadata}; its bloom filter is left unset when the buffer
+   *     ends right after the meta offset
+   */
+  public static TsFileMetadata deserializeFrom(ByteBuffer buffer) {
+    TsFileMetadata metadata = new TsFileMetadata();
+    metadata.setMetadataIndex(MetadataIndexNodeV2.deserializeFrom(buffer));
+
+    // totalChunkNum and invalidChunkNum: read only to advance the buffer, the values are unused.
+    ReadWriteIOUtils.readInt(buffer);
+    ReadWriteIOUtils.readInt(buffer);
+
+    // versionInfo: (position, version) pairs
+    int versionCount = ReadWriteIOUtils.readInt(buffer);
+    List<Pair<Long, Long>> versions = new ArrayList<>();
+    for (int remaining = versionCount; remaining > 0; remaining--) {
+      long position = ReadWriteIOUtils.readLong(buffer);
+      long version = ReadWriteIOUtils.readLong(buffer);
+      versions.add(new Pair<>(position, version));
+    }
+    metadata.setVersionInfo(versions);
+
+    metadata.setMetaOffset(ReadWriteIOUtils.readLong(buffer));
+
+    // The bloom filter is optional: nothing more to decode when the buffer is exhausted.
+    if (!buffer.hasRemaining()) {
+      return metadata;
+    }
+    int filterByteLength = ReadWriteIOUtils.readInt(buffer);
+    byte[] filterBytes = new byte[filterByteLength];
+    buffer.get(filterBytes);
+    int filterSize = ReadWriteIOUtils.readInt(buffer);
+    int hashFunctionSize = ReadWriteIOUtils.readInt(buffer);
+    metadata.setBloomFilter(
+        BloomFilter.buildBloomFilter(filterBytes, filterSize, hashFunctionSize));
+    return metadata;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/enums/CompressionTypeV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/enums/CompressionTypeV2.java
new file mode 100644
index 0000000..91e6d8a
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/enums/CompressionTypeV2.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.v2.file.metadata.enums;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+
+public class CompressionTypeV2 {
+
+  /**
+   * Maps a v2 serialized compressor code to the current {@link CompressionType}.
+   *
+   * @param compressor v2 compressor code; values 0 to 7 are accepted
+   * @return the matching compression type
+   * @throws IllegalArgumentException if the code is outside 0..7
+   */
+  public static CompressionType deserialize(short compressor) {
+    return getCompressionType(compressor);
+  }
+
+  /** Lookup table for the v2 compressor codes; any code without an entry is rejected. */
+  private static CompressionType getCompressionType(short compressor) {
+    switch (compressor) {
+      case 0:
+        return CompressionType.UNCOMPRESSED;
+      case 1:
+        return CompressionType.SNAPPY;
+      case 2:
+        return CompressionType.GZIP;
+      case 3:
+        return CompressionType.LZO;
+      case 4:
+        return CompressionType.SDT;
+      case 5:
+        return CompressionType.PAA;
+      case 6:
+        return CompressionType.PLA;
+      case 7:
+        return CompressionType.LZ4;
+      default:
+        throw new IllegalArgumentException("Invalid input: " + compressor);
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/enums/TSDataTypeV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/enums/TSDataTypeV2.java
new file mode 100644
index 0000000..9760275
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/enums/TSDataTypeV2.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.v2.file.metadata.enums;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class TSDataTypeV2 {
+
+ /**
+ * give an integer to return a data type.
+ *
+ * @param type -param to judge enum type
+ * @return -enum type
+ */
+ public static TSDataType deserialize(short type) {
+ return getTsDataType(type);
+ }
+
+ private static TSDataType getTsDataType(short type) {
+ if (type >= 6 || type < 0) {
+ throw new IllegalArgumentException("Invalid input: " + type);
+ }
+ switch (type) {
+ case 0:
+ return TSDataType.BOOLEAN;
+ case 1:
+ return TSDataType.INT32;
+ case 2:
+ return TSDataType.INT64;
+ case 3:
+ return TSDataType.FLOAT;
+ case 4:
+ return TSDataType.DOUBLE;
+ default:
+ return TSDataType.TEXT;
+ }
+ }
+
+ public static byte deserializeToByte(short type) {
+ if (type >= 6 || type < 0) {
+ throw new IllegalArgumentException("Invalid input: " + type);
+ }
+ return (byte) type;
+ }
+
+ /**
+ * give an byte to return a data type.
+ *
+ * @param type byte number
+ * @return data type
+ */
+ public static TSDataType byteToEnum(byte type) {
+ return getTsDataType(type);
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/enums/TSEncodingV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/enums/TSEncodingV2.java
new file mode 100644
index 0000000..f238fab
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/enums/TSEncodingV2.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.v2.file.metadata.enums;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+public class TSEncodingV2 {
+
+  /**
+   * Maps a v2 serialized encoding code to the current {@link TSEncoding}.
+   *
+   * @param encoding v2 encoding code; values 0 to 8 are accepted
+   * @return the matching encoding
+   * @throws IllegalArgumentException if the code is outside 0..8
+   */
+  public static TSEncoding deserialize(short encoding) {
+    return getTsEncoding(encoding);
+  }
+
+  /** Lookup table for the v2 encoding codes; any code without an entry is rejected. */
+  private static TSEncoding getTsEncoding(short encoding) {
+    switch (encoding) {
+      case 0:
+        return TSEncoding.PLAIN;
+      case 1:
+        return TSEncoding.PLAIN_DICTIONARY;
+      case 2:
+        return TSEncoding.RLE;
+      case 3:
+        return TSEncoding.DIFF;
+      case 4:
+        return TSEncoding.TS_2DIFF;
+      case 5:
+        return TSEncoding.BITMAP;
+      case 6:
+        return TSEncoding.GORILLA_V1;
+      case 7:
+        return TSEncoding.REGULAR;
+      case 8:
+        return TSEncoding.GORILLA;
+      default:
+        throw new IllegalArgumentException("Invalid input: " + encoding);
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/statistics/StatisticsV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/statistics/StatisticsV2.java
new file mode 100644
index 0000000..e2b71fa
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/statistics/StatisticsV2.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.v2.file.metadata.statistics;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.iotdb.tsfile.exception.write.UnknownColumnTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.BooleanStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+public class StatisticsV2 {
+
+
+ public static Statistics deserialize(InputStream inputStream, TSDataType dataType)
+ throws IOException {
+ Statistics<?> statistics = Statistics.getStatsByType(dataType);
+ statistics.setCount((int) ReadWriteIOUtils.readLong(inputStream));
+ statistics.setStartTime(ReadWriteIOUtils.readLong(inputStream));
+ statistics.setEndTime(ReadWriteIOUtils.readLong(inputStream));
+ switch (dataType) {
+ case BOOLEAN:
+ boolean firstBool = ReadWriteIOUtils.readBool(inputStream);
+ boolean lastBool = ReadWriteIOUtils.readBool(inputStream);
+ ((BooleanStatistics) statistics).initializeStats(firstBool, lastBool, 0);
+ break;
+ case INT32:
+ int minValue = ReadWriteIOUtils.readInt(inputStream);
+ int maxValue = ReadWriteIOUtils.readInt(inputStream);
+ int firstValue = ReadWriteIOUtils.readInt(inputStream);
+ int lastValue = ReadWriteIOUtils.readInt(inputStream);
+ long sumValue = (long) ReadWriteIOUtils.readDouble(inputStream);
+ ((IntegerStatistics) statistics).initializeStats(minValue, maxValue, firstValue, lastValue, sumValue);
+ break;
+ case INT64:
+ case TEXT:
+ case DOUBLE:
+ case FLOAT:
+ statistics.deserialize(inputStream);
+ break;
+ default:
+ throw new UnknownColumnTypeException(dataType.toString());
+ }
+ statistics.setEmpty(false);
+ return statistics;
+ }
+
+ public static Statistics deserialize(ByteBuffer buffer, TSDataType dataType) {
+ Statistics<?> statistics = Statistics.getStatsByType(dataType);
+ statistics.setCount((int) ReadWriteIOUtils.readLong(buffer));
+ statistics.setStartTime(ReadWriteIOUtils.readLong(buffer));
+ statistics.setEndTime(ReadWriteIOUtils.readLong(buffer));
+ switch (dataType) {
+ case BOOLEAN:
+ boolean firstBool = ReadWriteIOUtils.readBool(buffer);
+ boolean lastBool = ReadWriteIOUtils.readBool(buffer);
+ ((BooleanStatistics) statistics).initializeStats(firstBool, lastBool, 0);
+ break;
+ case INT32:
+ int minValue = ReadWriteIOUtils.readInt(buffer);
+ int maxValue = ReadWriteIOUtils.readInt(buffer);
+ int firstValue = ReadWriteIOUtils.readInt(buffer);
+ int lastValue = ReadWriteIOUtils.readInt(buffer);
+ long sumValue = (long) ReadWriteIOUtils.readDouble(buffer);
+ ((IntegerStatistics) statistics).initializeStats(minValue, maxValue, firstValue, lastValue, sumValue);
+ break;
+ case INT64:
+ case TEXT:
+ case DOUBLE:
+ case FLOAT:
+ statistics.deserialize(buffer);
+ break;
+ default:
+ throw new UnknownColumnTypeException(dataType.toString());
+ }
+ statistics.setEmpty(false);
+ return statistics;
+ }
+
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
new file mode 100644
index 0000000..9e1402f
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
@@ -0,0 +1,1045 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.v2.read;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileCheckStatus;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import org.apache.iotdb.tsfile.utils.BloomFilter;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.utils.VersionUtils;
+import org.apache.iotdb.tsfile.v2.file.footer.ChunkGroupFooterV2;
+import org.apache.iotdb.tsfile.v2.file.header.ChunkHeaderV2;
+import org.apache.iotdb.tsfile.v2.file.header.PageHeaderV2;
+import org.apache.iotdb.tsfile.v2.file.metadata.ChunkMetadataV2;
+import org.apache.iotdb.tsfile.v2.file.metadata.MetadataIndexNodeV2;
+import org.apache.iotdb.tsfile.v2.file.metadata.TimeseriesMetadataV2;
+import org.apache.iotdb.tsfile.v2.file.metadata.TsFileMetadataV2;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TsFileSequenceReaderForV2 extends TsFileSequenceReader implements AutoCloseable {
+
+ private static final Logger logger = LoggerFactory.getLogger(TsFileSequenceReaderForV2.class);
+ private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor");
+ private long fileMetadataPos;
+ private int fileMetadataSize;
+ private ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES);
+ private int totalChunkNum;
+ private TsFileMetadata tsFileMetaData;
+ // device -> measurement -> TimeseriesMetadata
+ private Map<String, Map<String, TimeseriesMetadata>> cachedDeviceMetadata = new ConcurrentHashMap<>();
+ private static final ReadWriteLock cacheLock = new ReentrantReadWriteLock();
+ private boolean cacheDeviceMetadata;
+
+ /**
+ * Creates a reader for the v2 TsFile at the given path. Delegates to
+ * {@link #TsFileSequenceReaderForV2(String, boolean)} with {@code loadMetadataSize} set to
+ * {@code true}.
+ *
+ * <p>NOTE(review): the original comment said the reader reads the tail of the file to get the
+ * file metadata size and then skips the magic string and version number at the head; that work
+ * would happen in the superclass constructor, which is not visible here - confirm.
+ *
+ * @param file the data file
+ * @throws IOException If some I/O error occurs
+ */
+ public TsFileSequenceReaderForV2(String file) throws IOException {
+ this(file, true);
+ }
+
+ /**
+ * Creates a reader for the v2 TsFile at the given path by passing both arguments straight to
+ * the superclass constructor.
+ *
+ * @param file -given file name
+ * @param loadMetadataSize -whether load meta data size; presumably makes the superclass call
+ * {@link #loadMetadataSize()}, which this class overrides - confirm
+ * @throws IOException If some I/O error occurs
+ */
+ public TsFileSequenceReaderForV2(String file, boolean loadMetadataSize) throws IOException {
+ super(file, loadMetadataSize);
+ }
+
+ // used in merge resource
+ /**
+ * Creates a reader and chooses whether per-device metadata is cached by
+ * {@link #readDeviceMetadata(String)}.
+ *
+ * @param file given file name
+ * @param loadMetadata forwarded as the {@code loadMetadataSize} argument of
+ * {@link #TsFileSequenceReaderForV2(String, boolean)}
+ * @param cacheDeviceMetadata when {@code true}, {@link #readDeviceMetadata(String)} keeps the
+ * result of each device lookup in memory; when {@code false} every call reads from the file
+ * @throws IOException If some I/O error occurs
+ */
+ public TsFileSequenceReaderForV2(String file, boolean loadMetadata, boolean cacheDeviceMetadata)
+ throws IOException {
+ this(file, loadMetadata);
+ this.cacheDeviceMetadata = cacheDeviceMetadata;
+ }
+
+ /**
+ * Creates a reader over an already opened input. Delegates to
+ * {@link #TsFileSequenceReaderForV2(TsFileInput, boolean)} with {@code loadMetadataSize} set to
+ * {@code true}.
+ *
+ * <p>NOTE(review): the original comment said the reader reads the tail of the file to get the
+ * file metadata size and then skips the magic string and version number at the head; that work
+ * would happen in the superclass constructor, which is not visible here - confirm.
+ *
+ * @param input given input
+ * @throws IOException If some I/O error occurs
+ */
+ public TsFileSequenceReaderForV2(TsFileInput input) throws IOException {
+ this(input, true);
+ }
+
+ /**
+ * Creates a reader over an already opened input by passing both arguments straight to the
+ * superclass constructor.
+ *
+ * @param input -given input
+ * @param loadMetadataSize -load meta data size; presumably makes the superclass call
+ * {@link #loadMetadataSize()}, which this class overrides - confirm
+ * @throws IOException If some I/O error occurs
+ */
+ public TsFileSequenceReaderForV2(TsFileInput input, boolean loadMetadataSize) throws IOException {
+ super(input, loadMetadataSize);
+ }
+
+ /**
+ * Creates a reader whose file metadata location is already known, so nothing has to be read
+ * from the tail of the input.
+ *
+ * @param input the input of a tsfile. The current position should be a marker and
+ * then a chunk Header, rather than the magic number
+ * @param fileMetadataPos the position of the file metadata in the TsFileInput from the beginning
+ * of the input to the current position
+ * @param fileMetadataSize the byte size of the file metadata in the input
+ */
+ public TsFileSequenceReaderForV2(TsFileInput input, long fileMetadataPos, int fileMetadataSize) {
+ super(input, fileMetadataPos, fileMetadataSize);
+ // This class declares its own fileMetadataPos/fileMetadataSize fields, so the values are
+ // stored here as well as being handed to the superclass.
+ this.fileMetadataPos = fileMetadataPos;
+ this.fileMetadataSize = fileMetadataSize;
+ }
+
+ @Override
+ public void loadMetadataSize() throws IOException {
+ ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES);
+ if (readTailMagic().equals(TSFileConfig.MAGIC_STRING)) {
+ tsFileInput.read(metadataSize,
+ tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES);
+ metadataSize.flip();
+ // read file metadata size and position
+ fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
+ fileMetadataPos = tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length
+ - Integer.BYTES - fileMetadataSize;
+ }
+ }
+
+ /**
+ * Returns the position of the file metadata as stored in this reader, set either by
+ * {@link #loadMetadataSize()} or by the constructor that takes the position explicitly.
+ */
+ public long getFileMetadataPos() {
+ return fileMetadataPos;
+ }
+
+ /**
+ * Returns the byte size of the file metadata as stored in this reader, set either by
+ * {@link #loadMetadataSize()} or by the constructor that takes the size explicitly.
+ */
+ public int getFileMetadataSize() {
+ return fileMetadataSize;
+ }
+
+ /**
+  * Reads the last bytes of the file, as many as {@code TSFileConfig.MAGIC_STRING} occupies, and
+  * returns them as a string. The read uses an explicit position, so the position of the file
+  * reader is not modified.
+  *
+  * @return the tail of the file decoded as a string
+  * @throws IOException if reading from the input fails
+  */
+ public String readTailMagic() throws IOException {
+   long fileSize = tsFileInput.size();
+   int magicLength = TSFileConfig.MAGIC_STRING.getBytes().length;
+   ByteBuffer tail = ByteBuffer.allocate(magicLength);
+   tsFileInput.read(tail, fileSize - TSFileConfig.MAGIC_STRING.getBytes().length);
+   tail.flip();
+   byte[] tailBytes = tail.array();
+   return new String(tailBytes);
+ }
+
+ /**
+ * whether the file is a complete TsFile: only if the head magic and tail magic string exists.
+ */
+ public boolean isComplete() throws IOException {
+ return tsFileInput.size() >= TSFileConfig.MAGIC_STRING.getBytes().length * 2
+ + TSFileConfig.VERSION_NUMBER_V2.getBytes().length
+ && (readTailMagic().equals(readHeadMagic()) || readTailMagic()
+ .equals(TSFileConfig.VERSION_NUMBER_V1));
+ }
+
+ /**
+ * this function does not modify the position of the file reader.
+ */
+ public String readHeadMagic() throws IOException {
+ ByteBuffer magicStringBytes = ByteBuffer
+ .allocate(TSFileConfig.MAGIC_STRING.getBytes().length);
+ tsFileInput.read(magicStringBytes, 0);
+ magicStringBytes.flip();
+ return new String(magicStringBytes.array());
+ }
+
+ /**
+ * this function reads version number and checks compatibility of TsFile.
+ */
+ public String readVersionNumberV2() throws IOException {
+ ByteBuffer versionNumberBytes = ByteBuffer
+ .allocate(TSFileConfig.VERSION_NUMBER_V2.getBytes().length);
+ tsFileInput.read(versionNumberBytes, TSFileConfig.MAGIC_STRING.getBytes().length);
+ versionNumberBytes.flip();
+ return new String(versionNumberBytes.array());
+ }
+
+ /**
+ * this function does not modify the position of the file reader.
+ *
+ * @throws IOException io error
+ */
+ public TsFileMetadata readFileMetadata() throws IOException {
+ if (tsFileMetaData == null) {
+ tsFileMetaData = TsFileMetadataV2.deserializeFrom(readData(fileMetadataPos, fileMetadataSize));
+ }
+ return tsFileMetaData;
+ }
+
+ /**
+ * this function does not modify the position of the file reader.
+ *
+ * @throws IOException io error
+ */
+ public BloomFilter readBloomFilter() throws IOException {
+ readFileMetadata();
+ return tsFileMetaData.getBloomFilter();
+ }
+
+ /**
+  * Returns the timeseries metadata of one device, keyed by measurement id.
+  *
+  * <p>When caching is disabled the metadata is read from the file on every call. Otherwise the
+  * cache is consulted under the read lock first; on a miss the write lock is taken, the cache is
+  * checked again, and only then is the metadata read from the file and stored.
+  *
+  * @param device name
+  * @return the map measurementId -> TimeseriesMetaData in one device
+  * @throws IOException io error
+  */
+ public Map<String, TimeseriesMetadata> readDeviceMetadata(String device) throws IOException {
+   if (!cacheDeviceMetadata) {
+     return readDeviceMetadataFromDisk(device);
+   }
+
+   Map<String, TimeseriesMetadata> cached;
+   cacheLock.readLock().lock();
+   try {
+     cached = cachedDeviceMetadata.get(device);
+   } finally {
+     cacheLock.readLock().unlock();
+   }
+   if (cached != null) {
+     return cached;
+   }
+
+   cacheLock.writeLock().lock();
+   try {
+     // Another thread may have filled the entry between the two locks.
+     cached = cachedDeviceMetadata.get(device);
+     if (cached == null) {
+       readFileMetadata();
+       cached = readDeviceMetadataFromDisk(device);
+       cachedDeviceMetadata.put(device, cached);
+     }
+     return cached;
+   } finally {
+     cacheLock.writeLock().unlock();
+   }
+ }
+
+ /**
+  * Loads every timeseries metadata of the device from the file, bypassing the cache, and indexes
+  * them by measurement id. If two entries share a measurement id, the later one wins.
+  */
+ private Map<String, TimeseriesMetadata> readDeviceMetadataFromDisk(String device)
+     throws IOException {
+   readFileMetadata();
+   List<TimeseriesMetadata> deviceSeries = getDeviceTimeseriesMetadata(device);
+   Map<String, TimeseriesMetadata> byMeasurement = new HashMap<>();
+   deviceSeries.forEach(series -> byMeasurement.put(series.getMeasurementId(), series));
+   return byMeasurement;
+ }
+
+ /**
+ * Looks up the timeseries metadata of a single path (device + measurement) in this v2 file.
+ *
+ * <p>The lookup starts at the metadata index held by the file metadata and searches for the
+ * device. Unless the root node is itself of type LEAF_MEASUREMENT, the node found for the
+ * device is decoded and searched for the measurement. The bytes located this way are decoded
+ * as a run of v2 timeseries metadata, which is then binary-searched by measurement name.
+ *
+ * @param path the path whose device and measurement are looked up
+ * @return the matching timeseries metadata, or {@code null} if the device or the measurement is
+ * not found in this file
+ * @throws IOException if reading from the input fails
+ */
+ public TimeseriesMetadata readTimeseriesMetadata(Path path) throws IOException {
+ readFileMetadata();
+ MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex();
+ // First level: find the index entry for the device. NOTE(review): the meaning of the last
+ // boolean argument (true here, false for the measurement lookup below) is defined by
+ // getMetadataAndEndOffset, which is not visible here - confirm.
+ Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetadataAndEndOffset(
+ deviceMetadataIndexNode, path.getDevice(), MetadataIndexNodeType.INTERNAL_DEVICE, true);
+ if (metadataIndexPair == null) {
+ return null;
+ }
+ // The pair holds the entry (left) and an end offset (right); read the bytes in between.
+ ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
+ MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
+ // When the root node is not a measurement leaf, the bytes just read are another index node
+ // that has to be searched by measurement name.
+ if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
+ metadataIndexNode = MetadataIndexNodeV2.deserializeFrom(buffer);
+ metadataIndexPair = getMetadataAndEndOffset(metadataIndexNode,
+ path.getMeasurement(), MetadataIndexNodeType.INTERNAL_MEASUREMENT, false);
+ }
+ if (metadataIndexPair == null) {
+ return null;
+ }
+ // Decode every timeseries metadata stored in the located range.
+ List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
+ buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
+ while (buffer.hasRemaining()) {
+ timeseriesMetadataList.add(TimeseriesMetadataV2.deserializeFrom(buffer));
+ }
+ // return null if path does not exist in the TsFile
+ int searchResult = binarySearchInTimeseriesMetadataList(timeseriesMetadataList,
+ path.getMeasurement());
+ return searchResult >= 0 ? timeseriesMetadataList.get(searchResult) : null;
+ }
+
+ /**
+ * Reads the TimeseriesMetadata of the requested measurements of one device.
+ * <p>
+ * For each measurement that has not been found yet, the block of TimeseriesMetadata that the
+ * index points to is read, and every still-missing measurement is searched in that block, so
+ * one read may resolve several measurements.
+ *
+ * @param device device name
+ * @param measurements measurement ids to look up
+ * @return the TimeseriesMetadata that were found; an empty list if the device entry cannot be
+ * located
+ * @throws IOException io error
+ */
+ public List<TimeseriesMetadata> readTimeseriesMetadata(String device, Set<String> measurements)
+ throws IOException {
+ readFileMetadata();
+ MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex();
+ // non-exact search: the nearest entry is returned even if the device name does not match
+ Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetadataAndEndOffset(
+ deviceMetadataIndexNode, device, MetadataIndexNodeType.INTERNAL_DEVICE, false);
+ if (metadataIndexPair == null) {
+ return Collections.emptyList();
+ }
+ List<TimeseriesMetadata> resultTimeseriesMetadataList = new ArrayList<>();
+ // copy into a list so that the measurements can be addressed by index
+ List<String> measurementList = new ArrayList<>(measurements);
+ Set<String> measurementsHadFound = new HashSet<>();
+ for (int i = 0; i < measurementList.size(); i++) {
+ // already resolved by a block that was read for an earlier measurement
+ if (measurementsHadFound.contains(measurementList.get(i))) {
+ continue;
+ }
+ ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
+ Pair<MetadataIndexEntry, Long> measurementMetadataIndexPair = metadataIndexPair;
+ List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
+ MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
+ // unless the root already is a measurement leaf, descend into the measurement index
+ if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
+ metadataIndexNode = MetadataIndexNodeV2.deserializeFrom(buffer);
+ measurementMetadataIndexPair = getMetadataAndEndOffset(metadataIndexNode,
+ measurementList.get(i), MetadataIndexNodeType.INTERNAL_MEASUREMENT, false);
+ }
+ // NOTE(review): this discards results already collected for earlier measurements
+ if (measurementMetadataIndexPair == null) {
+ return Collections.emptyList();
+ }
+ buffer = readData(measurementMetadataIndexPair.left.getOffset(),
+ measurementMetadataIndexPair.right);
+ while (buffer.hasRemaining()) {
+ timeseriesMetadataList.add(TimeseriesMetadataV2.deserializeFrom(buffer));
+ }
+ // search the current and all later measurements in the block that was just read
+ for (int j = i; j < measurementList.size(); j++) {
+ String current = measurementList.get(j);
+ if (!measurementsHadFound.contains(current)) {
+ int searchResult = binarySearchInTimeseriesMetadataList(timeseriesMetadataList, current);
+ if (searchResult >= 0) {
+ resultTimeseriesMetadataList.add(timeseriesMetadataList.get(searchResult));
+ measurementsHadFound.add(current);
+ }
+ }
+ // stop as soon as every requested measurement has been found
+ if (measurementsHadFound.size() == measurements.size()) {
+ return resultTimeseriesMetadataList;
+ }
+ }
+ }
+ return resultTimeseriesMetadataList;
+ }
+
+ /**
+  * Binary search by measurement id in a list of TimeseriesMetadata that is sorted by
+  * measurement id.
+  *
+  * @param timeseriesMetadataList list to search
+  * @param key measurement id to look for
+  * @return index of the element whose measurement id equals key, or -1 if there is none
+  */
+ private int binarySearchInTimeseriesMetadataList(List<TimeseriesMetadata> timeseriesMetadataList,
+     String key) {
+   int lo = 0;
+   int hi = timeseriesMetadataList.size() - 1;
+   while (lo <= hi) {
+     int middle = (lo + hi) >>> 1;
+     int order = timeseriesMetadataList.get(middle).getMeasurementId().compareTo(key);
+     if (order == 0) {
+       return middle; // key found
+     }
+     if (order < 0) {
+       lo = middle + 1;
+     } else {
+       hi = middle - 1;
+     }
+   }
+   return -1; // key not found
+ }
+
+ public List<String> getAllDevices() throws IOException {
+ if (tsFileMetaData == null) {
+ readFileMetadata();
+ }
+ return getAllDevices(tsFileMetaData.getMetadataIndex());
+ }
+
+ /**
+  * Collects device names from the given index node and, recursively, from its children.
+  *
+  * @param metadataIndexNode index node to start from
+  * @return device names found below the node
+  * @throws IOException io error
+  */
+ private List<String> getAllDevices(MetadataIndexNode metadataIndexNode) throws IOException {
+   List<String> devices = new ArrayList<>();
+   List<MetadataIndexEntry> children = metadataIndexNode.getChildren();
+   int childCount = children.size();
+   if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.INTERNAL_MEASUREMENT)) {
+     // the entries of this node are taken as the device names
+     for (MetadataIndexEntry child : children) {
+       devices.add(child.getName());
+     }
+     return devices;
+   }
+   for (int i = 0; i < childCount; i++) {
+     // a child ends where its next sibling starts; the last one ends at the node's end offset
+     long endOffset = (i + 1 < childCount)
+         ? children.get(i + 1).getOffset()
+         : metadataIndexNode.getEndOffset();
+     ByteBuffer childBuffer = readData(children.get(i).getOffset(), endOffset);
+     MetadataIndexNode childNode = MetadataIndexNodeV2.deserializeFrom(childBuffer);
+     if (childNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
+       // a LEAF_DEVICE node lists devices directly
+       for (MetadataIndexEntry deviceEntry : childNode.getChildren()) {
+         devices.add(deviceEntry.getName());
+       }
+     } else {
+       // keep traversing
+       devices.addAll(getAllDevices(childNode));
+     }
+   }
+   return devices;
+ }
+
+ /**
+  * Reads all ChunkMetadata of the given device in a single read and groups them by measurement.
+  *
+  * @param device device name
+  * @return measurement -> ChunkMetadata list
+  * @throws IOException io error
+  */
+ public Map<String, List<ChunkMetadata>> readChunkMetadataInDevice(String device)
+     throws IOException {
+   if (tsFileMetaData == null) {
+     readFileMetadata();
+   }
+
+   // the read starts at the first non-zero list offset and spans the summed list sizes
+   long firstOffset = 0;
+   int totalSize = 0;
+   for (TimeseriesMetadata seriesMetadata : getDeviceTimeseriesMetadata(device)) {
+     if (firstOffset == 0) {
+       firstOffset = seriesMetadata.getOffsetOfChunkMetaDataList();
+     }
+     totalSize += seriesMetadata.getDataSizeOfChunkMetaDataList();
+   }
+
+   // one buffer holding all ChunkMetadata of this device
+   ByteBuffer chunkMetadataBuffer = readData(firstOffset, totalSize);
+   Map<String, List<ChunkMetadata>> chunkMetadataByMeasurement = new HashMap<>();
+   while (chunkMetadataBuffer.hasRemaining()) {
+     ChunkMetadata chunkMetadata = ChunkMetadataV2.deserializeFrom(chunkMetadataBuffer);
+     String measurement = chunkMetadata.getMeasurementUid();
+     List<ChunkMetadata> sameSeries = chunkMetadataByMeasurement.get(measurement);
+     if (sameSeries == null) {
+       sameSeries = new ArrayList<>();
+       chunkMetadataByMeasurement.put(measurement, sameSeries);
+     }
+     sameSeries.add(chunkMetadata);
+   }
+
+   // set version in ChunkMetadata
+   List<Pair<Long, Long>> versionInfo = tsFileMetaData.getVersionInfo();
+   for (List<ChunkMetadata> chunkMetadataList : chunkMetadataByMeasurement.values()) {
+     VersionUtils.applyVersion(chunkMetadataList, versionInfo);
+   }
+   return chunkMetadataByMeasurement;
+ }
+
+ /**
+  * Returns the paths (device + measurement) of all timeseries in this file.
+  *
+  * @return list of Paths
+  * @throws IOException io error
+  */
+ public List<Path> getAllPaths() throws IOException {
+   List<Path> allPaths = new ArrayList<>();
+   for (String deviceName : getAllDevices()) {
+     Set<String> measurementIds = readDeviceMetadata(deviceName).keySet();
+     for (String measurementId : measurementIds) {
+       allPaths.add(new Path(deviceName, measurementId));
+     }
+   }
+   return allPaths;
+ }
+
+ /**
+ * Traverse the metadata index from a MetadataIndexEntry and collect the TimeseriesMetadata
+ * found below it.
+ *
+ * @param metadataIndex MetadataIndexEntry whose content is held in buffer
+ * @param buffer byte buffer holding the serialized content the entry points to
+ * @param deviceId key under which TimeseriesMetadata are stored; it is replaced by the name of
+ * metadataIndex whenever type is not LEAF_MEASUREMENT
+ * @param type node type that decides how buffer is interpreted: LEAF_MEASUREMENT means buffer
+ * holds TimeseriesMetadata, every other type means it holds a MetadataIndexNode
+ * @param timeseriesMetadataMap map: deviceId -> timeseriesMetadata list, filled by this method
+ * @throws IOException io error
+ */
+ private void generateMetadataIndex(MetadataIndexEntry metadataIndex, ByteBuffer buffer,
+ String deviceId, MetadataIndexNodeType type,
+ Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap) throws IOException {
+ switch (type) {
+ case INTERNAL_DEVICE:
+ case LEAF_DEVICE:
+ case INTERNAL_MEASUREMENT:
+ // NOTE(review): for nested INTERNAL_MEASUREMENT nodes this presumably overwrites the
+ // device id with a measurement entry name - confirm against the v2 index layout
+ deviceId = metadataIndex.getName();
+ MetadataIndexNode metadataIndexNode = MetadataIndexNodeV2.deserializeFrom(buffer);
+ int metadataIndexListSize = metadataIndexNode.getChildren().size();
+ for (int i = 0; i < metadataIndexListSize; i++) {
+ // a child ends where the next sibling starts; the last child ends at the node's end offset
+ long endOffset = metadataIndexNode.getEndOffset();
+ if (i != metadataIndexListSize - 1) {
+ endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset();
+ }
+ ByteBuffer nextBuffer = readData(metadataIndexNode.getChildren().get(i).getOffset(),
+ endOffset);
+ // recurse with the type of the node that was just deserialized
+ generateMetadataIndex(metadataIndexNode.getChildren().get(i), nextBuffer, deviceId,
+ metadataIndexNode.getNodeType(), timeseriesMetadataMap);
+ }
+ break;
+ case LEAF_MEASUREMENT:
+ // the buffer holds consecutive serialized TimeseriesMetadata
+ List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
+ while (buffer.hasRemaining()) {
+ timeseriesMetadataList.add(TimeseriesMetadataV2.deserializeFrom(buffer));
+ }
+ timeseriesMetadataMap.computeIfAbsent(deviceId, k -> new ArrayList<>())
+ .addAll(timeseriesMetadataList);
+ break;
+ }
+ }
+
+ /**
+  * Walks the whole metadata index and collects every TimeseriesMetadata in the file.
+  *
+  * @return map of TimeseriesMetadata lists as filled by generateMetadataIndex
+  * @throws IOException io error
+  */
+ public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata() throws IOException {
+   if (tsFileMetaData == null) {
+     readFileMetadata();
+   }
+   MetadataIndexNode root = tsFileMetaData.getMetadataIndex();
+   List<MetadataIndexEntry> rootEntries = root.getChildren();
+   Map<String, List<TimeseriesMetadata>> collected = new HashMap<>();
+   int lastIndex = rootEntries.size() - 1;
+   for (int i = 0; i <= lastIndex; i++) {
+     MetadataIndexEntry entry = rootEntries.get(i);
+     // an entry ends where the next one starts; the last one ends at the root's end offset
+     long endOffset = (i == lastIndex)
+         ? tsFileMetaData.getMetadataIndex().getEndOffset()
+         : rootEntries.get(i + 1).getOffset();
+     ByteBuffer entryBuffer = readData(entry.getOffset(), endOffset);
+     generateMetadataIndex(entry, entryBuffer, null, root.getNodeType(), collected);
+   }
+   return collected;
+ }
+
+ /**
+  * Collects all TimeseriesMetadata stored below the index entry of the given device.
+  *
+  * @param device device name
+  * @return TimeseriesMetadata of the device, or an empty list if no index entry is found
+  * @throws IOException io error
+  */
+ private List<TimeseriesMetadata> getDeviceTimeseriesMetadata(String device) throws IOException {
+   Pair<MetadataIndexEntry, Long> located = getMetadataAndEndOffset(
+       tsFileMetaData.getMetadataIndex(), device, MetadataIndexNodeType.INTERNAL_DEVICE, true);
+   if (located == null) {
+     return Collections.emptyList();
+   }
+   ByteBuffer deviceBuffer = readData(located.left.getOffset(), located.right);
+   // TreeMap keeps the collected groups ordered by key
+   Map<String, List<TimeseriesMetadata>> grouped = new TreeMap<>();
+   generateMetadataIndex(located.left, deviceBuffer, device,
+       MetadataIndexNodeType.INTERNAL_MEASUREMENT, grouped);
+   List<TimeseriesMetadata> flattened = new ArrayList<>();
+   grouped.values().forEach(flattened::addAll);
+   return flattened;
+ }
+
+ /**
+  * Get target MetadataIndexEntry and its end offset
+  *
+  * @param metadataIndex given MetadataIndexNode
+  * @param name target device / measurement name
+  * @param type target MetadataIndexNodeType, either INTERNAL_DEVICE or
+  *             INTERNAL_MEASUREMENT. When searching for a device node, return when it is
+  *             not INTERNAL_DEVICE. Likewise, when searching for a measurement node,
+  *             return when it is not INTERNAL_MEASUREMENT. This works for the situation
+  *             when the index tree does NOT have the device level and ONLY has the
+  *             measurement level.
+  * @param exactSearch if is in exact search mode, return null when there is no entry with name;
+  *                    or else return the nearest MetadataIndexEntry before it (for deeper
+  *                    search)
+  * @return target MetadataIndexEntry, endOffset pair; null if nothing matches
+  * @throws IOException io error
+  */
+ private Pair<MetadataIndexEntry, Long> getMetadataAndEndOffset(MetadataIndexNode metadataIndex,
+     String name, MetadataIndexNodeType type, boolean exactSearch) throws IOException {
+   if (!metadataIndex.getNodeType().equals(type)) {
+     // reached the level that holds the wanted entries: apply the caller's search mode here
+     return metadataIndex.getChildIndexEntry(name, exactSearch);
+   }
+   // still on an internal level: always follow the nearest child, exactness is decided below
+   Pair<MetadataIndexEntry, Long> childIndexEntry = metadataIndex
+       .getChildIndexEntry(name, false);
+   if (childIndexEntry == null) {
+     return null;
+   }
+   ByteBuffer buffer = readData(childIndexEntry.left.getOffset(), childIndexEntry.right);
+   // pass exactSearch on; hard-coding false here made exact lookups through an internal
+   // node return the nearest entry instead of null for a name that does not exist
+   return getMetadataAndEndOffset(MetadataIndexNodeV2.deserializeFrom(buffer), name, type,
+       exactSearch);
+ }
+
+ /**
+  * Deserializes a v2 CHUNK_GROUP_FOOTER starting at the current position of the input. <br>
+  * This method is not threadsafe.
+  *
+  * @return the footer, represented as a ChunkGroupHeader
+  * @throws IOException io error
+  */
+ public ChunkGroupHeader readChunkGroupFooter() throws IOException {
+   final boolean markerRead = true;
+   return ChunkGroupFooterV2.deserializeFrom(tsFileInput.wrapAsInputStream(), markerRead);
+ }
+
+ /**
+  * Deserializes a v2 CHUNK_GROUP_FOOTER located at the given position of the input.
+  *
+  * @param position the offset of the chunk group footer in the file
+  * @param markerRead true if the offset does not contain the marker, otherwise false
+  * @return the footer, represented as a ChunkGroupHeader
+  * @throws IOException io error
+  */
+ public ChunkGroupHeader readChunkGroupFooter(long position, boolean markerRead)
+     throws IOException {
+   ChunkGroupHeader footer = ChunkGroupFooterV2.deserializeFrom(tsFileInput, position, markerRead);
+   return footer;
+ }
+
+ public long readVersion() throws IOException {
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+ if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) == 0) {
+ throw new IOException("reach the end of the file.");
+ }
+ buffer.flip();
+ return buffer.getLong();
+ }
+
+ /**
+  * Deserializes a v2 CHUNK_HEADER starting at the current position of the input. <br>
+  * This method is not threadsafe.
+  *
+  * @return a CHUNK_HEADER
+  * @throws IOException io error
+  */
+ public ChunkHeader readChunkHeader() throws IOException {
+   final boolean markerRead = true;
+   return ChunkHeaderV2.deserializeFrom(tsFileInput.wrapAsInputStream(), markerRead);
+ }
+
+ /**
+  * Deserializes the v2 chunk header stored at the given file offset.
+  *
+  * @param position the file offset of this chunk's header
+  * @param chunkHeaderSize the size of chunk's header
+  * @param markerRead true if the offset does not contain the marker, otherwise false
+  * @return the chunk header
+  * @throws IOException io error
+  */
+ private ChunkHeader readChunkHeader(long position, int chunkHeaderSize, boolean markerRead)
+     throws IOException {
+   ChunkHeader header =
+       ChunkHeaderV2.deserializeFrom(tsFileInput, position, chunkHeaderSize, markerRead);
+   return header;
+ }
+
+ /**
+  * Reads the raw bytes of a chunk's data (its pages) by delegating to readData.
+  *
+  * @param position the offset of the chunk data
+  * @param dataSize the size of the chunk data
+  * @return the pages of this chunk
+  * @throws IOException io error
+  */
+ private ByteBuffer readChunk(long position, int dataSize) throws IOException {
+   ByteBuffer pages = readData(position, dataSize);
+   return pages;
+ }
+
+ /**
+  * Reads the chunk described by the given metadata into memory and flags it as coming from an
+  * old-format file.
+  *
+  * @param metaData given chunk meta data
+  * @return the chunk (header, data, delete intervals and statistics)
+  * @throws IOException io error
+  */
+ public Chunk readMemChunk(ChunkMetadata metaData) throws IOException {
+   long headerOffset = metaData.getOffsetOfChunkHeader();
+   int expectedHeaderSize = ChunkHeaderV2.getSerializedSize(metaData.getMeasurementUid());
+   ChunkHeader header = readChunkHeader(headerOffset, expectedHeaderSize, false);
+   // the chunk data follows directly after the serialized header
+   ByteBuffer chunkData =
+       readChunk(headerOffset + header.getSerializedSize(), header.getDataSize());
+   Chunk result = new Chunk(header, chunkData, metaData.getDeleteIntervalList(),
+       metaData.getStatistics());
+   result.setFromOldFile(true);
+   return result;
+ }
+
+ /**
+  * Reads all Chunks of the given device, in ascending order of their header offset.
+  * <p>
+  * Note that this method loads all the chunks into memory, so it needs to be invoked carefully.
+  *
+  * @param device device name
+  * @return measurement -> chunks list
+  * @throws IOException io error
+  */
+ public Map<String, List<Chunk>> readChunksInDevice(String device) throws IOException {
+   List<ChunkMetadata> allChunkMetadata = new ArrayList<>();
+   readChunkMetadataInDevice(device).values().forEach(allChunkMetadata::addAll);
+   allChunkMetadata.sort(Comparator.comparing(ChunkMetadata::getOffsetOfChunkHeader));
+
+   Map<String, List<Chunk>> chunksByMeasurement = new HashMap<>();
+   for (ChunkMetadata chunkMetadata : allChunkMetadata) {
+     Chunk chunk = readMemChunk(chunkMetadata);
+     chunksByMeasurement
+         .computeIfAbsent(chunk.getHeader().getMeasurementID(), key -> new ArrayList<>())
+         .add(chunk);
+   }
+   return chunksByMeasurement;
+ }
+
+ /**
+ * not thread safe.
+ *
+ * @param type given tsfile data type
+ */
+ public PageHeader readPageHeader(TSDataType type) throws IOException {
+ return PageHeaderV2.deserializeFrom(tsFileInput.wrapAsInputStream(), type);
+ }
+
+ public long position() throws IOException {
+ return tsFileInput.position();
+ }
+
+ /**
+  * Moves the underlying input to the given offset.
+  *
+  * @param offset new position of the input
+  * @throws IOException io error
+  */
+ public void position(long offset) throws IOException {
+   this.tsFileInput.position(offset);
+ }
+
+ public void skipPageData(PageHeader header) throws IOException {
+ tsFileInput.position(tsFileInput.position() + header.getCompressedSize());
+ }
+
+ /**
+  * Reads and uncompresses one page starting at the current position of the input.
+  *
+  * @param header header of the page
+  * @param type compression type of the page data
+  * @return the uncompressed page data
+  * @throws IOException io error
+  */
+ public ByteBuffer readPage(PageHeader header, CompressionType type) throws IOException {
+   // -1 tells readData to read from the current position
+   final long fromCurrentPosition = -1;
+   return readPage(header, type, fromCurrentPosition);
+ }
+
+ /**
+  * Reads one page and uncompresses it if necessary.
+  *
+  * @param header header of the page; supplies the compressed and uncompressed sizes
+  * @param type compression type of the page data
+  * @param position offset of the page data, or -1 to read from the current position
+  * @return the page data; for UNCOMPRESSED pages this is the buffer that was read
+  * @throws IOException io error
+  */
+ private ByteBuffer readPage(PageHeader header, CompressionType type, long position)
+     throws IOException {
+   ByteBuffer buffer = readData(position, header.getCompressedSize());
+   if (type == CompressionType.UNCOMPRESSED) {
+     // nothing to uncompress: return before allocating the output buffer and uncompressor
+     return buffer;
+   }
+   IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type);
+   ByteBuffer uncompressedBuffer = ByteBuffer.allocate(header.getUncompressedSize());
+   // FIXME if the buffer is not array-implemented.
+   unCompressor.uncompress(buffer.array(), buffer.position(), buffer.remaining(),
+       uncompressedBuffer.array(),
+       0);
+   return uncompressedBuffer;
+ }
+
+ /**
+ * read one byte from the input. <br> this method is not thread safe
+ */
+ public byte readMarker() throws IOException {
+ markerBuffer.clear();
+ if (ReadWriteIOUtils.readAsPossible(tsFileInput, markerBuffer) == 0) {
+ throw new IOException("reach the end of the file.");
+ }
+ markerBuffer.flip();
+ return markerBuffer.get();
+ }
+
+ public void close() throws IOException {
+ if (resourceLogger.isDebugEnabled()) {
+ resourceLogger.debug("{} reader is closed.", file);
+ }
+ this.tsFileInput.close();
+ }
+
+ /**
+  * @return the file this reader was opened on
+  */
+ public String getFileName() {
+   return file;
+ }
+
+ public long fileSize() throws IOException {
+ return tsFileInput.size();
+ }
+
+ /**
+  * Read data from tsFileInput, from the current position (if position = -1), or the given
+  * position. <br> If position = -1, the tsFileInput's position will be changed to the current
+  * position + real data size that has been read. Otherwise, the tsFileInput's position is not
+  * changed.
+  *
+  * @param position the start position of data in the tsFileInput, or the current position if
+  *                 position = -1
+  * @param size the size of data to read
+  * @return the data that has been read, flipped and ready for reading
+  * @throws IOException if fewer than size bytes could be read
+  */
+ private ByteBuffer readData(long position, int size) throws IOException {
+   ByteBuffer buffer = ByteBuffer.allocate(size);
+   if (position < 0) {
+     if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) != size) {
+       throw new IOException("reach the end of the data");
+     }
+   } else {
+     long actualReadSize = ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size);
+     if (actualReadSize != size) {
+       // message fixed: "posiotion" typo and the missing space after the comma
+       throw new IOException(
+           String.format("reach the end of the data. Size of data that want to read: %s, "
+               + "actual read size: %s, position: %s", size, actualReadSize, position));
+     }
+   }
+   buffer.flip();
+   return buffer;
+ }
+
+ /**
+  * Read the data between two offsets of tsFileInput.
+  *
+  * @param start the start position of data in the tsFileInput
+  * @param end the end position (exclusive) of the data to read
+  * @return the data that has been read
+  * @throws IOException if the range is negative, does not fit into a single buffer, or cannot
+  *                     be read completely
+  */
+ private ByteBuffer readData(long start, long end) throws IOException {
+   long length = end - start;
+   // an unchecked (int) cast would silently truncate an oversized range or go negative
+   if (length < 0 || length > Integer.MAX_VALUE) {
+     throw new IOException(String.format(
+         "invalid data range: start %d, end %d, length %d", start, end, length));
+   }
+   return readData(start, (int) length);
+ }
+
+ /**
+ * notice, the target bytebuffer are not flipped.
+ */
+ public int readRaw(long position, int length, ByteBuffer target) throws IOException {
+ return ReadWriteIOUtils.readAsPossible(tsFileInput, target, position, length);
+ }
+
+ /**
+ * Self check the file and return the position up to which the data is safe.
+ * <p>
+ * The data section is scanned marker by marker. The safe position only advances after a whole
+ * chunk group footer or a version record has been read, so a chunk group that cannot be read
+ * completely is dropped as a whole.
+ *
+ * @param newSchema the schema on each time series in the file; filled while scanning,
+ * may be null
+ * @param chunkGroupMetadataList ChunkGroupMetadata List; filled while scanning
+ * @param versionInfo version pair List; filled while scanning
+ * @param fastFinish if true and the file is complete, then newSchema and
+ * chunkGroupMetadataList parameter will be not modified.
+ * @return TsFileCheckStatus.FILE_NOT_FOUND if the file does not exist,
+ * TsFileCheckStatus.INCOMPATIBLE_FILE if the magic string or version number does not match,
+ * TsFileCheckStatus.COMPLETE_FILE if the file is complete and fastFinish is true; otherwise
+ * the position of the file that is fine. All data after the position in the file should
+ * be truncated.
+ * @throws IOException io error outside of the marker scan; errors inside the scan are logged
+ * and end the scan
+ */
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+ public long selfCheck(Map<Path, MeasurementSchema> newSchema,
+ List<ChunkGroupMetadata> chunkGroupMetadataList,
+ List<Pair<Long, Long>> versionInfo,
+ boolean fastFinish) throws IOException {
+ File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file);
+ long fileSize;
+ if (!checkFile.exists()) {
+ return TsFileCheckStatus.FILE_NOT_FOUND;
+ } else {
+ fileSize = checkFile.length();
+ }
+ ChunkMetadata currentChunk;
+ String measurementID;
+ TSDataType dataType;
+ long fileOffsetOfChunk;
+
+ // ChunkMetadata of current ChunkGroup
+ List<ChunkMetadata> chunkMetadataList = null;
+ String deviceID;
+
+ // the file head consists of the magic string followed by the v2 version number
+ int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER_V2
+ .getBytes().length;
+ if (fileSize < headerLength) {
+ return TsFileCheckStatus.INCOMPATIBLE_FILE;
+ }
+ if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic()) || !TSFileConfig.VERSION_NUMBER_V2
+ .equals(readVersionNumberV2())) {
+ return TsFileCheckStatus.INCOMPATIBLE_FILE;
+ }
+
+ tsFileInput.position(headerLength);
+ if (fileSize == headerLength) {
+ // the file holds nothing but the head
+ return headerLength;
+ } else if (isComplete()) {
+ loadMetadataSize();
+ if (fastFinish) {
+ return TsFileCheckStatus.COMPLETE_FILE;
+ }
+ }
+ boolean newChunkGroup = true;
+ // not a complete file, we will recover it...
+ long truncatedSize = headerLength;
+ byte marker;
+ int chunkCnt = 0;
+ // schemas of the chunks read since the last chunk group footer
+ List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
+ try {
+ while ((marker = this.readMarker()) != MetaMarker.SEPARATOR) {
+ switch (marker) {
+ case MetaMarker.CHUNK_HEADER:
+ // this is the first chunk of a new ChunkGroup.
+ if (newChunkGroup) {
+ newChunkGroup = false;
+ chunkMetadataList = new ArrayList<>();
+ }
+ // the marker byte has already been consumed, so the chunk starts one byte earlier
+ fileOffsetOfChunk = this.position() - 1;
+ // if there is something wrong with a chunk, we will drop the whole ChunkGroup
+ // as different chunks may be created by the same insertions(sqls), and partial
+ // insertion is not tolerable
+ ChunkHeader chunkHeader = this.readChunkHeader();
+ measurementID = chunkHeader.getMeasurementID();
+ MeasurementSchema measurementSchema = new MeasurementSchema(measurementID,
+ chunkHeader.getDataType(),
+ chunkHeader.getEncodingType(), chunkHeader.getCompressionType());
+ measurementSchemaList.add(measurementSchema);
+ dataType = chunkHeader.getDataType();
+ // chunk statistics are rebuilt by merging the statistics of every page header
+ Statistics<?> chunkStatistics = Statistics.getStatsByType(dataType);
+ for (int j = 0; j < chunkHeader.getNumOfPages(); j++) {
+ // a new Page
+ PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType());
+ chunkStatistics.mergeStatistics(pageHeader.getStatistics());
+ this.skipPageData(pageHeader);
+ }
+ currentChunk = new ChunkMetadata(measurementID, dataType, fileOffsetOfChunk,
+ chunkStatistics);
+ chunkMetadataList.add(currentChunk);
+ chunkCnt++;
+ break;
+ case MetaMarker.CHUNK_GROUP_HEADER:
+ // this is a chunk group
+ // if there is something wrong with the ChunkGroup Footer, we will drop this ChunkGroup
+ // because we can not guarantee the correctness of the deviceId.
+ ChunkGroupHeader chunkGroupFooter = this.readChunkGroupFooter();
+ deviceID = chunkGroupFooter.getDeviceID();
+ if (newSchema != null) {
+ for (MeasurementSchema tsSchema : measurementSchemaList) {
+ newSchema.putIfAbsent(new Path(deviceID, tsSchema.getMeasurementId()), tsSchema);
+ }
+ }
+ // NOTE(review): unlike newSchema, chunkGroupMetadataList is used without a null check
+ chunkGroupMetadataList.add(new ChunkGroupMetadata(deviceID, chunkMetadataList));
+ newChunkGroup = true;
+ // the whole chunk group has been read, so everything up to here is safe
+ truncatedSize = this.position();
+
+ totalChunkNum += chunkCnt;
+ chunkCnt = 0;
+ measurementSchemaList = new ArrayList<>();
+ break;
+ case MetaMarker.VERSION:
+ long version = readVersion();
+ // NOTE(review): versionInfo is used without a null check as well
+ versionInfo.add(new Pair<>(position(), version));
+ truncatedSize = this.position();
+ break;
+ default:
+ // the disk file is corrupted, using this file may be dangerous
+ throw new IOException("Unexpected marker " + marker);
+ }
+ }
+ // now we read the tail of the data section, so we are sure that the last
+ // ChunkGroupFooter is complete.
+ // minus one: the separator marker that ended the loop is not part of the data section
+ truncatedSize = this.position() - 1;
+ } catch (Exception e) {
+ // the failure is only logged; truncatedSize keeps the last position known to be safe
+ logger.info("TsFile {} self-check cannot proceed at position {} " + "recovered, because : {}",
+ file, this.position(), e.getMessage());
+ }
+ // Despite the completeness of the data section, we will discard current FileMetadata
+ // so that we can continue to write data into this tsfile.
+ return truncatedSize;
+ }
+
+ /**
+  * @return the number of chunks counted so far by selfCheck
+  */
+ public int getTotalChunkNum() {
+   return this.totalChunkNum;
+ }
+
+ /**
+  * Gets the ChunkMetadata of the given path, sorted by start time.
+  *
+  * @param path timeseries path
+  * @return List of ChunkMetadata; an empty list if the path is not found
+  * @throws IOException io error
+  */
+ public List<ChunkMetadata> getChunkMetadataList(Path path) throws IOException {
+   TimeseriesMetadata seriesMetadata = readTimeseriesMetadata(path);
+   if (seriesMetadata != null) {
+     List<ChunkMetadata> ordered = readChunkMetaDataList(seriesMetadata);
+     ordered.sort(Comparator.comparingLong(ChunkMetadata::getStartTime));
+     return ordered;
+   }
+   return Collections.emptyList();
+ }
+
+ /**
+  * Reads the ChunkMetadata list that the given TimeseriesMetadata points to and applies the
+  * file's version info to it.
+  *
+  * @param timeseriesMetaData supplies offset and size of the serialized ChunkMetadata list
+  * @return List of ChunkMetadata
+  * @throws IOException io error
+  */
+ public List<ChunkMetadata> readChunkMetaDataList(TimeseriesMetadata timeseriesMetaData)
+     throws IOException {
+   readFileMetadata();
+   long listOffset = timeseriesMetaData.getOffsetOfChunkMetaDataList();
+   int listSize = timeseriesMetaData.getDataSizeOfChunkMetaDataList();
+
+   ByteBuffer listBuffer = readData(listOffset, listSize);
+   ArrayList<ChunkMetadata> result = new ArrayList<>();
+   while (listBuffer.hasRemaining()) {
+     result.add(ChunkMetadataV2.deserializeFrom(listBuffer));
+   }
+
+   VersionUtils.applyVersion(result, tsFileMetaData.getVersionInfo());
+
+   // minimize the storage of an ArrayList instance.
+   result.trimToSize();
+   return result;
+ }
+
+ /**
+  * Gets all measurements in this file. When several devices share a measurement id, the entry
+  * of the device visited last wins.
+  *
+  * @return measurement -> datatype
+  * @throws IOException io error
+  */
+ public Map<String, TSDataType> getAllMeasurements() throws IOException {
+   Map<String, TSDataType> dataTypeByMeasurement = new HashMap<>();
+   for (String deviceName : getAllDevices()) {
+     for (TimeseriesMetadata seriesMetadata : readDeviceMetadata(deviceName).values()) {
+       dataTypeByMeasurement.put(seriesMetadata.getMeasurementId(),
+           seriesMetadata.getTSDataType());
+     }
+   }
+   return dataTypeByMeasurement;
+ }
+
+ /**
+ * get device names which has valid chunks in [start, end)
+ *
+ * @param start start of the partition
+ * @param end end of the partition
+ * @return device names in range
+ */
+ public List<String> getDeviceNameInRange(long start, long end) throws IOException {
+ List<String> res = new ArrayList<>();
+ for (String device : getAllDevices()) {
+ Map<String, List<ChunkMetadata>> seriesMetadataMap = readChunkMetadataInDevice(device);
+ if (hasDataInPartition(seriesMetadataMap, start, end)) {
+ res.add(device);
+ }
+ }
+ return res;
+ }
+
+ /**
+  * Checks whether the device has at least one Chunk located in this partition.
+  *
+  * @param seriesMetadataMap chunkMetaDataList of each measurement
+  * @param start the start position of the space partition
+  * @param end the end position of the space partition
+  * @return true as soon as one chunk is reported as LocateStatus.in, false otherwise
+  */
+ private boolean hasDataInPartition(Map<String, List<ChunkMetadata>> seriesMetadataMap,
+     long start, long end) {
+   for (List<ChunkMetadata> chunkMetadataOfSeries : seriesMetadataMap.values()) {
+     for (ChunkMetadata chunkMetadata : chunkMetadataOfSeries) {
+       if (MetadataQuerierByFileImpl.checkLocateStatus(chunkMetadata, start, end)
+           == LocateStatus.in) {
+         return true;
+       }
+     }
+   }
+   return false;
+ }
+
+}
\ No newline at end of file
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/reader/page/PageReaderV2.java
similarity index 56%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/reader/page/PageReaderV2.java
index efc3b01..0b5de6e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/reader/page/PageReaderV2.java
@@ -16,88 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.tsfile.read.reader.page;
+package org.apache.iotdb.tsfile.v2.read.reader.page;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
-import java.util.List;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.encoding.decoder.PlainDecoder;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
-import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-public class PageReader implements IPageReader {
-
- private PageHeader pageHeader;
-
- private TSDataType dataType;
-
- /**
- * decoder for value column
- */
- private Decoder valueDecoder;
-
- /**
- * decoder for time column
- */
- private Decoder timeDecoder;
-
- /**
- * time column in memory
- */
- private ByteBuffer timeBuffer;
-
- /**
- * value column in memory
- */
- private ByteBuffer valueBuffer;
-
- private Filter filter;
-
- /**
- * A list of deleted intervals.
- */
- private List<TimeRange> deleteIntervalList;
- private int deleteCursor = 0;
+public class PageReaderV2 extends PageReader {
- public PageReader(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder,
+ public PageReaderV2(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder,
Decoder timeDecoder, Filter filter) {
this(null, pageData, dataType, valueDecoder, timeDecoder, filter);
}
- public PageReader(PageHeader pageHeader, ByteBuffer pageData, TSDataType dataType,
+ public PageReaderV2(PageHeader pageHeader, ByteBuffer pageData, TSDataType dataType,
Decoder valueDecoder, Decoder timeDecoder, Filter filter) {
- this.dataType = dataType;
- this.valueDecoder = valueDecoder;
- this.timeDecoder = timeDecoder;
- this.filter = filter;
- this.pageHeader = pageHeader;
- splitDataToTimeStampAndValue(pageData);
- }
-
- /**
- * split pageContent into two stream: time and value
- *
- * @param pageData uncompressed bytes size of time column, time column, value column
- */
- private void splitDataToTimeStampAndValue(ByteBuffer pageData) {
- int timeBufferLength = ReadWriteForEncodingUtils.readUnsignedVarInt(pageData);
-
- timeBuffer = pageData.slice();
- timeBuffer.limit(timeBufferLength);
-
- valueBuffer = pageData.slice();
- valueBuffer.position(timeBufferLength);
+ super(pageHeader, pageData, dataType, valueDecoder, timeDecoder, filter);
}
/**
@@ -119,7 +63,8 @@ public class PageReader implements IPageReader {
}
break;
case INT32:
- int anInt = valueDecoder.readInt(valueBuffer);
+ int anInt = (valueDecoder instanceof PlainDecoder) ?
+ valueBuffer.getInt() : valueDecoder.readInt(valueBuffer);
if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, anInt))) {
pageData.putInt(timestamp, anInt);
}
@@ -143,7 +88,10 @@ public class PageReader implements IPageReader {
}
break;
case TEXT:
- Binary aBinary = valueDecoder.readBinary(valueBuffer);
+ int length = valueBuffer.getInt();
+ byte[] buf = new byte[length];
+ valueBuffer.get(buf, 0, buf.length);
+ Binary aBinary = new Binary(buf);
if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aBinary))) {
pageData.putBinary(timestamp, aBinary);
}
@@ -154,40 +102,4 @@ public class PageReader implements IPageReader {
}
return pageData.flip();
}
-
- @Override
- public Statistics getStatistics() {
- return pageHeader.getStatistics();
- }
-
- @Override
- public void setFilter(Filter filter) {
- this.filter = filter;
- }
-
- public void setDeleteIntervalList(List<TimeRange> list) {
- this.deleteIntervalList = list;
- }
-
- public List<TimeRange> getDeleteIntervalList() {
- return deleteIntervalList;
- }
-
- @Override
- public boolean isModified() {
- return pageHeader.isModified();
- }
-
- private boolean isDeleted(long timestamp) {
- while (deleteIntervalList != null && deleteCursor < deleteIntervalList.size()) {
- if (deleteIntervalList.get(deleteCursor).contains(timestamp)) {
- return true;
- } else if (deleteIntervalList.get(deleteCursor).getMax() < timestamp) {
- deleteCursor++;
- } else {
- return false;
- }
- }
- return false;
- }
}