You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/09/02 15:14:10 UTC
[iotdb] branch rel/0.12 updated: [IOTDB-1610][To rel/0.12] Fix
TsFileRewriteTool writing incorrect data file (#3896)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 663dbc7 [IOTDB-1610][To rel/0.12] Fix TsFileRewriteTool writing incorrect data file (#3896)
663dbc7 is described below
commit 663dbc7b3fc93554fce26746a485f487d225f295
Author: Haonan <hh...@outlook.com>
AuthorDate: Thu Sep 2 23:13:44 2021 +0800
[IOTDB-1610][To rel/0.12] Fix TsFileRewriteTool writing incorrect data file (#3896)
---
.../apache/iotdb/db/tools/TsFileRewriteTool.java | 11 ++-
.../db/tools/upgrade/TsFileOnlineUpgradeTool.java | 2 +-
.../iotdb/db/utils/TsFileRewriteToolTest.java | 109 +++++++++++++++++++++
.../iotdb/tsfile/read/TsFileSequenceReader.java | 4 +
.../tsfile/v2/read/TsFileSequenceReaderForV2.java | 4 -
5 files changed, 124 insertions(+), 6 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
index 53ad1e6..1dcd423 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
@@ -193,7 +193,10 @@ public class TsFileRewriteTool implements AutoCloseable {
reader.readPageHeader(dataType, header.getChunkType() == MetaMarker.CHUNK_HEADER);
boolean needToDecode = checkIfNeedToDecode(dataType, encoding, pageHeader);
needToDecodeInfo.add(needToDecode);
- ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+ ByteBuffer pageData =
+ !needToDecode
+ ? reader.readCompressedPage(pageHeader)
+ : reader.readPage(pageHeader, header.getCompressionType());
pageHeadersInChunk.add(pageHeader);
dataInChunk.add(pageData);
dataSize -= pageHeader.getSerializedPageSize();
@@ -421,6 +424,12 @@ public class TsFileRewriteTool implements AutoCloseable {
}
batchData.next();
}
+ partitionChunkWriterMap
+ .values()
+ .forEach(
+ writer -> {
+ writer.sealCurrentPage();
+ });
}
/** check if the file has correct magic strings and version number */
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
index c86b940..7e8f567 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
@@ -146,7 +146,7 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
needToDecodeInfo.add(needToDecode);
ByteBuffer pageData =
!needToDecode
- ? ((TsFileSequenceReaderForV2) reader).readCompressedPage(pageHeader)
+ ? reader.readCompressedPage(pageHeader)
: reader.readPage(pageHeader, header.getCompressionType());
pageHeadersInChunk.add(pageHeader);
dataInChunk.add(pageData);
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
index 0981b2a..5cf93f3 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.db.qp.Planner;
import org.apache.iotdb.db.qp.executor.IPlanExecutor;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.tools.TsFileRewriteTool;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -327,4 +329,111 @@ public class TsFileRewriteToolTest {
}
}
}
+
+ /**
+ * Writes a TsFile whose pages are limited to two points each, then rewrites it with
+ * TsFileRewriteTool and checks the contents of the files produced by the rewrite.
+ */
+ @Test
+ public void splitOneTsfileWithTwoPagesTest() {
+ createOneTsFileWithTwoPages(DEVICE1, SENSOR1);
+ splitTwoPagesFileAndQueryCheck(DEVICE1, SENSOR1);
+ }
+
+ /**
+ * Creates a TsFile at {@code path} holding one INT64/RLE series with four points written as two
+ * pages: the first page (timestamps 1 and 1 + newPartitionInterval) crosses a time partition
+ * boundary, the second page stays inside one partition.
+ *
+ * <p>The shared max-points-per-page setting is lowered to 2 while writing and is always restored
+ * afterwards, including when the write fails.
+ *
+ * @param device device id the series is registered under
+ * @param sensor measurement id of the series
+ */
+ private void createOneTsFileWithTwoPages(String device, String sensor) {
+ TSFileConfig fileConfig = TSFileDescriptor.getInstance().getConfig();
+ int originMaxNumberOfPointsInPage = fileConfig.getMaxNumberOfPointsInPage();
+ fileConfig.setMaxNumberOfPointsInPage(2);
+ try {
+ File f = FSFactoryProducer.getFSFactory().getFile(path);
+ TsFileWriter tsFileWriter = new TsFileWriter(f);
+ // add measurements into file schema
+ try {
+ tsFileWriter.registerTimeseries(
+ new Path(device, sensor),
+ new MeasurementSchema(sensor, TSDataType.INT64, TSEncoding.RLE));
+ } catch (WriteProcessException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ long timestamp = 1;
+ // First page is crossing time partitions
+ // Time stamp (1, 3600001)
+ TSRecord tsRecord = new TSRecord(timestamp, device);
+ DataPoint dataPoint = new LongDataPoint(sensor, timestamp);
+ tsRecord.addTuple(dataPoint);
+ tsFileWriter.write(tsRecord);
+ timestamp += newPartitionInterval;
+ tsRecord = new TSRecord(timestamp, device);
+ dataPoint = new LongDataPoint(sensor, timestamp);
+ tsRecord.addTuple(dataPoint);
+ tsFileWriter.write(tsRecord);
+ // Second page is in one time partition
+ // Time stamp (3600002, 3600003)
+ for (int i = 0; i < 2; i++) {
+ timestamp++;
+ tsRecord = new TSRecord(timestamp, device);
+ dataPoint = new LongDataPoint(sensor, timestamp);
+ tsRecord.addTuple(dataPoint);
+ tsFileWriter.write(tsRecord);
+ }
+ tsFileWriter.flushAllChunkGroups();
+ tsFileWriter.close();
+ } catch (Throwable e) {
+ Assert.fail(e.getMessage());
+ } finally {
+ // Assert.fail throws, so the shared config has to be restored here; code placed after the
+ // fail call in the catch block would never run
+ fileConfig.setMaxNumberOfPointsInPage(originMaxNumberOfPointsInPage);
+ }
+ }
+
+ /**
+ * Rewrites the file at {@code path} with TsFileRewriteTool, expects exactly two resulting
+ * resources, and checks that resource {@code i} holds the expected rows and reports time
+ * partition {@code i}.
+ *
+ * @param device device id of the series to query
+ * @param sensor measurement id of the series to query
+ */
+ private void splitTwoPagesFileAndQueryCheck(String device, String sensor) {
+ File tsFile = new File(path);
+ TsFileResource tsFileResource = new TsFileResource(tsFile);
+ List<TsFileResource> splitResource = new ArrayList<>();
+ try {
+ TsFileRewriteTool.rewriteTsFile(tsFileResource, splitResource);
+ } catch (IOException | WriteProcessException e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertEquals(2, splitResource.size());
+
+ for (int i = 0; i < splitResource.size(); i++) {
+ try {
+ queryAndCheckTsFile(splitResource.get(i).getTsFilePath(), i, device, sensor);
+ long partitionId = splitResource.get(i).getTimePartition();
+ Assert.assertEquals(i, partitionId);
+ } catch (IOException e) {
+ // a split file that cannot be read must fail the test instead of being printed and ignored
+ Assert.fail(e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Queries one series from a split TsFile and asserts its rows.
+ *
+ * <p>For {@code index == 0} the file must contain exactly one row with timestamp 1; for any other
+ * index it must contain exactly three rows whose timestamps are newPartitionInterval + 1, + 2
+ * and + 3, in that order.
+ *
+ * @param tsFilePath path of the split file to read
+ * @param index position of the file in the split result
+ * @param device device id of the series
+ * @param sensor measurement id of the series
+ * @throws IOException if the file cannot be opened or read
+ */
+ public void queryAndCheckTsFile(String tsFilePath, int index, String device, String sensor)
+ throws IOException {
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFilePath);
+ ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader)) {
+ ArrayList<Path> queriedPaths = new ArrayList<>();
+ queriedPaths.add(new Path(device, sensor));
+ QueryDataSet rows = readTsFile.query(QueryExpression.create(queriedPaths, null));
+
+ // First file holds only time stamp 1; second file holds 3600001, 3600002, 3600003
+ boolean firstFile = index == 0;
+ int rowCount = 0;
+ while (rows.hasNext()) {
+ rowCount++;
+ long actualTime = rows.next().getTimestamp();
+ if (firstFile) {
+ Assert.assertEquals(1, actualTime);
+ } else {
+ Assert.assertEquals(newPartitionInterval + rowCount, actualTime);
+ }
+ }
+ Assert.assertEquals(firstFile ? 1 : 3, rowCount);
+ }
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index dfebda9..3eb9d09 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -847,6 +847,10 @@ public class TsFileSequenceReader implements AutoCloseable {
tsFileInput.position(tsFileInput.position() + header.getCompressedSize());
}
+ /**
+ * Reads the page described by {@code header} without uncompressing it: requests exactly
+ * {@code header.getCompressedSize()} bytes from {@code readData} and returns them as-is.
+ *
+ * <p>NOTE(review): the position argument -1 presumably means "read from the current position";
+ * confirm against readData.
+ *
+ * @param header header of the page to read
+ * @return the buffer returned by readData for the page's compressed size
+ * @throws IOException if readData fails
+ */
+ public ByteBuffer readCompressedPage(PageHeader header) throws IOException {
+ return readData(-1, header.getCompressedSize());
+ }
+
public ByteBuffer readPage(PageHeader header, CompressionType type) throws IOException {
ByteBuffer buffer = readData(-1, header.getCompressedSize());
IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
index f9c0e88..73eabed 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
@@ -570,10 +570,6 @@ public class TsFileSequenceReaderForV2 extends TsFileSequenceReader implements A
return PageHeaderV2.deserializeFrom(tsFileInput.wrapAsInputStream(), type);
}
- public ByteBuffer readCompressedPage(PageHeader header) throws IOException {
- return readData(-1, header.getCompressedSize());
- }
-
public long readVersion() throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) == 0) {