You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/09/02 15:17:11 UTC

[iotdb] branch rewrtietool created (now 0c676de)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a change to branch rewrtietool
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 0c676de  [IOTDB-1610] Fix TsFileRewriteTool writing incorrect data file (#3896)

This branch includes the following new commits:

     new 0c676de  [IOTDB-1610] Fix TsFileRewriteTool writing incorrect data file (#3896)

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: [IOTDB-1610] Fix TsFileRewriteTool writing incorrect data file (#3896)

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rewrtietool
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0c676dec7152fb0894bd424aaba3f27822b8ff36
Author: Haonan <hh...@outlook.com>
AuthorDate: Thu Sep 2 23:13:44 2021 +0800

    [IOTDB-1610] Fix TsFileRewriteTool writing incorrect data file (#3896)
---
 .../apache/iotdb/db/tools/TsFileRewriteTool.java   |  11 ++-
 .../db/tools/upgrade/TsFileOnlineUpgradeTool.java  |   2 +-
 .../iotdb/db/utils/TsFileRewriteToolTest.java      | 109 +++++++++++++++++++++
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |   4 +
 .../tsfile/v2/read/TsFileSequenceReaderForV2.java  |   4 -
 5 files changed, 124 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
index 3feed55..dead0db 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
@@ -193,7 +193,10 @@ public class TsFileRewriteTool implements AutoCloseable {
                   reader.readPageHeader(dataType, header.getChunkType() == MetaMarker.CHUNK_HEADER);
               boolean needToDecode = checkIfNeedToDecode(dataType, encoding, pageHeader);
               needToDecodeInfo.add(needToDecode);
-              ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+              ByteBuffer pageData =
+                  !needToDecode
+                      ? reader.readCompressedPage(pageHeader)
+                      : reader.readPage(pageHeader, header.getCompressionType());
               pageHeadersInChunk.add(pageHeader);
               dataInChunk.add(pageData);
               dataSize -= pageHeader.getSerializedPageSize();
@@ -421,6 +424,12 @@ public class TsFileRewriteTool implements AutoCloseable {
       }
       batchData.next();
     }
+    partitionChunkWriterMap
+        .values()
+        .forEach(
+            writer -> {
+              writer.sealCurrentPage();
+            });
   }
 
   /** check if the file has correct magic strings and version number */
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
index 0198844..23e9c7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
@@ -142,7 +142,7 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
                 needToDecodeInfo.add(needToDecode);
                 ByteBuffer pageData =
                     !needToDecode
-                        ? ((TsFileSequenceReaderForV2) reader).readCompressedPage(pageHeader)
+                        ? reader.readCompressedPage(pageHeader)
                         : reader.readPage(pageHeader, header.getCompressionType());
                 pageHeadersInChunk.add(pageHeader);
                 dataInChunk.add(pageData);
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
index 9f3475b..b30cfe6 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.db.qp.Planner;
 import org.apache.iotdb.db.qp.executor.IPlanExecutor;
 import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.tools.TsFileRewriteTool;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -327,4 +329,111 @@ public class TsFileRewriteToolTest {
       }
     }
   }
+
+  @Test
+  public void splitOneTsfileWithTwoPagesTest() {
+    createOneTsFileWithTwoPages(DEVICE1, SENSOR1);
+    splitTwoPagesFileAndQueryCheck(DEVICE1, SENSOR1);
+  }
+
+  private void createOneTsFileWithTwoPages(String device, String sensor) {
+    TSFileConfig fileConfig = TSFileDescriptor.getInstance().getConfig();
+    int originMaxNumberOfPointsInPage = fileConfig.getMaxNumberOfPointsInPage();
+    fileConfig.setMaxNumberOfPointsInPage(2);
+    try {
+      File f = FSFactoryProducer.getFSFactory().getFile(path);
+      TsFileWriter tsFileWriter = new TsFileWriter(f);
+      // add measurements into file schema
+      try {
+        tsFileWriter.registerTimeseries(
+            new Path(device, sensor),
+            new MeasurementSchema(sensor, TSDataType.INT64, TSEncoding.RLE));
+      } catch (WriteProcessException e) {
+        Assert.fail(e.getMessage());
+      }
+
+      long timestamp = 1;
+      // First page is crossing time partitions
+      // Time stamp (1, 3600001)
+      TSRecord tsRecord = new TSRecord(timestamp, device);
+      DataPoint dataPoint = new LongDataPoint(sensor, timestamp);
+      tsRecord.addTuple(dataPoint);
+      tsFileWriter.write(tsRecord);
+      timestamp += newPartitionInterval;
+      tsRecord = new TSRecord(timestamp, device);
+      dataPoint = new LongDataPoint(sensor, timestamp);
+      tsRecord.addTuple(dataPoint);
+      tsFileWriter.write(tsRecord);
+      // Second page is in one time partition
+      // Time stamp (3600002, 3600003)
+      for (int i = 0; i < 2; i++) {
+        timestamp++;
+        tsRecord = new TSRecord(timestamp, device);
+        dataPoint = new LongDataPoint(sensor, timestamp);
+        tsRecord.addTuple(dataPoint);
+        tsFileWriter.write(tsRecord);
+      }
+      tsFileWriter.flushAllChunkGroups();
+      tsFileWriter.close();
+      fileConfig.setMaxNumberOfPointsInPage(originMaxNumberOfPointsInPage);
+    } catch (Throwable e) {
+      Assert.fail(e.getMessage());
+      fileConfig.setMaxNumberOfPointsInPage(originMaxNumberOfPointsInPage);
+    }
+  }
+
+  private void splitTwoPagesFileAndQueryCheck(String device, String sensor) {
+    File tsFile = new File(path);
+    TsFileResource tsFileResource = new TsFileResource(tsFile);
+    List<TsFileResource> splitResource = new ArrayList<>();
+    try {
+      TsFileRewriteTool.rewriteTsFile(tsFileResource, splitResource);
+    } catch (IOException | WriteProcessException e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertEquals(2, splitResource.size());
+
+    for (int i = 0; i < splitResource.size(); i++) {
+      try {
+        queryAndCheckTsFile(splitResource.get(i).getTsFilePath(), i, device, sensor);
+        long partitionId = splitResource.get(i).getTimePartition();
+        Assert.assertEquals(i, partitionId);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  public void queryAndCheckTsFile(String tsFilePath, int index, String device, String sensor)
+      throws IOException {
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFilePath);
+        ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader)) {
+      ArrayList<Path> paths = new ArrayList<>();
+      paths.add(new Path(device, sensor));
+
+      QueryExpression queryExpression = QueryExpression.create(paths, null);
+      QueryDataSet queryDataSet = readTsFile.query(queryExpression);
+      if (index == 0) {
+        // First file, contains time stamp 1
+        int count = 0;
+        while (queryDataSet.hasNext()) {
+          count++;
+          RowRecord rowRecord = queryDataSet.next();
+          long timeStamp = rowRecord.getTimestamp();
+          Assert.assertEquals(1, timeStamp);
+        }
+        Assert.assertEquals(1, count);
+      } else {
+        // Second file, contains time stamp 3600001, 3600002, 3600003
+        int count = 0;
+        while (queryDataSet.hasNext()) {
+          count++;
+          RowRecord rowRecord = queryDataSet.next();
+          long timeStamp = rowRecord.getTimestamp();
+          Assert.assertEquals(newPartitionInterval + count, timeStamp);
+        }
+        Assert.assertEquals(3, count);
+      }
+    }
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 5b5e23a..16b1a2a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -891,6 +891,10 @@ public class TsFileSequenceReader implements AutoCloseable {
     tsFileInput.position(tsFileInput.position() + header.getCompressedSize());
   }
 
+  public ByteBuffer readCompressedPage(PageHeader header) throws IOException {
+    return readData(-1, header.getCompressedSize());
+  }
+
   public ByteBuffer readPage(PageHeader header, CompressionType type) throws IOException {
     ByteBuffer buffer = readData(-1, header.getCompressedSize());
     IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
index 117da82..cbda040 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
@@ -568,10 +568,6 @@ public class TsFileSequenceReaderForV2 extends TsFileSequenceReader implements A
     return PageHeaderV2.deserializeFrom(tsFileInput.wrapAsInputStream(), type);
   }
 
-  public ByteBuffer readCompressedPage(PageHeader header) throws IOException {
-    return readData(-1, header.getCompressedSize());
-  }
-
   public long readVersion() throws IOException {
     ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
     if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) == 0) {