You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/11 06:26:16 UTC
[incubator-iotdb] branch master updated: [IOTDB-585] fix recover version bug (#1024)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 107e7f7 [IOTDB-585] fix recover version bug (#1024)
107e7f7 is described below
commit 107e7f74e665d18ba189168aecafe186309641f0
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Sat Apr 11 14:26:01 2020 +0800
[IOTDB-585] fix recover version bug (#1024)
* fix recover and version bug
---
.../apache/iotdb/tsfile/TsFileSequenceRead.java | 4 ++
.../iotdb/db/engine/flush/MemTableFlushTask.java | 2 +-
.../iotdb/db/engine/merge/task/MergeFileTask.java | 6 +--
.../db/engine/merge/task/MergeMultiChunkTask.java | 3 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 4 +-
.../writelog/recover/TsFileRecoverPerformer.java | 1 +
.../engine/storagegroup/TsFileProcessorTest.java | 3 +-
.../iotdb/db/integration/IoTDBRestartIT.java | 58 ++++++++++++++++++++++
.../query/reader/series/SeriesReaderTestUtil.java | 4 +-
tsfile/format-changelist.md | 1 +
.../org/apache/iotdb/tsfile/file/MetaMarker.java | 1 +
.../iotdb/tsfile/read/TsFileSequenceReader.java | 42 +++++++++-------
.../MetaMarker.java => utils/VersionUtils.java} | 34 +++++++------
.../apache/iotdb/tsfile/write/TsFileWriter.java | 10 ++--
.../write/writer/RestorableTsFileIOWriter.java | 6 ++-
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 16 ++++--
.../tsfile/read/TsFileSequenceReaderTest.java | 3 ++
.../iotdb/tsfile/write/TsFileIOWriterTest.java | 6 ++-
.../iotdb/tsfile/write/TsFileWriterTest.java | 2 +-
.../write/writer/RestorableTsFileIOWriterTest.java | 6 ++-
20 files changed, 150 insertions(+), 62 deletions(-)
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
index 75c4823..0ec5fb4 100644
--- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
@@ -98,6 +98,10 @@ public class TsFileSequenceRead {
ChunkGroupFooter chunkGroupFooter = reader.readChunkGroupFooter();
System.out.println("device: " + chunkGroupFooter.getDeviceID());
break;
+ case MetaMarker.VERSION:
+ long version = reader.readVersion();
+ System.out.println("version: " + version);
+ break;
default:
MetaMarker.handleUnexpectedMarker(marker);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 8b6e0ad..1703844 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -104,7 +104,7 @@ public class MemTableFlushTask {
ioTaskFuture.get();
try {
- writer.addVersionPair(new Pair<>(writer.getPos(), memTable.getVersion()));
+ writer.writeVersion(memTable.getVersion());
} catch (IOException e) {
throw new ExecutionException(e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index aa74eea..8c1b848 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -40,7 +40,6 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.writer.ForceAppendTsFileWriter;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
@@ -211,7 +210,7 @@ class MergeFileTask {
chunkMetaData.getVersion() > maxVersion ? chunkMetaData.getVersion() : maxVersion;
context.incTotalPointWritten(chunkMetaData.getNumOfPoints());
}
- fileWriter.addVersionPair(new Pair<>(fileWriter.getPos(), maxVersion));
+ fileWriter.writeVersion(maxVersion);
fileWriter.endChunkGroup();
}
@@ -242,8 +241,7 @@ class MergeFileTask {
fileWriter.startChunkGroup(path.getDevice());
long maxVersion = writeUnmergedChunks(chunkStartTimes, chunkMetadataList,
resource.getFileReader(seqFile), fileWriter);
- Pair<Long, Long> versionPair = new Pair<>(fileWriter.getPos(), maxVersion + 1);
- fileWriter.addVersionPair(versionPair);
+ fileWriter.writeVersion(maxVersion + 1);
fileWriter.endChunkGroup();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index dc61946..286b229 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -50,7 +50,6 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -195,7 +194,7 @@ class MergeMultiChunkTask {
boolean dataWritten = mergeChunks(seqChunkMeta, isLastFile, fileSequenceReader, unseqReaders,
mergeFileWriter, currTsFile);
if (dataWritten) {
- mergeFileWriter.addVersionPair(new Pair<>(mergeFileWriter.getPos(), 0L));
+ mergeFileWriter.writeVersion(0L);
mergeFileWriter.endChunkGroup();
mergeLogger.logFilePosition(mergeFileWriter.getFile());
currTsFile.getStartTimeMap().put(deviceId, currDeviceMinTime);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 82a7723..f2d45e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -576,9 +576,11 @@ public class PlanExecutor implements IPlanExecutor {
}
Map<Path, MeasurementSchema> schemaMap = new HashMap<>();
+ List<Pair<Long, Long>> versionInfo = new ArrayList<>();
+
List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>();
try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
- reader.selfCheck(schemaMap, chunkGroupMetadataList, false);
+ reader.selfCheck(schemaMap, chunkGroupMetadataList, versionInfo, false);
}
FileLoaderUtils.checkTsFileResource(tsFileResource);
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index aff32c8..71726cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -192,6 +192,7 @@ public class TsFileRecoverPerformer {
private void redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter)
throws StorageGroupProcessorException {
IMemTable recoverMemTable = new PrimitiveMemTable();
+ recoverMemTable.setVersion(versionController.nextVersion());
LogReplayer logReplayer = new LogReplayer(logNodePrefix, filePath, resource.getModFile(),
versionController, resource, recoverMemTable, acceptUnseq);
logReplayer.replayLogs();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index 661cc45..4398041 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -183,8 +183,7 @@ public class TsFileProcessorTest {
Map<String, List<ChunkMetadata>> restoredChunkMetaDataListInChunkGroups = restorableTsFileIOWriter
.getDeviceChunkMetadataMap();
assertEquals(chunkMetaDataListInChunkGroups.size(), restoredChunkMetaDataListInChunkGroups.size());
- for (Map.Entry<String, List<ChunkMetadata>> entry1
- : chunkMetaDataListInChunkGroups.entrySet()) {
+ for (Map.Entry<String, List<ChunkMetadata>> entry1 : chunkMetaDataListInChunkGroups.entrySet()) {
for (Map.Entry<String, List<ChunkMetadata>> entry2
: restoredChunkMetaDataListInChunkGroups.entrySet()) {
assertEquals(entry1.getKey(), entry2.getKey());
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
index 931ecb2..e74b252 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
@@ -84,4 +84,62 @@ public class IoTDBRestartIT {
EnvironmentUtils.cleanEnv();
}
+
+
+ @Test
+ public void testRestartDelete()
+ throws SQLException, ClassNotFoundException, IOException, StorageEngineException {
+ EnvironmentUtils.envSetUp();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+
+ try(Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+ "root");
+ Statement statement = connection.createStatement()){
+ statement.execute("insert into root.turbine.d1(timestamp,s1) values(1,1)");
+ statement.execute("insert into root.turbine.d1(timestamp,s1) values(2,2)");
+ statement.execute("insert into root.turbine.d1(timestamp,s1) values(3,3)");
+ }
+
+ EnvironmentUtils.restartDaemon();
+
+ try(Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+ "root");
+ Statement statement = connection.createStatement()){
+ statement.execute("delete from root.turbine.d1.s1 where time<=1");
+
+ boolean hasResultSet = statement.execute("SELECT s1 FROM root.turbine.d1");
+ assertTrue(hasResultSet);
+ String[] exp = new String[]{
+ "2,2",
+ "3,3"
+ };
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ String result = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(2);
+ assertEquals(exp[cnt], result);
+ cnt++;
+ }
+
+ statement.execute("flush");
+ statement.execute("delete from root.turbine.d1.s1 where time<=2");
+
+ hasResultSet = statement.execute("SELECT s1 FROM root.turbine.d1");
+ assertTrue(hasResultSet);
+ exp = new String[]{
+ "3,3"
+ };
+ resultSet = statement.getResultSet();
+ cnt = 0;
+ while (resultSet.next()) {
+ String result = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(2);
+ assertEquals(exp[cnt], result);
+ cnt++;
+ }
+ }
+
+ EnvironmentUtils.cleanEnv();
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
index 86ab955..74689eb 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
@@ -145,9 +145,7 @@ public class SeriesReaderTestUtil {
}
if ((i + 1) % flushInterval == 0) {
fileWriter.flushAllChunkGroups();
- Pair<Long, Long> versionPair = new Pair<>(fileWriter.getIOWriter().getPos(),
- tsFileResource.getHistoricalVersions().iterator().next());
- fileWriter.addVersionPair(versionPair);
+ fileWriter.writeVersion(tsFileResource.getHistoricalVersions().iterator().next());
}
}
fileWriter.close();
diff --git a/tsfile/format-changelist.md b/tsfile/format-changelist.md
index 7fb6d16..37eb392 100644
--- a/tsfile/format-changelist.md
+++ b/tsfile/format-changelist.md
@@ -28,6 +28,7 @@ Last Updated on 2019-11-28 by Jialin Qiao.
| ---- | ------------------------------------------------------------ | --------------- | ------------------------------------------------------------ |
| 587 | [IOTDB-325] Refactor Statistics | qiaojialin | Move start time, end time, count in PageHeader and ChunkMetadata into Statistics; Remove maxTombstoneTime in ChunkHeader |
| 855 | New TsFile | HTHou | Remove ChunkGroupMetadata, store ChunkMetadata list by series, Add TimeseriesMetadata for each series |
+| 1024 | [IOTDB-585] Fix recover version bug | qiaojialin | Add MetaMarker.VERSION and version behind each flushing memtable (flushAllChunkGroups) |
# 0.8.0 (version-0) -> version-1
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/MetaMarker.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/MetaMarker.java
index 306b49c..758f0d5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/MetaMarker.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/MetaMarker.java
@@ -29,6 +29,7 @@ public class MetaMarker {
public static final byte CHUNK_GROUP_FOOTER = 0;
public static final byte CHUNK_HEADER = 1;
public static final byte SEPARATOR = 2;
+ public static final byte VERSION = 3;
private MetaMarker() {
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 999ac28..eb0c6ae 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
import org.apache.iotdb.tsfile.read.reader.TsFileInput;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.utils.VersionUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -347,22 +348,11 @@ public class TsFileSequenceReader implements AutoCloseable {
// set version in ChunkMetadata
List<Pair<Long, Long>> versionInfo = tsFileMetaData.getVersionInfo();
for (Entry<String, List<ChunkMetadata>> entry : seriesMetadata.entrySet()) {
- setVersion(entry.getValue(), versionInfo);
+ VersionUtils.applyVersion(entry.getValue(), versionInfo);
}
return seriesMetadata;
}
-
- private void setVersion(List<ChunkMetadata> chunkMetadataList, List<Pair<Long, Long>> versionInfo) {
- int versionIndex = 0;
- for (ChunkMetadata chunkMetadata : chunkMetadataList) {
- while (chunkMetadata.getOffsetOfChunkHeader() >= versionInfo.get(versionIndex).left) {
- versionIndex++;
- }
- chunkMetadata.setVersion(versionInfo.get(versionIndex).right);
- }
- }
-
/**
* this function return all timeseries names in this file
*
@@ -411,6 +401,15 @@ public class TsFileSequenceReader implements AutoCloseable {
return ChunkGroupFooter.deserializeFrom(tsFileInput, position, markerRead);
}
+ public long readVersion() throws IOException {
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+ if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) == 0) {
+ throw new IOException("reach the end of the file.");
+ }
+ buffer.flip();
+ return buffer.getLong();
+ }
+
/**
* read data from current position of the input, and deserialize it to a CHUNK_HEADER. <br> This
* method is not threadsafe.
@@ -570,7 +569,9 @@ public class TsFileSequenceReader implements AutoCloseable {
*/
public long selfCheck(Map<Path, MeasurementSchema> newSchema,
- List<ChunkGroupMetadata> chunkGroupMetadataList, boolean fastFinish) throws IOException {
+ List<ChunkGroupMetadata> chunkGroupMetadataList,
+ List<Pair<Long, Long>> versionInfo,
+ boolean fastFinish) throws IOException {
File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file);
long fileSize;
if (!checkFile.exists()) {
@@ -587,18 +588,18 @@ public class TsFileSequenceReader implements AutoCloseable {
List<ChunkMetadata> chunkMetadataList = null;
String deviceID;
- int position = TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER
+ int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER
.getBytes().length;
- if (fileSize < position) {
+ if (fileSize < headerLength) {
return TsFileCheckStatus.INCOMPATIBLE_FILE;
}
String magic = readHeadMagic(true);
- tsFileInput.position(position);
+ tsFileInput.position(headerLength);
if (!magic.equals(TSFileConfig.MAGIC_STRING)) {
return TsFileCheckStatus.INCOMPATIBLE_FILE;
}
- if (fileSize == position) {
+ if (fileSize == headerLength) {
return TsFileCheckStatus.ONLY_MAGIC_HEAD;
} else if (readTailMagic().equals(magic)) {
loadMetadataSize();
@@ -663,6 +664,11 @@ public class TsFileSequenceReader implements AutoCloseable {
chunkCnt = 0;
measurementSchemaList = new ArrayList<>();
break;
+ case MetaMarker.VERSION:
+ long version = readVersion();
+ versionInfo.add(new Pair<>(position(), version));
+ truncatedPosition = this.position();
+ break;
default:
// the disk file is corrupted, using this file may be dangerous
throw new IOException("Unexpected marker " + marker);
@@ -721,7 +727,7 @@ public class TsFileSequenceReader implements AutoCloseable {
chunkMetadataList.add(ChunkMetadata.deserializeFrom(buffer));
}
- setVersion(chunkMetadataList, versionInfo);
+ VersionUtils.applyVersion(chunkMetadataList, versionInfo);
return chunkMetadataList;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/MetaMarker.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/VersionUtils.java
similarity index 50%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/file/MetaMarker.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/utils/VersionUtils.java
index 306b49c..bd31800 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/MetaMarker.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/VersionUtils.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,24 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.iotdb.tsfile.utils;
-package org.apache.iotdb.tsfile.file;
+import java.util.List;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
-import java.io.IOException;
+public class VersionUtils {
-/**
- * MetaMarker denotes the type of headers and footers. Enum is not used for space saving.
- */
-public class MetaMarker {
-
- public static final byte CHUNK_GROUP_FOOTER = 0;
- public static final byte CHUNK_HEADER = 1;
- public static final byte SEPARATOR = 2;
-
- private MetaMarker() {
+ private VersionUtils() {
+ throw new IllegalStateException("Utility class");
}
- public static void handleUnexpectedMarker(byte marker) throws IOException {
- throw new IOException("Unexpected marker " + marker);
+ public static void applyVersion(List<ChunkMetadata> chunkMetadataList, List<Pair<Long, Long>> versionInfo) {
+ if (versionInfo == null || versionInfo.isEmpty()) {
+ return;
+ }
+ int versionIndex = 0;
+ for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+ while (chunkMetadata.getOffsetOfChunkHeader() >= versionInfo.get(versionIndex).left) {
+ versionIndex++;
+ }
+ chunkMetadata.setVersion(versionInfo.get(versionIndex).right);
+ }
}
+
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
index 292e8f8..2ded552 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.NoMeasurementException;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkGroupWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkGroupWriter;
import org.apache.iotdb.tsfile.write.record.RowBatch;
@@ -34,10 +33,8 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
@@ -326,8 +323,7 @@ public class TsFileWriter implements AutoCloseable {
throw new IOException(
String.format(
"Flushed data size is inconsistent with computation! Estimated: %d, Actual: %d",
- dataSize,
- fileWriter.getPos() - pos));
+ dataSize, fileWriter.getPos() - pos));
}
fileWriter.endChunkGroup();
}
@@ -364,7 +360,7 @@ public class TsFileWriter implements AutoCloseable {
return this.fileWriter;
}
- public void addVersionPair(Pair<Long, Long> versionPair) {
- fileWriter.addVersionPair(versionPair);
+ public void writeVersion(long versionPair) throws IOException {
+ fileWriter.writeVersion(versionPair);
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 3af18fc..a55e8a5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.tsfile.read.TsFileCheckStatus;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.VersionUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,7 +91,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
}
// uncompleted file
- truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetadataList, true);
+ truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetadataList, versionInfo, true);
totalChunkNum = reader.getTotalChunkNum();
if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) {
out.close();
@@ -195,6 +196,9 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
if (!newlyFlushedMetadataList.isEmpty()) {
for (ChunkGroupMetadata chunkGroupMetadata : newlyFlushedMetadataList) {
List<ChunkMetadata> rowMetaDataList = chunkGroupMetadata.getChunkMetadataList();
+
+ VersionUtils.applyVersion(rowMetaDataList, versionInfo);
+
String device = chunkGroupMetadata.getDevice();
for (ChunkMetadata chunkMetaData : rowMetaDataList) {
String measurementId = chunkMetaData.getMeasurementUid();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 6f0ed79..d26b1b9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.utils.VersionUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,7 +84,7 @@ public class TsFileIOWriter {
private long markedPosition;
private String currentChunkGroupDeviceId;
private long currentChunkGroupStartOffset;
- private List<Pair<Long, Long>> versionInfo = new ArrayList<>();
+ protected List<Pair<Long, Long>> versionInfo = new ArrayList<>();
/**
* empty construct function.
@@ -333,6 +334,7 @@ public class TsFileIOWriter {
Map<String, List<ChunkMetadata>> deviceChunkMetadataMap = new HashMap<>();
for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+ VersionUtils.applyVersion(chunkGroupMetadata.getChunkMetadataList(), versionInfo);
deviceChunkMetadataMap.computeIfAbsent(chunkGroupMetadata.getDevice(), k -> new ArrayList<>())
.addAll(chunkGroupMetadata.getChunkMetadataList());
}
@@ -416,8 +418,14 @@ public class TsFileIOWriter {
}
}
- public void addVersionPair(Pair<Long, Long> versionPair) {
- versionInfo.add(versionPair);
+ /**
+ * write MetaMarker.VERSION with version
+ * Then, cache offset-version in versionInfo
+ */
+ public void writeVersion(long version) throws IOException {
+ ReadWriteIOUtils.write(MetaMarker.VERSION, out.wrapAsStream());
+ ReadWriteIOUtils.write(version, out.wrapAsStream());
+ versionInfo.add(new Pair<>(getPos(), version));
}
public void setDefaultVersionPair() {
@@ -433,6 +441,6 @@ public class TsFileIOWriter {
* @return TsFileOutput
*/
public TsFileOutput getIOWriterOut() {
- return this.out;
+ return out;
}
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTest.java
index 933eec2..c1c5d18 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTest.java
@@ -83,6 +83,9 @@ public class TsFileSequenceReaderTest {
metadatas.add(pair);
startOffset = endOffset;
break;
+ case MetaMarker.VERSION:
+ reader.readVersion();
+ break;
default:
MetaMarker.handleUnexpectedMarker(marker);
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
index 6e13ba5..29afae4 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
@@ -66,7 +66,7 @@ public class TsFileIOWriterTest {
writer.endCurrentChunk();
writer.endChunkGroup();
- writer.addVersionPair(new Pair<>(writer.getPos(), 0L));
+ writer.writeVersion(0L);
// end file
writer.endFile();
}
@@ -101,6 +101,10 @@ public class TsFileIOWriterTest {
Assert.assertEquals(deviceId, footer.getDeviceID());
// separator
+ Assert.assertEquals(MetaMarker.VERSION, reader.readMarker());
+
+ reader.readVersion();
+
Assert.assertEquals(MetaMarker.SEPARATOR, reader.readMarker());
// FileMetaData
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileWriterTest.java
index 9beb695..a9c7580 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileWriterTest.java
@@ -241,7 +241,7 @@ public class TsFileWriterTest {
public void flushForTestWithVersion() throws IOException {
//The interface is just for test
writer.flushAllChunkGroups();
- writer.addVersionPair(new Pair<>(writer.getIOWriter().getPos(), 10L));
+ writer.writeVersion(10L);
closeFile();
readNothing();
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
index 51fee77..255ab84 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
@@ -147,14 +147,16 @@ public class RestorableTsFileIOWriterTest {
writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
.addTuple(new FloatDataPoint("s2", 4)));
writer.flushAllChunkGroups();
+ writer.writeVersion(0);
long pos = writer.getIOWriter().getPos();
- // let's delete one byte.
+ // let's delete one byte. the version is broken
writer.getIOWriter().out.truncate(pos - 1);
writer.getIOWriter().close();
RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file);
writer = new TsFileWriter(rWriter);
writer.close();
- assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
+ // truncate version marker and version
+ assertEquals(pos - 1 - Long.BYTES, rWriter.getTruncatedPosition());
assertTrue(file.delete());
}