You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/10/27 02:16:44 UTC

[iotdb] branch master updated: [IOTDB-4693] Support broken tsfile rewrite (#7677)

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new baed39decc [IOTDB-4693] Support broken tsfile rewrite (#7677)
baed39decc is described below

commit baed39decc2ac76e77aac3de7507aa401c74654c
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Thu Oct 27 10:16:39 2022 +0800

    [IOTDB-4693] Support broken tsfile rewrite (#7677)
---
 .../db/integration/IoTDBRewriteTsFileToolIT.java   | 523 ++++++++++++++++-
 .../java/org/apache/iotdb/RewriteTsFileTool.java   | 620 +++++++++++++++------
 2 files changed, 932 insertions(+), 211 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRewriteTsFileToolIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRewriteTsFileToolIT.java
index 00175fb202..012f5b6c92 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRewriteTsFileToolIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRewriteTsFileToolIT.java
@@ -19,11 +19,14 @@
 package org.apache.iotdb.db.integration;
 
 import org.apache.iotdb.RewriteTsFileTool;
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
 
@@ -36,6 +39,7 @@ import java.io.File;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
 
 public class IoTDBRewriteTsFileToolIT {
@@ -55,6 +59,41 @@ public class IoTDBRewriteTsFileToolIT {
   @After
   public void tearDown() throws Exception {
     EnvironmentUtils.cleanEnv();
+    if (tmpDir != null) {
+      FileUtils.deleteDirectory(new File(tmpDir));
+    }
+  }
+
+  public void unload(Statement statement)
+      throws IllegalPathException, SQLException, StorageEngineException {
+    for (TsFileResource resource :
+        StorageEngine.getInstance()
+            .getProcessor(new PartialPath("root.sg"))
+            .getSequenceFileList()) {
+      if (tmpDir == null) {
+        tmpDir =
+            resource
+                    .getTsFile()
+                    .getParentFile()
+                    .getParentFile()
+                    .getParentFile()
+                    .getParentFile()
+                    .getParent()
+                + File.separator
+                + "tmp";
+        File tmpFile = new File(tmpDir);
+        if (!tmpFile.exists()) {
+          tmpFile.mkdirs();
+        }
+      }
+      statement.execute(String.format("unload '%s' '%s'", resource.getTsFilePath(), tmpDir));
+    }
+    for (TsFileResource resource :
+        StorageEngine.getInstance()
+            .getProcessor(new PartialPath("root.sg"))
+            .getUnSequenceFileList()) {
+      statement.execute(String.format("unload '%s' '%s'", resource.getTsFilePath(), tmpDir));
+    }
   }
 
   public void prepareTsFiles() throws Exception {
@@ -92,34 +131,7 @@ public class IoTDBRewriteTsFileToolIT {
 
       statement.execute("delete from root.sg.a.s1 where time > 2");
 
-      for (TsFileResource resource :
-          StorageEngine.getInstance()
-              .getProcessor(new PartialPath("root.sg"))
-              .getSequenceFileList()) {
-        if (tmpDir == null) {
-          tmpDir =
-              resource
-                      .getTsFile()
-                      .getParentFile()
-                      .getParentFile()
-                      .getParentFile()
-                      .getParentFile()
-                      .getParent()
-                  + File.separator
-                  + "tmp";
-          File tmpFile = new File(tmpDir);
-          if (!tmpFile.exists()) {
-            tmpFile.mkdirs();
-          }
-        }
-        statement.execute(String.format("unload '%s' '%s'", resource.getTsFilePath(), tmpDir));
-      }
-      for (TsFileResource resource :
-          StorageEngine.getInstance()
-              .getProcessor(new PartialPath("root.sg"))
-              .getUnSequenceFileList()) {
-        statement.execute(String.format("unload '%s' '%s'", resource.getTsFilePath(), tmpDir));
-      }
+      unload(statement);
     }
   }
 
@@ -151,4 +163,459 @@ public class IoTDBRewriteTsFileToolIT {
       Assert.fail();
     }
   }
+
+  @Test
+  public void testAlignedTsFileR() throws Exception {
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      for (int fileIndex = 0; fileIndex < 5; fileIndex++) {
+        for (long timestamp = fileIndex * 512, end = fileIndex * 512 + 512;
+            timestamp < end;
+            ++timestamp) {
+          for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+            statement.execute(
+                String.format(
+                    "insert into root.sg.d%d(time, s1, s2, s3) aligned values(%d, %d, %d, %d)",
+                    deviceIndex, timestamp, timestamp + 1, timestamp + 2, timestamp + 3));
+          }
+        }
+        statement.execute("FLUSH");
+      }
+      unload(statement);
+    }
+    RewriteTsFileTool.main(
+        new String[] {
+          "-h",
+          "127.0.0.1",
+          "-p",
+          "6667",
+          "-u",
+          "root",
+          "-pw",
+          "root",
+          "-f",
+          tmpDir,
+          "-rm",
+          "r",
+          "-ig"
+        });
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      for (long timestamp = 0; timestamp < 512 * 5; ++timestamp) {
+        for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+          ResultSet resultSet =
+              statement.executeQuery(
+                  String.format(
+                      "select s1, s2, s3 from root.sg.d%d where time=%d", deviceIndex, timestamp));
+          Assert.assertTrue(resultSet.next());
+          float s1Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s1");
+          float s2Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s2");
+          float s3Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s3");
+          Assert.assertEquals(s1Val, timestamp + 1, 0.001);
+          Assert.assertEquals(s2Val, timestamp + 2, 0.001);
+          Assert.assertEquals(s3Val, timestamp + 3, 0.001);
+          Assert.assertFalse(resultSet.next());
+          resultSet.close();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testAlignedTsFileS() throws Exception {
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      for (int fileIndex = 0; fileIndex < 5; fileIndex++) {
+        for (long timestamp = fileIndex * 512, end = fileIndex * 512 + 512;
+            timestamp < end;
+            ++timestamp) {
+          for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+            statement.execute(
+                String.format(
+                    "insert into root.sg.d%d(time, s1, s2, s3) aligned values(%d, %d, %d, %d)",
+                    deviceIndex, timestamp, timestamp + 1, timestamp + 2, timestamp + 3));
+          }
+        }
+        statement.execute("FLUSH");
+      }
+      unload(statement);
+    }
+    RewriteTsFileTool.main(
+        new String[] {
+          "-h",
+          "127.0.0.1",
+          "-p",
+          "6667",
+          "-u",
+          "root",
+          "-pw",
+          "root",
+          "-f",
+          tmpDir,
+          "-rm",
+          "s",
+          "-ig"
+        });
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      for (long timestamp = 0; timestamp < 512 * 5; ++timestamp) {
+        for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+          ResultSet resultSet =
+              statement.executeQuery(
+                  String.format(
+                      "select s1, s2, s3 from root.sg.d%d where time=%d", deviceIndex, timestamp));
+          Assert.assertTrue(resultSet.next());
+          float s1Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s1");
+          float s2Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s2");
+          float s3Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s3");
+          Assert.assertEquals(s1Val, timestamp + 1, 0.001);
+          Assert.assertEquals(s2Val, timestamp + 2, 0.001);
+          Assert.assertEquals(s3Val, timestamp + 3, 0.001);
+          Assert.assertFalse(resultSet.next());
+          resultSet.close();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testAlignedTsFileWithNullR() throws Exception {
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      for (int fileIndex = 0; fileIndex < 5; fileIndex++) {
+        for (long timestamp = fileIndex * 512, end = fileIndex * 512 + 512;
+            timestamp < end;
+            ++timestamp) {
+          for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+            if (timestamp % 3 == 0) {
+              statement.execute(
+                  String.format(
+                      "insert into root.sg.d%d(time, s2, s3) aligned values(%d, %d, %d)",
+                      deviceIndex, timestamp, timestamp + 2, timestamp + 3));
+            } else if (timestamp % 3 == 1) {
+              statement.execute(
+                  String.format(
+                      "insert into root.sg.d%d(time, s1, s3) aligned values(%d, %d, %d)",
+                      deviceIndex, timestamp, timestamp + 1, timestamp + 3));
+            } else {
+              statement.execute(
+                  String.format(
+                      "insert into root.sg.d%d(time, s1, s2) aligned values(%d, %d, %d)",
+                      deviceIndex, timestamp, timestamp + 1, timestamp + 2));
+            }
+          }
+        }
+        statement.execute("FLUSH");
+      }
+      unload(statement);
+    }
+    RewriteTsFileTool.main(
+        new String[] {
+          "-h",
+          "127.0.0.1",
+          "-p",
+          "6667",
+          "-u",
+          "root",
+          "-pw",
+          "root",
+          "-f",
+          tmpDir,
+          "-rm",
+          "r",
+          "-ig"
+        });
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      for (long timestamp = 0; timestamp < 512 * 5; ++timestamp) {
+        for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+          ResultSet resultSet =
+              statement.executeQuery(
+                  String.format(
+                      "select s1, s2, s3 from root.sg.d%d where time=%d", deviceIndex, timestamp));
+          Assert.assertTrue(resultSet.next());
+          float s1Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s1");
+          float s2Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s2");
+          float s3Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s3");
+          if (timestamp % 3 != 0) {
+            Assert.assertEquals(s1Val, timestamp + 1, 0.001);
+          } else {
+            Assert.assertEquals(s1Val, 0, 0.001);
+          }
+          if (timestamp % 3 != 1) {
+            Assert.assertEquals(s2Val, timestamp + 2, 0.001);
+          } else {
+            Assert.assertEquals(s2Val, 0, 0.001);
+          }
+          if (timestamp % 3 != 2) {
+            Assert.assertEquals(s3Val, timestamp + 3, 0.001);
+          } else {
+            Assert.assertEquals(s3Val, 0, 0.001);
+          }
+          Assert.assertFalse(resultSet.next());
+          resultSet.close();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testAlignedTsFileWithNullS() throws Exception {
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      for (int fileIndex = 0; fileIndex < 5; fileIndex++) {
+        for (long timestamp = fileIndex * 512, end = fileIndex * 512 + 512;
+            timestamp < end;
+            ++timestamp) {
+          for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+            if (timestamp % 3 == 0) {
+              statement.execute(
+                  String.format(
+                      "insert into root.sg.d%d(time, s2, s3) aligned values(%d, %d, %d)",
+                      deviceIndex, timestamp, timestamp + 2, timestamp + 3));
+            } else if (timestamp % 3 == 1) {
+              statement.execute(
+                  String.format(
+                      "insert into root.sg.d%d(time, s1, s3) aligned values(%d, %d, %d)",
+                      deviceIndex, timestamp, timestamp + 1, timestamp + 3));
+            } else {
+              statement.execute(
+                  String.format(
+                      "insert into root.sg.d%d(time, s1, s2) aligned values(%d, %d, %d)",
+                      deviceIndex, timestamp, timestamp + 1, timestamp + 2));
+            }
+          }
+        }
+        statement.execute("FLUSH");
+      }
+      unload(statement);
+    }
+    RewriteTsFileTool.main(
+        new String[] {
+          "-h",
+          "127.0.0.1",
+          "-p",
+          "6667",
+          "-u",
+          "root",
+          "-pw",
+          "root",
+          "-f",
+          tmpDir,
+          "-rm",
+          "s",
+          "-ig"
+        });
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      for (long timestamp = 0; timestamp < 512 * 5; ++timestamp) {
+        for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+          ResultSet resultSet =
+              statement.executeQuery(
+                  String.format(
+                      "select s1, s2, s3 from root.sg.d%d where time=%d", deviceIndex, timestamp));
+          Assert.assertTrue(resultSet.next());
+          float s1Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s1");
+          float s2Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s2");
+          float s3Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s3");
+          if (timestamp % 3 != 0) {
+            Assert.assertEquals(s1Val, timestamp + 1, 0.001);
+          } else {
+            Assert.assertEquals(s1Val, 0, 0.001);
+          }
+          if (timestamp % 3 != 1) {
+            Assert.assertEquals(s2Val, timestamp + 2, 0.001);
+          } else {
+            Assert.assertEquals(s2Val, 0, 0.001);
+          }
+          if (timestamp % 3 != 2) {
+            Assert.assertEquals(s3Val, timestamp + 3, 0.001);
+          } else {
+            Assert.assertEquals(s3Val, 0, 0.001);
+          }
+          Assert.assertFalse(resultSet.next());
+          resultSet.close();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testSimpleTsFileWithDeletionR() throws Exception {
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      for (int fileIndex = 0; fileIndex < 5; fileIndex++) {
+        for (long timestamp = fileIndex * 512, end = fileIndex * 512 + 512;
+            timestamp < end;
+            ++timestamp) {
+          for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+            statement.execute(
+                String.format(
+                    "insert into root.sg.d%d(time, s1, s2, s3) aligned values(%d, %d, %d, %d)",
+                    deviceIndex, timestamp, timestamp + 1, timestamp + 2, timestamp + 3));
+          }
+        }
+        statement.execute("FLUSH");
+      }
+    }
+    long deleteStart = 1024, deleteEnd = 2048;
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+        statement.execute(
+            "delete from root.sg.d"
+                + deviceIndex
+                + ".** where time >= "
+                + deleteStart
+                + " and time <= "
+                + deleteEnd);
+      }
+      unload(statement);
+    }
+    RewriteTsFileTool.main(
+        new String[] {
+          "-h",
+          "127.0.0.1",
+          "-p",
+          "6667",
+          "-u",
+          "root",
+          "-pw",
+          "root",
+          "-f",
+          tmpDir,
+          "-rm",
+          "r",
+          "-ig"
+        });
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      for (long timestamp = 0; timestamp < 512 * 5; ++timestamp) {
+        for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+          ResultSet resultSet =
+              statement.executeQuery(
+                  String.format(
+                      "select s1, s2, s3 from root.sg.d%d where time=%d", deviceIndex, timestamp));
+          if (timestamp >= deleteStart && timestamp <= deleteEnd) {
+            Assert.assertFalse(resultSet.next());
+            continue;
+          }
+          Assert.assertTrue(resultSet.next());
+          float s1Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s1");
+          float s2Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s2");
+          float s3Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s3");
+          Assert.assertEquals(s1Val, timestamp + 1, 0.001);
+          Assert.assertEquals(s2Val, timestamp + 2, 0.001);
+          Assert.assertEquals(s3Val, timestamp + 3, 0.001);
+          Assert.assertFalse(resultSet.next());
+          resultSet.close();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testSimpleTsFileWithDeletionS() throws Exception {
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      for (int fileIndex = 0; fileIndex < 5; fileIndex++) {
+        for (long timestamp = fileIndex * 512, end = fileIndex * 512 + 512;
+            timestamp < end;
+            ++timestamp) {
+          for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+            statement.execute(
+                String.format(
+                    "insert into root.sg.d%d(time, s1, s2, s3) aligned values(%d, %d, %d, %d)",
+                    deviceIndex, timestamp, timestamp + 1, timestamp + 2, timestamp + 3));
+          }
+        }
+        statement.execute("FLUSH");
+      }
+    }
+    long deleteStart = 1024, deleteEnd = 2048;
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+        statement.execute(
+            "delete from root.sg.d"
+                + deviceIndex
+                + ".** where time >= "
+                + deleteStart
+                + " and time <= "
+                + deleteEnd);
+      }
+      unload(statement);
+    }
+    RewriteTsFileTool.main(
+        new String[] {
+          "-h",
+          "127.0.0.1",
+          "-p",
+          "6667",
+          "-u",
+          "root",
+          "-pw",
+          "root",
+          "-f",
+          tmpDir,
+          "-rm",
+          "s",
+          "-ig"
+        });
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      for (long timestamp = 0; timestamp < 512 * 5; ++timestamp) {
+        for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+          ResultSet resultSet =
+              statement.executeQuery(
+                  String.format(
+                      "select s1, s2, s3 from root.sg.d%d where time=%d", deviceIndex, timestamp));
+          if (timestamp >= deleteStart && timestamp <= deleteEnd) {
+            Assert.assertFalse(resultSet.next());
+            continue;
+          }
+          Assert.assertTrue(resultSet.next());
+          float s1Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s1");
+          float s2Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s2");
+          float s3Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s3");
+          Assert.assertEquals(s1Val, timestamp + 1, 0.001);
+          Assert.assertEquals(s2Val, timestamp + 2, 0.001);
+          Assert.assertEquals(s3Val, timestamp + 3, 0.001);
+          Assert.assertFalse(resultSet.next());
+          resultSet.close();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testWriteAlignedTsFileWithDeletion() {}
 }
diff --git a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
index 4b1d47976d..f403f60bfd 100644
--- a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
+++ b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
@@ -27,42 +27,38 @@ import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
 import org.apache.iotdb.tsfile.exception.write.NoMeasurementException;
 import org.apache.iotdb.tsfile.file.MetaMarker;
 import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.TsFileAlignedSeriesReaderIterator;
+import org.apache.iotdb.tsfile.read.TsFileCheckStatus;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.controller.CachedChunkLoaderImpl;
-import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
-import org.apache.iotdb.tsfile.read.controller.IMetadataQuerier;
-import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
-import org.apache.iotdb.tsfile.read.query.dataset.DataSetWithoutTimeGenerator;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.reader.IChunkReader;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
-import org.apache.iotdb.tsfile.read.reader.series.AbstractFileSeriesReader;
-import org.apache.iotdb.tsfile.read.reader.series.EmptyFileSeriesReader;
-import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
+import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
 import org.apache.iotdb.tsfile.utils.FilePathUtils;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -79,12 +75,13 @@ import org.apache.commons.cli.ParseException;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -99,8 +96,7 @@ public class RewriteTsFileTool {
   private static String password = "root";
   private static String filePath = "";
   private static String readMode = "s";
-
-  private static Map<String, Set<MeasurementSchema>> device2Measurements;
+  private static boolean ignoreBrokenChunk = false;
 
   public static void main(String[] args) {
     Session session = null;
@@ -138,6 +134,7 @@ public class RewriteTsFileTool {
       password = getArgOrDefault(commandLine, "pw", password);
       filePath = getArgOrDefault(commandLine, "f", filePath);
       readMode = getArgOrDefault(commandLine, "rm", readMode);
+      ignoreBrokenChunk = commandLine.hasOption("ig");
     } catch (ParseException e) {
       System.out.printf("Parse Args Error. %s%n", e.getMessage());
       priHelp(options);
@@ -203,6 +200,8 @@ public class RewriteTsFileTool {
             .required()
             .build();
     options.addOption(readModeOpt);
+
+    options.addOption("ig", "ignore-broken chunks");
     return options;
   }
 
@@ -231,27 +230,10 @@ public class RewriteTsFileTool {
    */
   public static void writeToIoTDB(List<File> files, Session session) {
     sortTsFiles(files);
-    int size = files.size();
-    List<File> unloadTsFiles = new ArrayList<>();
-    System.out.printf("Collect TsFiles successfully, %d files to be loaded.%n", size);
+    System.out.printf("Collect TsFiles successfully, %d files to be loaded.%n", files.size());
     System.out.println("Start Loading TsFiles...");
     if (readMode.equals("s")) {
-      for (int i = 0; i < size; i++) {
-        File file = files.get(i);
-        System.out.printf("Loading %s(%d/%d)...", file.getPath(), i + 1, size);
-        try {
-          seqWriteTsFile(file.getPath(), session);
-        } catch (Exception e) {
-          System.out.println(
-              "------------------------------Error Message------------------------------");
-          e.printStackTrace();
-          System.out.println(
-              "------------------------------End Message------------------------------");
-          unloadTsFiles.add(file);
-          continue;
-        }
-        System.out.println("Done");
-      }
+      writeTsFileSequentially(files, session);
     } else {
       try {
         reverseWriteTsFile(files, session);
@@ -268,6 +250,28 @@ public class RewriteTsFileTool {
       }
     }
     System.out.println("Finish Loading TsFiles");
+  }
+
+  private static void writeTsFileSequentially(List<File> files, Session session) {
+    int size = files.size();
+    List<File> unloadTsFiles = new ArrayList<>();
+    for (int i = 0; i < files.size(); i++) {
+      File file = files.get(i);
+      System.out.printf("Loading %s(%d/%d)...", file.getPath(), i + 1, size);
+      try {
+        seqWriteSingleTsFile(file.getPath(), session);
+        session.executeNonQueryStatement("FLUSH");
+      } catch (Exception e) {
+        System.out.println(
+            "------------------------------Error Message------------------------------");
+        e.printStackTrace();
+        System.out.println(
+            "------------------------------End Message------------------------------");
+        unloadTsFiles.add(file);
+        continue;
+      }
+      System.out.println("Done");
+    }
     System.out.printf(
         "Load %d TsFiles successfully, %d TsFiles not loaded.%n",
         size - unloadTsFiles.size(), unloadTsFiles.size());
@@ -306,104 +310,31 @@ public class RewriteTsFileTool {
    * @param filename the file path to be loaded
    * @param session IoTDB session
    */
-  public static void seqWriteTsFile(String filename, Session session)
+  public static void seqWriteSingleTsFile(String filename, Session session)
       throws IOException, IllegalPathException, IoTDBConnectionException,
           StatementExecutionException, NoMeasurementException {
-    // parse modifications from .mods
-    List<Modification> modifications = null;
-    if (FSFactoryProducer.getFSFactory()
-        .getFile(filename + ModificationFile.FILE_SUFFIX)
-        .exists()) {
-      modifications =
-          (List<Modification>)
-              new ModificationFile(filename + ModificationFile.FILE_SUFFIX).getModifications();
-    }
-
-    // read all device and their measurements
-    parseDeviceFromTsFile(filename);
 
     try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
-      for (Map.Entry<String, Set<MeasurementSchema>> entry : device2Measurements.entrySet()) {
-        // collect measurements for device
-        boolean isAligned = false;
-        String curDevice = entry.getKey();
-        List<MeasurementSchema> measurementSchemas = new ArrayList<>();
-        ArrayList<Path> paths = new ArrayList<>();
-        for (MeasurementSchema measurementSchema : entry.getValue()) {
-          if (!measurementSchema.getType().equals(TSDataType.VECTOR)) {
-            measurementSchemas.add(measurementSchema);
-          } else {
-            isAligned = true;
-          }
-        }
-        for (MeasurementSchema measurementSchema : measurementSchemas) {
-          paths.add(new Path(curDevice, measurementSchema.getMeasurementId()));
-        }
-
-        // construct query to this tsfile
-        List<AbstractFileSeriesReader> readersOfSelectedSeries = new ArrayList<>();
-        List<TSDataType> dataTypes = new ArrayList<>();
-        IMetadataQuerier metadataQuerier = new MetadataQuerierByFileImpl(reader);
-        IChunkLoader chunkLoader = new CachedChunkLoaderImpl(reader);
-        for (Path path : paths) {
-          List<IChunkMetadata> chunkMetadataList = metadataQuerier.getChunkMetaDataList(path);
-          modifyChunkMetadata(isAligned, path, chunkMetadataList, modifications);
-          AbstractFileSeriesReader seriesReader;
-          if (chunkMetadataList.isEmpty()) {
-            seriesReader = new EmptyFileSeriesReader();
-            dataTypes.add(metadataQuerier.getDataType(path));
-          } else {
-            seriesReader = new FileSeriesReader(chunkLoader, chunkMetadataList, null);
-            dataTypes.add(chunkMetadataList.get(0).getDataType());
-          }
-          readersOfSelectedSeries.add(seriesReader);
-        }
-
-        // read data from tsfile and construct session to send to IoTDB
-        QueryDataSet dataSet =
-            new DataSetWithoutTimeGenerator(paths, dataTypes, readersOfSelectedSeries);
-        Tablet tablet = new Tablet(curDevice, measurementSchemas, MAX_TABLET_LENGTH);
-        tablet.initBitMaps();
-        int measurementSize = measurementSchemas.size();
-        while (dataSet.hasNext()) {
-          RowRecord rowRecord = dataSet.next();
-          tablet.addTimestamp(tablet.rowSize, rowRecord.getTimestamp());
-          for (int i = 0; i < measurementSize; i++) {
-            Field field = rowRecord.getFields().get(i);
-            if (field == null) {
-              tablet.bitMaps[i].mark(tablet.rowSize);
-            } else {
-              tablet.addValue(
-                  measurementSchemas.get(i).getMeasurementId(),
-                  tablet.rowSize,
-                  field.getObjectValue(field.getDataType()));
-            }
-          }
-          tablet.rowSize++;
-          if (tablet.rowSize == MAX_TABLET_LENGTH) {
-            if (isAligned) {
-              session.insertAlignedTablet(tablet);
-            } else {
-              session.insertTablet(tablet);
-            }
-            tablet.reset();
-          }
-        }
-        if (isAligned) {
-          session.insertAlignedTablet(tablet);
-        } else {
-          session.insertTablet(tablet);
+      if (!ignoreBrokenChunk) {
+        long status = reader.selfCheck(new HashMap<>(), new ArrayList<>(), true);
+        if (status == TsFileCheckStatus.INCOMPATIBLE_FILE
+            || status == TsFileCheckStatus.FILE_EXISTS_MISTAKES) {
+          throw new IOException(
+              String.format(
+                  "The file %s is incompatible, cannot rewrite it to IoTDB. If you want to rewrite "
+                      + "all the good chunks in the file, retry with option -ignore-broken.",
+                  filename));
         }
       }
-    }
-  }
-
-  private static void parseDeviceFromTsFile(String filename) throws IOException {
-    device2Measurements = new HashMap<>();
-    try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
       reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
-      String curDevice = null;
+      List<long[]> timeBatch = new ArrayList<>();
+      int pageIndex = 0;
       byte marker;
+      String currentDevice = null;
+      boolean isAlignedChunk = false;
+      List<List<TsPrimitiveType>> valueForAlignedSeries = new ArrayList<>();
+      List<Long> timeForAlignedSeries = new ArrayList<>();
+      List<MeasurementSchema> schemaForAlignedSeries = new ArrayList<>();
       while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
         switch (marker) {
           case MetaMarker.CHUNK_HEADER:
@@ -413,60 +344,316 @@ public class RewriteTsFileTool {
           case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
           case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
             ChunkHeader header = reader.readChunkHeader(marker);
-            MeasurementSchema measurementSchema =
-                new MeasurementSchema(
-                    header.getMeasurementID(),
-                    header.getDataType(),
-                    header.getEncodingType(),
-                    header.getCompressionType());
-            device2Measurements
-                .computeIfAbsent(curDevice, o -> new HashSet<>())
-                .add(measurementSchema);
-            reader.position(reader.position() + header.getDataSize());
+            if (header.getDataSize() == 0) {
+              // empty value chunk
+              break;
+            }
+            Decoder defaultTimeDecoder =
+                Decoder.getDecoderByType(
+                    TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+                    TSDataType.INT64);
+            Decoder valueDecoder =
+                Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
+            int dataSize = header.getDataSize();
+            pageIndex = 0;
+            if (header.getDataType() == TSDataType.VECTOR) {
+              timeBatch.clear();
+            }
+            boolean addSchema = false;
+            List<TsPrimitiveType> valueList = new ArrayList<>();
+            while (dataSize > 0) {
+              valueDecoder.reset();
+              PageHeader pageHeader =
+                  reader.readPageHeader(
+                      header.getDataType(),
+                      (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER);
+              ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+              if ((header.getChunkType() & (byte) TsFileConstant.TIME_COLUMN_MASK)
+                  == (byte) TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk
+                readAndSendTimePage(
+                    addSchema,
+                    schemaForAlignedSeries,
+                    header,
+                    pageHeader,
+                    pageData,
+                    defaultTimeDecoder,
+                    timeBatch,
+                    pageIndex,
+                    timeForAlignedSeries);
+                addSchema = true;
+                isAlignedChunk = true;
+              } else if ((header.getChunkType() & (byte) TsFileConstant.VALUE_COLUMN_MASK)
+                  == (byte) TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk
+                readAndSendValuePage(
+                    addSchema,
+                    schemaForAlignedSeries,
+                    header,
+                    pageHeader,
+                    pageData,
+                    valueDecoder,
+                    timeBatch,
+                    pageIndex,
+                    valueList);
+                addSchema = true;
+              } else { // NonAligned Chunk
+                readAndSendSingleSeriesPage(
+                    currentDevice, header, pageData, valueDecoder, defaultTimeDecoder, session);
+              }
+              pageIndex++;
+              dataSize -= pageHeader.getSerializedPageSize();
+            }
+            if (isAlignedChunk && header.getDataType() != TSDataType.VECTOR) {
+              valueForAlignedSeries.add(valueList);
+            }
             break;
           case MetaMarker.CHUNK_GROUP_HEADER:
+            // get the next chunk group
+            if (isAlignedChunk) {
+              Tablet tablet = new Tablet(currentDevice, schemaForAlignedSeries, MAX_TABLET_LENGTH);
+              for (int i = 0; i < timeForAlignedSeries.size(); ++i) {
+                tablet.addTimestamp(tablet.rowSize, timeForAlignedSeries.get(i));
+                for (int j = 0; j < valueForAlignedSeries.size(); ++j) {
+                  if (valueForAlignedSeries.get(j).get(i) == null) {
+                    continue;
+                  }
+                  switch (valueForAlignedSeries.get(j).get(i).getDataType()) {
+                    case INT32:
+                      tablet.addValue(
+                          schemaForAlignedSeries.get(j).getMeasurementId(),
+                          tablet.rowSize,
+                          valueForAlignedSeries.get(j).get(i).getInt());
+                      break;
+                    case INT64:
+                      tablet.addValue(
+                          schemaForAlignedSeries.get(j).getMeasurementId(),
+                          tablet.rowSize,
+                          valueForAlignedSeries.get(j).get(i).getLong());
+                      break;
+                    case TEXT:
+                      tablet.addValue(
+                          schemaForAlignedSeries.get(j).getMeasurementId(),
+                          tablet.rowSize,
+                          valueForAlignedSeries.get(j).get(i).getStringValue());
+                      break;
+                    case BOOLEAN:
+                      tablet.addValue(
+                          schemaForAlignedSeries.get(j).getMeasurementId(),
+                          tablet.rowSize,
+                          valueForAlignedSeries.get(j).get(i).getBoolean());
+                      break;
+                    case FLOAT:
+                      tablet.addValue(
+                          schemaForAlignedSeries.get(j).getMeasurementId(),
+                          tablet.rowSize,
+                          valueForAlignedSeries.get(j).get(i).getFloat());
+                      break;
+                    case DOUBLE:
+                      tablet.addValue(
+                          schemaForAlignedSeries.get(j).getMeasurementId(),
+                          tablet.rowSize,
+                          valueForAlignedSeries.get(j).get(i).getDouble());
+                      break;
+                  }
+                }
+                tablet.rowSize++;
+                if (tablet.rowSize >= MAX_TABLET_LENGTH) {
+                  session.insertAlignedTablet(tablet);
+                  tablet.reset();
+                }
+              }
+              if (tablet.rowSize >= 0) {
+                session.insertAlignedTablet(tablet);
+                tablet.reset();
+              }
+              timeForAlignedSeries.clear();
+              valueForAlignedSeries.clear();
+              schemaForAlignedSeries.clear();
+            }
+            isAlignedChunk = false;
             ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
-            curDevice = chunkGroupHeader.getDeviceID();
+            currentDevice = chunkGroupHeader.getDeviceID();
             break;
           case MetaMarker.OPERATION_INDEX_RANGE:
             reader.readPlanIndex();
             break;
           default:
-            MetaMarker.handleUnexpectedMarker(marker);
+            System.out.printf(
+                "Cannot handle marker %d in position %d, stop reading %s%n",
+                marker, reader.position(), filename);
         }
       }
+
+      if (isAlignedChunk) {
+        Tablet tablet = new Tablet(currentDevice, schemaForAlignedSeries, MAX_TABLET_LENGTH);
+        for (int i = 0; i < timeForAlignedSeries.size(); ++i) {
+          tablet.addTimestamp(tablet.rowSize, timeForAlignedSeries.get(i));
+          for (int j = 0; j < valueForAlignedSeries.size(); ++j) {
+            if (valueForAlignedSeries.get(j).get(i) == null) {
+              continue;
+            }
+            switch (valueForAlignedSeries.get(j).get(i).getDataType()) {
+              case INT32:
+                tablet.addValue(
+                    schemaForAlignedSeries.get(j).getMeasurementId(),
+                    tablet.rowSize,
+                    valueForAlignedSeries.get(j).get(i).getInt());
+                break;
+              case INT64:
+                tablet.addValue(
+                    schemaForAlignedSeries.get(j).getMeasurementId(),
+                    tablet.rowSize,
+                    valueForAlignedSeries.get(j).get(i).getLong());
+                break;
+              case TEXT:
+                tablet.addValue(
+                    schemaForAlignedSeries.get(j).getMeasurementId(),
+                    tablet.rowSize,
+                    valueForAlignedSeries.get(j).get(i).getStringValue());
+                break;
+              case BOOLEAN:
+                tablet.addValue(
+                    schemaForAlignedSeries.get(j).getMeasurementId(),
+                    tablet.rowSize,
+                    valueForAlignedSeries.get(j).get(i).getBoolean());
+                break;
+              case FLOAT:
+                tablet.addValue(
+                    schemaForAlignedSeries.get(j).getMeasurementId(),
+                    tablet.rowSize,
+                    valueForAlignedSeries.get(j).get(i).getFloat());
+                break;
+              case DOUBLE:
+                tablet.addValue(
+                    schemaForAlignedSeries.get(j).getMeasurementId(),
+                    tablet.rowSize,
+                    valueForAlignedSeries.get(j).get(i).getDouble());
+                break;
+            }
+          }
+          tablet.rowSize++;
+          if (tablet.rowSize >= MAX_TABLET_LENGTH) {
+            session.insertAlignedTablet(tablet);
+            tablet.reset();
+          }
+        }
+        if (tablet.rowSize >= 0) {
+          session.insertAlignedTablet(tablet);
+          tablet.reset();
+        }
+        timeForAlignedSeries.clear();
+        valueForAlignedSeries.clear();
+        schemaForAlignedSeries.clear();
+      }
     }
+
+    writeModification(filename, session);
   }
 
-  private static void modifyChunkMetadata(
-      boolean isAligned,
-      Path path,
-      List<IChunkMetadata> chunkMetadataList,
-      List<Modification> modifications)
-      throws IllegalPathException {
-    if (modifications == null || modifications.isEmpty()) {
-      return;
+  private static void readAndSendValuePage(
+      boolean addSchema,
+      List<MeasurementSchema> schemaForAlignedSeries,
+      ChunkHeader header,
+      PageHeader pageHeader,
+      ByteBuffer pageData,
+      Decoder valueDecoder,
+      List<long[]> timeBatch,
+      int pageIndex,
+      List<TsPrimitiveType> valueList) {
+    if (!addSchema && header.getDataType() != TSDataType.VECTOR) {
+      schemaForAlignedSeries.add(
+          new MeasurementSchema(
+              header.getMeasurementID(),
+              header.getDataType(),
+              header.getEncodingType(),
+              header.getCompressionType()));
     }
-    List<Modification> measurementModifications = new ArrayList<>();
-    Iterator<Modification> modsIterator = modifications.listIterator();
-    Deletion currentDeletion;
-    while (modsIterator.hasNext()) {
-      currentDeletion = (Deletion) modsIterator.next();
-      // if deletion path match the chunkPath, then add the deletion to the list
-      if (currentDeletion.getPath().matchFullPath(new PartialPath(path.getFullPath()))) {
-        measurementModifications.add(currentDeletion);
+    ValuePageReader valuePageReader =
+        new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder);
+    TsPrimitiveType[] valueBatch = valuePageReader.nextValueBatch(timeBatch.get(pageIndex));
+    valueList.addAll(Arrays.asList(valueBatch));
+  }
+
+  private static void readAndSendTimePage(
+      boolean addSchema,
+      List<MeasurementSchema> schemaForAlignedSeries,
+      ChunkHeader header,
+      PageHeader pageHeader,
+      ByteBuffer pageData,
+      Decoder defaultTimeDecoder,
+      List<long[]> timeBatch,
+      int pageIndex,
+      List<Long> timeForAlignedSeries)
+      throws IOException {
+    if (!addSchema && header.getDataType() != TSDataType.VECTOR) {
+      schemaForAlignedSeries.add(
+          new MeasurementSchema(
+              header.getMeasurementID(),
+              header.getDataType(),
+              header.getEncodingType(),
+              header.getCompressionType()));
+    }
+    TimePageReader timePageReader = new TimePageReader(pageHeader, pageData, defaultTimeDecoder);
+    timeBatch.add(timePageReader.getNextTimeBatch());
+    for (int i = 0; i < timeBatch.get(pageIndex).length; i++) {
+      timeForAlignedSeries.add(timeBatch.get(pageIndex)[i]);
+    }
+  }
+
+  private static void readAndSendSingleSeriesPage(
+      String currentDevice,
+      ChunkHeader header,
+      ByteBuffer pageData,
+      Decoder valueDecoder,
+      Decoder defaultTimeDecoder,
+      Session session)
+      throws IOException, IoTDBConnectionException, StatementExecutionException {
+    String measurementId = header.getMeasurementID();
+    Tablet tablet =
+        new Tablet(
+            currentDevice,
+            Collections.singletonList(
+                new MeasurementSchema(
+                    measurementId,
+                    header.getDataType(),
+                    header.getEncodingType(),
+                    header.getCompressionType())),
+            MAX_TABLET_LENGTH);
+    PageReader pageReader =
+        new PageReader(pageData, header.getDataType(), valueDecoder, defaultTimeDecoder, null);
+    BatchData batchData = pageReader.getAllSatisfiedPageData();
+    while (batchData.hasCurrent()) {
+      tablet.addTimestamp(tablet.rowSize, batchData.currentTime());
+      tablet.addValue(measurementId, tablet.rowSize++, batchData.currentValue());
+      if (tablet.rowSize >= MAX_TABLET_LENGTH) {
+        session.insertTablet(tablet);
+        tablet.reset();
       }
+      batchData.next();
     }
-    if (!isAligned) {
-      QueryUtils.modifyChunkMetaData(chunkMetadataList, measurementModifications);
-    } else {
-      List<AlignedChunkMetadata> alignedChunkMetadataList = new ArrayList<>();
-      for (IChunkMetadata chunkMetadata : chunkMetadataList) {
-        alignedChunkMetadataList.add((AlignedChunkMetadata) chunkMetadata);
+    if (tablet.rowSize > 0) {
+      session.insertTablet(tablet);
+      tablet.reset();
+    }
+  }
+
+  private static void writeModification(String filename, Session session)
+      throws IoTDBConnectionException, StatementExecutionException {
+    List<Modification> modifications = null;
+    if (FSFactoryProducer.getFSFactory()
+        .getFile(filename + ModificationFile.FILE_SUFFIX)
+        .exists()) {
+      modifications =
+          (List<Modification>)
+              new ModificationFile(filename + ModificationFile.FILE_SUFFIX).getModifications();
+      for (Modification modification : modifications) {
+        session.executeNonQueryStatement(
+            String.format(
+                "delete from %s.%s where time >= %d and time <= %d",
+                modification.getDevice(),
+                modification.getMeasurement(),
+                ((Deletion) modification).getStartTime(),
+                ((Deletion) modification).getEndTime()));
       }
-      // AlignedChunk only contains one valueChunkMetadata which is measurement with this path
-      QueryUtils.modifyAlignedChunkMetaData(
-          alignedChunkMetadataList, Collections.singletonList(measurementModifications));
     }
   }
 
@@ -486,26 +673,30 @@ public class RewriteTsFileTool {
           StatementExecutionException, NoMeasurementException {
     List<TsFileResource> resources = new ArrayList<>();
     files.forEach(x -> resources.add(new TsFileResource(x)));
-    try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(resources)) {
-      while (deviceIterator.hasNextDevice()) {
-        Pair<String, Boolean> devicePair = deviceIterator.nextDevice();
-        String device = devicePair.left;
-        boolean isAligned = devicePair.right;
-        if (isAligned) {
-          try {
-            writeAlignedSeries(device, deviceIterator, session);
-          } catch (Throwable t) {
-            // this is a broken aligned chunk, skip it
-            System.out.println("Skip aligned chunk " + device);
-          }
-        } else {
-          MultiTsFileDeviceIterator.MeasurementIterator seriesIterator =
-              deviceIterator.iterateNotAlignedSeries(device, true);
-          while (seriesIterator.hasNextSeries()) {
-            writeSingleSeries(device, seriesIterator, session);
+    for (TsFileResource resource : resources) {
+      try (MultiTsFileDeviceIterator deviceIterator =
+          new MultiTsFileDeviceIterator(Collections.singletonList(resource))) {
+        while (deviceIterator.hasNextDevice()) {
+          Pair<String, Boolean> devicePair = deviceIterator.nextDevice();
+          String device = devicePair.left;
+          boolean isAligned = devicePair.right;
+          if (isAligned) {
+            try {
+              writeAlignedSeries(device, deviceIterator, session);
+            } catch (Throwable t) {
+              // this is a broken aligned chunk, skip it
+              System.out.println("Skip aligned chunk " + device);
+            }
+          } else {
+            MultiTsFileDeviceIterator.MeasurementIterator seriesIterator =
+                deviceIterator.iterateNotAlignedSeries(device, true);
+            while (seriesIterator.hasNextSeries()) {
+              writeSingleSeries(device, seriesIterator, session);
+            }
           }
         }
       }
+      writeModification(resource.getTsFile().getAbsolutePath(), session);
     }
   }
 
@@ -526,6 +717,7 @@ public class RewriteTsFileTool {
           writeSingleChunk(device, p, chunkMetadata, reader, session);
         } catch (Throwable t) {
           // this is a broken chunk, skip it
+          t.printStackTrace();
           System.out.printf("Skip broken chunk in device %s.%s%n", device, p.getMeasurement());
         }
       }
@@ -554,8 +746,33 @@ public class RewriteTsFileTool {
       IPointReader batchIterator = chunkReader.nextPageData().getBatchDataIterator();
       while (batchIterator.hasNextTimeValuePair()) {
         TimeValuePair timeValuePair = batchIterator.nextTimeValuePair();
-        tablet.timestamps[tablet.rowSize] = timeValuePair.getTimestamp();
-        tablet.values[tablet.rowSize++] = timeValuePair.getValue();
+        tablet.addTimestamp(tablet.rowSize, timeValuePair.getTimestamp());
+        switch (timeValuePair.getValue().getDataType()) {
+          case TEXT:
+            tablet.addValue(
+                p.getMeasurement(), tablet.rowSize++, timeValuePair.getValue().getStringValue());
+            break;
+          case BOOLEAN:
+            tablet.addValue(
+                p.getMeasurement(), tablet.rowSize++, timeValuePair.getValue().getBoolean());
+            break;
+          case DOUBLE:
+            tablet.addValue(
+                p.getMeasurement(), tablet.rowSize++, timeValuePair.getValue().getDouble());
+            break;
+          case FLOAT:
+            tablet.addValue(
+                p.getMeasurement(), tablet.rowSize++, timeValuePair.getValue().getFloat());
+            break;
+          case INT64:
+            tablet.addValue(
+                p.getMeasurement(), tablet.rowSize++, timeValuePair.getValue().getLong());
+            break;
+          case INT32:
+            tablet.addValue(
+                p.getMeasurement(), tablet.rowSize++, timeValuePair.getValue().getInt());
+            break;
+        }
         if (tablet.rowSize >= MAX_TABLET_LENGTH) {
           session.insertTablet(tablet);
           tablet.reset();
@@ -640,9 +857,46 @@ public class RewriteTsFileTool {
         IBatchDataIterator batchDataIterator =
             alignedChunkReader.nextPageData().getBatchDataIterator();
         while (batchDataIterator.hasNext()) {
+          tablet.addTimestamp(tablet.rowSize, batchDataIterator.currentTime());
           TsPrimitiveType[] pointsData = (TsPrimitiveType[]) batchDataIterator.currentValue();
-          tablet.timestamps[tablet.rowSize] = batchDataIterator.currentTime();
-          tablet.values[tablet.rowSize++] = batchDataIterator.currentValue();
+          for (int i = 0; i < schemaList.size(); ++i) {
+            if (pointsData[i] == null) {
+              continue;
+            }
+            switch (pointsData[i].getDataType()) {
+              case INT32:
+                tablet.addValue(
+                    schemaList.get(i).getMeasurementId(), tablet.rowSize, pointsData[i].getInt());
+                break;
+              case INT64:
+                tablet.addValue(
+                    schemaList.get(i).getMeasurementId(), tablet.rowSize, pointsData[i].getLong());
+                break;
+              case FLOAT:
+                tablet.addValue(
+                    schemaList.get(i).getMeasurementId(), tablet.rowSize, pointsData[i].getFloat());
+                break;
+              case DOUBLE:
+                tablet.addValue(
+                    schemaList.get(i).getMeasurementId(),
+                    tablet.rowSize,
+                    pointsData[i].getDouble());
+                break;
+              case BOOLEAN:
+                tablet.addValue(
+                    schemaList.get(i).getMeasurementId(),
+                    tablet.rowSize,
+                    pointsData[i].getBoolean());
+                break;
+              case TEXT:
+                tablet.addValue(
+                    schemaList.get(i).getMeasurementId(),
+                    tablet.rowSize,
+                    pointsData[i].getStringValue());
+                break;
+            }
+          }
+          tablet.rowSize++;
           batchDataIterator.next();
           if (tablet.rowSize >= MAX_TABLET_LENGTH) {
             session.insertAlignedTablet(tablet);