You are viewing a plain text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/09/02 15:14:10 UTC

[iotdb] branch rel/0.12 updated: [IOTDB-1610][To rel/0.12] Fix TsFileRewriteTool writing incorrect data file (#3896)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 663dbc7  [IOTDB-1610][To rel/0.12] Fix TsFileRewriteTool writing incorrect data file (#3896)
663dbc7 is described below

commit 663dbc7b3fc93554fce26746a485f487d225f295
Author: Haonan <hh...@outlook.com>
AuthorDate: Thu Sep 2 23:13:44 2021 +0800

    [IOTDB-1610][To rel/0.12] Fix TsFileRewriteTool writing incorrect data file (#3896)
---
 .../apache/iotdb/db/tools/TsFileRewriteTool.java   |  11 ++-
 .../db/tools/upgrade/TsFileOnlineUpgradeTool.java  |   2 +-
 .../iotdb/db/utils/TsFileRewriteToolTest.java      | 109 +++++++++++++++++++++
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |   4 +
 .../tsfile/v2/read/TsFileSequenceReaderForV2.java  |   4 -
 5 files changed, 124 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
index 53ad1e6..1dcd423 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
@@ -193,7 +193,10 @@ public class TsFileRewriteTool implements AutoCloseable {
                   reader.readPageHeader(dataType, header.getChunkType() == MetaMarker.CHUNK_HEADER);
               boolean needToDecode = checkIfNeedToDecode(dataType, encoding, pageHeader);
               needToDecodeInfo.add(needToDecode);
-              ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+              ByteBuffer pageData =
+                  !needToDecode
+                      ? reader.readCompressedPage(pageHeader)
+                      : reader.readPage(pageHeader, header.getCompressionType());
               pageHeadersInChunk.add(pageHeader);
               dataInChunk.add(pageData);
               dataSize -= pageHeader.getSerializedPageSize();
@@ -421,6 +424,12 @@ public class TsFileRewriteTool implements AutoCloseable {
       }
       batchData.next();
     }
+    partitionChunkWriterMap
+        .values()
+        .forEach(
+            writer -> {
+              writer.sealCurrentPage();
+            });
   }
 
   /** check if the file has correct magic strings and version number */
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
index c86b940..7e8f567 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
@@ -146,7 +146,7 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
                 needToDecodeInfo.add(needToDecode);
                 ByteBuffer pageData =
                     !needToDecode
-                        ? ((TsFileSequenceReaderForV2) reader).readCompressedPage(pageHeader)
+                        ? reader.readCompressedPage(pageHeader)
                         : reader.readPage(pageHeader, header.getCompressionType());
                 pageHeadersInChunk.add(pageHeader);
                 dataInChunk.add(pageData);
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
index 0981b2a..5cf93f3 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.db.qp.Planner;
 import org.apache.iotdb.db.qp.executor.IPlanExecutor;
 import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.tools.TsFileRewriteTool;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -327,4 +329,111 @@ public class TsFileRewriteToolTest {
       }
     }
   }
+
+  @Test
+  public void splitOneTsfileWithTwoPagesTest() {
+    this.createOneTsFileWithTwoPages(DEVICE1, SENSOR1); // page 1 spans two time partitions
+    this.splitTwoPagesFileAndQueryCheck(DEVICE1, SENSOR1); // expect one output file per partition
+  }
+
+  /**
+   * Writes a TsFile at {@code path} holding one INT64 series whose chunk has two pages (the
+   * page size is forced to 2 points): the first page, timestamps 1 and 1 + newPartitionInterval,
+   * crosses a time-partition boundary; the second page, timestamps 2 + newPartitionInterval and
+   * 3 + newPartitionInterval, lies entirely inside the second partition.
+   *
+   * <p>The shared {@code maxNumberOfPointsInPage} setting is restored in {@code finally}, so a
+   * failed write cannot leave it at 2 points per page for later tests in the same JVM.
+   *
+   * @param device device id of the single series
+   * @param sensor measurement id of the single series
+   */
+  private void createOneTsFileWithTwoPages(String device, String sensor) {
+    TSFileConfig fileConfig = TSFileDescriptor.getInstance().getConfig();
+    int originMaxNumberOfPointsInPage = fileConfig.getMaxNumberOfPointsInPage();
+    fileConfig.setMaxNumberOfPointsInPage(2);
+    try {
+      File f = FSFactoryProducer.getFSFactory().getFile(path);
+      TsFileWriter tsFileWriter = new TsFileWriter(f);
+      // add the single measurement into the file schema
+      tsFileWriter.registerTimeseries(
+          new Path(device, sensor),
+          new MeasurementSchema(sensor, TSDataType.INT64, TSEncoding.RLE));
+
+      long[] timestamps = {
+        1, // page 1: crosses the time-partition boundary
+        1 + newPartitionInterval,
+        2 + newPartitionInterval, // page 2: entirely inside the second partition
+        3 + newPartitionInterval
+      };
+      for (long timestamp : timestamps) {
+        TSRecord tsRecord = new TSRecord(timestamp, device);
+        DataPoint dataPoint = new LongDataPoint(sensor, timestamp);
+        tsRecord.addTuple(dataPoint);
+        tsFileWriter.write(tsRecord);
+      }
+      tsFileWriter.flushAllChunkGroups();
+      tsFileWriter.close();
+    } catch (Throwable e) {
+      // keep the original exception as the cause instead of only its message
+      throw new AssertionError(e.getMessage(), e);
+    } finally {
+      fileConfig.setMaxNumberOfPointsInPage(originMaxNumberOfPointsInPage);
+    }
+  }
+
+  /**
+   * Rewrites the file at {@code path} and checks that it is split into exactly two files, where
+   * the i-th file holds the expected points and reports time partition i. I/O and write errors
+   * fail the test instead of being printed and ignored, so no check can be silently skipped.
+   */
+  private void splitTwoPagesFileAndQueryCheck(String device, String sensor) {
+    TsFileResource tsFileResource = new TsFileResource(new File(path));
+    List<TsFileResource> splitResource = new ArrayList<>();
+    try {
+      TsFileRewriteTool.rewriteTsFile(tsFileResource, splitResource);
+      Assert.assertEquals(2, splitResource.size());
+      for (int i = 0; i < splitResource.size(); i++) {
+        TsFileResource resource = splitResource.get(i);
+        queryAndCheckTsFile(resource.getTsFilePath(), i, device, sensor);
+        Assert.assertEquals(i, resource.getTimePartition());
+      }
+    } catch (IOException | WriteProcessException e) {
+      // keep the cause so the failing call shows up in the test report
+      throw new AssertionError(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Queries {@code device.sensor} in one split file and checks the returned timestamps.
+   *
+   * <p>The file at {@code index} 0 must contain only timestamp 1; any other index must contain
+   * exactly newPartitionInterval + 1, + 2 and + 3, in that order.
+   *
+   * @param tsFilePath path of the split TsFile to read
+   * @param index position of the file in the split result
+   * @param device device id of the queried series
+   * @param sensor measurement id of the queried series
+   * @throws IOException propagated from opening or querying the file
+   */
+  public void queryAndCheckTsFile(String tsFilePath, int index, String device, String sensor)
+      throws IOException {
+    // the first split file holds only t = 1; later files hold the remaining points
+    boolean firstFile = index == 0;
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFilePath);
+        ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader)) {
+      ArrayList<Path> paths = new ArrayList<>();
+      paths.add(new Path(device, sensor));
+      QueryExpression queryExpression = QueryExpression.create(paths, null);
+      QueryDataSet queryDataSet = readTsFile.query(queryExpression);
+      int count = 0;
+      while (queryDataSet.hasNext()) {
+        count++;
+        RowRecord rowRecord = queryDataSet.next();
+        long timeStamp = rowRecord.getTimestamp();
+        Assert.assertEquals(firstFile ? 1 : newPartitionInterval + count, timeStamp);
+      }
+      Assert.assertEquals(firstFile ? 1 : 3, count);
+    }
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index dfebda9..3eb9d09 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -847,6 +847,10 @@ public class TsFileSequenceReader implements AutoCloseable {
     tsFileInput.position(tsFileInput.position() + header.getCompressedSize());
   }
 
+  public ByteBuffer readCompressedPage(PageHeader header) throws IOException {
+    return readData(-1, header.getCompressedSize()); // same read as readPage, left compressed
+  }
+
   public ByteBuffer readPage(PageHeader header, CompressionType type) throws IOException {
     ByteBuffer buffer = readData(-1, header.getCompressedSize());
     IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
index f9c0e88..73eabed 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
@@ -570,10 +570,6 @@ public class TsFileSequenceReaderForV2 extends TsFileSequenceReader implements A
     return PageHeaderV2.deserializeFrom(tsFileInput.wrapAsInputStream(), type);
   }
 
-  public ByteBuffer readCompressedPage(PageHeader header) throws IOException {
-    return readData(-1, header.getCompressedSize());
-  }
-
   public long readVersion() throws IOException {
     ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
     if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) == 0) {