Posted to commits@iotdb.apache.org by ma...@apache.org on 2023/03/14 08:30:59 UTC

[iotdb] 01/01: fix issue and add test

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-5662-1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7151a81f1455fee470d8b2e07c9bfce8592bebab
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Tue Mar 14 16:30:47 2023 +0800

    fix issue and add test
---
 .../readchunk/SingleSeriesCompactionExecutor.java  |  38 ++-
 .../ReadChunkCompactionPerformerNoAlignedTest.java | 265 +++++++++++++++++++++
 .../utils/CompactionFileGeneratorUtils.java        |  60 +++++
 3 files changed, 351 insertions(+), 12 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
index d8d3d71172..539945ac3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk;
 
 import org.apache.iotdb.commons.path.PartialPath;
@@ -192,17 +193,20 @@ public class SingleSeriesCompactionExecutor {
   }
 
   private void processLargeChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws IOException {
-    if (pointCountInChunkWriter != 0L) {
+    if (cachedChunk != null && canMerge(cachedChunk, chunk)) {
+      // if there is a cached chunk, merge it with current chunk, then flush it
+      summary.increaseMergedChunkNum(1);
+      mergeWithCachedChunk(chunk, chunkMetadata);
+      flushCachedChunkIfLargeEnough();
+    } else if (cachedChunk != null || pointCountInChunkWriter != 0L) {
       // if there are points remaining in ChunkWriter
       // deserialize current chunk and write to ChunkWriter, then flush the ChunkWriter
       summary.increaseDeserializedChunkNum(1);
+      if (cachedChunk != null) {
+        writeCachedChunkIntoChunkWriter();
+      }
       writeChunkIntoChunkWriter(chunk);
       flushChunkWriterIfLargeEnough();
-    } else if (cachedChunk != null) {
-      // if there is a cached chunk, merge it with current chunk, then flush it
-      summary.increaseMergedChunkNum(1);
-      mergeWithCachedChunk(chunk, chunkMetadata);
-      flushCachedChunkIfLargeEnough();
     } else {
       // there are no points remaining in ChunkWriter and no cached chunk
       // flush it to file directly
@@ -213,17 +217,20 @@ public class SingleSeriesCompactionExecutor {
 
   private void processMiddleChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws IOException {
     // the chunk is neither too large nor too small
-    if (pointCountInChunkWriter != 0L) {
+    if (cachedChunk != null && canMerge(cachedChunk, chunk)) {
+      // if there is a cached chunk, merge it with current chunk
+      summary.increaseMergedChunkNum(1);
+      mergeWithCachedChunk(chunk, chunkMetadata);
+      flushCachedChunkIfLargeEnough();
+    } else if (cachedChunk != null || pointCountInChunkWriter != 0L) {
       // if there are points remaining in ChunkWriter
       // deserialize current chunk and write to ChunkWriter
+      if (cachedChunk != null) {
+        writeCachedChunkIntoChunkWriter();
+      }
       summary.increaseDeserializedChunkNum(1);
       writeChunkIntoChunkWriter(chunk);
       flushChunkWriterIfLargeEnough();
-    } else if (cachedChunk != null) {
-      // if there is a cached chunk, merge it with current chunk
-      summary.increaseMergedChunkNum(1);
-      mergeWithCachedChunk(chunk, chunkMetadata);
-      flushCachedChunkIfLargeEnough();
     } else {
       // there are no points remaining in ChunkWriter and no cached chunk
       // cache the current chunk
@@ -246,6 +253,13 @@ public class SingleSeriesCompactionExecutor {
     flushChunkWriterIfLargeEnough();
   }
 
+  private boolean canMerge(Chunk chunk1, Chunk chunk2) {
+    ChunkHeader header1 = chunk1.getHeader();
+    ChunkHeader header2 = chunk2.getHeader();
+    return (header1.getEncodingType() == header2.getEncodingType())
+        && (header1.getCompressionType() == header2.getCompressionType());
+  }
+
   /** Deserialize a chunk into points and write it to the chunkWriter */
   private void writeChunkIntoChunkWriter(Chunk chunk) throws IOException {
     IChunkReader chunkReader = new ChunkReader(chunk, null);
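
The change above reorders the branches in processLargeChunk() and processMiddleChunk(): a cached chunk is merged with the incoming chunk only when canMerge() confirms that both share the same encoding and compression type; otherwise the cached chunk (if any) is deserialized through the ChunkWriter together with the incoming chunk. In the old order, the cached-chunk branch merged unconditionally, so chunks written with different encodings or compressions could be concatenated byte-for-byte. Below is a minimal, self-contained sketch of the new decision order; the Encoding/Compression/Header types are hypothetical stand-ins for the real Chunk and ChunkHeader classes.

    public class ChunkMergeDecisionSketch {

      enum Encoding { PLAIN, RLE, GORILLA }
      enum Compression { SNAPPY, GZIP, LZ4 }

      // stand-in for the metadata the real canMerge() reads from ChunkHeader
      static class Header {
        final Encoding encoding;
        final Compression compression;

        Header(Encoding encoding, Compression compression) {
          this.encoding = encoding;
          this.compression = compression;
        }
      }

      // mirrors canMerge(): chunks are only merged at the byte level when both
      // the encoding and the compression type match
      static boolean canMerge(Header cached, Header current) {
        return cached.encoding == current.encoding
            && cached.compression == current.compression;
      }

      // mirrors the new branch order:
      // 1. merge with the cached chunk only if the formats are compatible;
      // 2. otherwise deserialize the cached chunk (if any) and the current chunk
      //    through the ChunkWriter;
      // 3. with nothing pending, handle the current chunk directly
      static String decide(Header cachedChunk, long pointCountInChunkWriter, Header currentChunk) {
        if (cachedChunk != null && canMerge(cachedChunk, currentChunk)) {
          return "merge cached chunk with current chunk";
        } else if (cachedChunk != null || pointCountInChunkWriter != 0L) {
          return "deserialize cached and current chunk into ChunkWriter";
        } else {
          return "flush or cache the current chunk directly";
        }
      }

      public static void main(String[] args) {
        Header cached = new Header(Encoding.PLAIN, Compression.SNAPPY);
        Header sameFormat = new Header(Encoding.PLAIN, Compression.SNAPPY);
        Header otherFormat = new Header(Encoding.RLE, Compression.SNAPPY);
        System.out.println(decide(cached, 0L, sameFormat));  // merge path
        System.out.println(decide(cached, 0L, otherFormat)); // deserialize path
        System.out.println(decide(null, 0L, sameFormat));    // direct path
      }
    }
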
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java
index 863a1e47bf..d908f30840 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
@@ -57,6 +58,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 
 /**
@@ -877,4 +879,267 @@ public class ReadChunkCompactionPerformerNoAlignedTest {
           .setChunkPointNumLowerBoundInCompaction(originChunkPointNumLowerBound);
     }
   }
+
+  @Test
+  public void testMergeChunkWithDifferentEncoding() throws Exception {
+    long testTargetChunkPointNum = 1000L;
+    long originTargetChunkSize = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
+    long originTargetChunkPointNum =
+        IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum();
+    IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(10240);
+    IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(testTargetChunkPointNum);
+    long originChunkSizeLowerBound =
+        IoTDBDescriptor.getInstance().getConfig().getChunkSizeLowerBoundInCompaction();
+    IoTDBDescriptor.getInstance().getConfig().setChunkSizeLowerBoundInCompaction(1);
+    long originChunkPointNumLowerBound =
+        IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction();
+    IoTDBDescriptor.getInstance().getConfig().setChunkPointNumLowerBoundInCompaction(1);
+    try {
+      List<TsFileResource> sourceFiles = new ArrayList<>();
+      Set<String> fullPathSetWithDeleted = new HashSet<>(fullPathSet);
+      // we add a deleted timeseries to simulate a timeseries that was deleted before compaction.
+      String deletedPath = "root.compactionTest.device999.s999";
+      fullPathSetWithDeleted.add(deletedPath);
+      int fileNum = 6;
+      long pointStep = 300L;
+      for (int i = 0; i < fileNum; ++i) {
+        List<List<Long>> chunkPagePointsNum = new ArrayList<>();
+        List<Long> pagePointsNum = new ArrayList<>();
+        pagePointsNum.add((i + 1L) * pointStep);
+        chunkPagePointsNum.add(pagePointsNum);
+        TsFileResource resource =
+            new TsFileResource(new File(SEQ_DIRS, String.format("%d-%d-0-0.tsfile", i + 1, i + 1)));
+        sourceFiles.add(resource);
+        CompactionFileGeneratorUtils.writeTsFile(
+            fullPathSetWithDeleted,
+            chunkPagePointsNum,
+            i * 2000L,
+            resource,
+            i % 2 == 0 ? TSEncoding.PLAIN : TSEncoding.RLE,
+            CompressionType.SNAPPY);
+        Map<String, Pair<Long, Long>> deletionMap = new HashMap<>();
+        deletionMap.put(deletedPath, new Pair<>(i * 2000L, (i + 1) * 2000L));
+        CompactionFileGeneratorUtils.generateMods(deletionMap, resource, false);
+      }
+      Map<PartialPath, List<TimeValuePair>> originData =
+          CompactionCheckerUtils.getDataByQuery(paths, schemaList, sourceFiles, new ArrayList<>());
+      TsFileResource targetResource =
+          TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles, true);
+      performer.setSourceFiles(sourceFiles);
+      performer.setTargetFiles(Collections.singletonList(targetResource));
+      performer.perform();
+
+      Map<String, List<List<Long>>> chunkPagePointsNumMerged = new HashMap<>();
+      // each element of the outer list is a chunk, the inner list is the point num of each page
+      List<List<Long>> chunkPointsArray = new ArrayList<>();
+      List<Long> pointsArray = new ArrayList<>();
+      long curPointNum = 0L;
+      for (int i = 0; i < fileNum; ++i) {
+        curPointNum += (i + 1L) * pointStep;
+        pointsArray.add((i + 1L) * pointStep);
+        if (curPointNum > testTargetChunkPointNum) {
+          chunkPointsArray.add(pointsArray);
+          pointsArray = new ArrayList<>();
+          curPointNum = 0;
+        }
+      }
+      if (curPointNum > 0) {
+        chunkPointsArray.add(pointsArray);
+      }
+      for (String path : fullPathSetWithDeleted) {
+        chunkPagePointsNumMerged.put(path, chunkPointsArray);
+      }
+      chunkPagePointsNumMerged.put(deletedPath, null);
+      Map<PartialPath, List<TimeValuePair>> compactedData =
+          CompactionCheckerUtils.getDataByQuery(
+              paths, schemaList, Collections.singletonList(targetResource), new ArrayList<>());
+      CompactionCheckerUtils.validDataByValueList(originData, compactedData);
+    } finally {
+      IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(originTargetChunkSize);
+      IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(originTargetChunkPointNum);
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setChunkSizeLowerBoundInCompaction(originChunkSizeLowerBound);
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setChunkPointNumLowerBoundInCompaction(originChunkPointNumLowerBound);
+    }
+  }
+
+  @Test
+  public void testMergeChunkWithDifferentCompression() throws Exception {
+    long testTargetChunkPointNum = 1000L;
+    long originTargetChunkSize = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
+    long originTargetChunkPointNum =
+        IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum();
+    IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(10240);
+    IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(testTargetChunkPointNum);
+    long originChunkSizeLowerBound =
+        IoTDBDescriptor.getInstance().getConfig().getChunkSizeLowerBoundInCompaction();
+    IoTDBDescriptor.getInstance().getConfig().setChunkSizeLowerBoundInCompaction(1);
+    long originChunkPointNumLowerBound =
+        IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction();
+    IoTDBDescriptor.getInstance().getConfig().setChunkPointNumLowerBoundInCompaction(1);
+    try {
+      List<TsFileResource> sourceFiles = new ArrayList<>();
+      Set<String> fullPathSetWithDeleted = new HashSet<>(fullPathSet);
+      // we add a deleted timeseries to simulate a timeseries that was deleted before compaction.
+      String deletedPath = "root.compactionTest.device999.s999";
+      fullPathSetWithDeleted.add(deletedPath);
+      int fileNum = 6;
+      long pointStep = 300L;
+      for (int i = 0; i < fileNum; ++i) {
+        List<List<Long>> chunkPagePointsNum = new ArrayList<>();
+        List<Long> pagePointsNum = new ArrayList<>();
+        pagePointsNum.add((i + 1L) * pointStep);
+        chunkPagePointsNum.add(pagePointsNum);
+        TsFileResource resource =
+            new TsFileResource(new File(SEQ_DIRS, String.format("%d-%d-0-0.tsfile", i + 1, i + 1)));
+        sourceFiles.add(resource);
+        CompactionFileGeneratorUtils.writeTsFile(
+            fullPathSetWithDeleted,
+            chunkPagePointsNum,
+            i * 2000L,
+            resource,
+            TSEncoding.PLAIN,
+            i % 2 == 0 ? CompressionType.SNAPPY : CompressionType.GZIP);
+        Map<String, Pair<Long, Long>> deletionMap = new HashMap<>();
+        deletionMap.put(deletedPath, new Pair<>(i * 2000L, (i + 1) * 2000L));
+        CompactionFileGeneratorUtils.generateMods(deletionMap, resource, false);
+      }
+      Map<PartialPath, List<TimeValuePair>> originData =
+          CompactionCheckerUtils.getDataByQuery(paths, schemaList, sourceFiles, new ArrayList<>());
+      TsFileResource targetResource =
+          TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles, true);
+      performer.setSourceFiles(sourceFiles);
+      performer.setTargetFiles(Collections.singletonList(targetResource));
+      performer.perform();
+
+      Map<String, List<List<Long>>> chunkPagePointsNumMerged = new HashMap<>();
+      // each element of the outer list is a chunk, the inner list is the point num of each page
+      List<List<Long>> chunkPointsArray = new ArrayList<>();
+      List<Long> pointsArray = new ArrayList<>();
+      long curPointNum = 0L;
+      for (int i = 0; i < fileNum; ++i) {
+        curPointNum += (i + 1L) * pointStep;
+        pointsArray.add((i + 1L) * pointStep);
+        if (curPointNum > testTargetChunkPointNum) {
+          chunkPointsArray.add(pointsArray);
+          pointsArray = new ArrayList<>();
+          curPointNum = 0;
+        }
+      }
+      if (curPointNum > 0) {
+        chunkPointsArray.add(pointsArray);
+      }
+      for (String path : fullPathSetWithDeleted) {
+        chunkPagePointsNumMerged.put(path, chunkPointsArray);
+      }
+      chunkPagePointsNumMerged.put(deletedPath, null);
+      Map<PartialPath, List<TimeValuePair>> compactedData =
+          CompactionCheckerUtils.getDataByQuery(
+              paths, schemaList, Collections.singletonList(targetResource), new ArrayList<>());
+      CompactionCheckerUtils.validDataByValueList(originData, compactedData);
+    } finally {
+      IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(originTargetChunkSize);
+      IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(originTargetChunkPointNum);
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setChunkSizeLowerBoundInCompaction(originChunkSizeLowerBound);
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setChunkPointNumLowerBoundInCompaction(originChunkPointNumLowerBound);
+    }
+  }
+
+  @Test
+  public void testMergeChunkWithDifferentCompressionAndEncoding() throws Exception {
+    long testTargetChunkPointNum = 1000L;
+    long originTargetChunkSize = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
+    long originTargetChunkPointNum =
+        IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum();
+    IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(10240);
+    IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(testTargetChunkPointNum);
+    long originChunkSizeLowerBound =
+        IoTDBDescriptor.getInstance().getConfig().getChunkSizeLowerBoundInCompaction();
+    IoTDBDescriptor.getInstance().getConfig().setChunkSizeLowerBoundInCompaction(1);
+    long originChunkPointNumLowerBound =
+        IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction();
+    IoTDBDescriptor.getInstance().getConfig().setChunkPointNumLowerBoundInCompaction(1);
+    try {
+      List<TsFileResource> sourceFiles = new ArrayList<>();
+      Set<String> fullPathSetWithDeleted = new HashSet<>(fullPathSet);
+      // we add a deleted timeseries to simulate a timeseries that was deleted before compaction.
+      String deletedPath = "root.compactionTest.device999.s999";
+      fullPathSetWithDeleted.add(deletedPath);
+      int fileNum = 6;
+      long pointStep = 300L;
+      Random random = new Random();
+      for (int i = 0; i < fileNum; ++i) {
+        List<List<Long>> chunkPagePointsNum = new ArrayList<>();
+        List<Long> pagePointsNum = new ArrayList<>();
+        pagePointsNum.add((i + 1L) * pointStep);
+        chunkPagePointsNum.add(pagePointsNum);
+        TsFileResource resource =
+            new TsFileResource(new File(SEQ_DIRS, String.format("%d-%d-0-0.tsfile", i + 1, i + 1)));
+        sourceFiles.add(resource);
+        CompactionFileGeneratorUtils.writeTsFile(
+            fullPathSetWithDeleted,
+            chunkPagePointsNum,
+            i * 2000L,
+            resource,
+            random.nextInt() % 2 == 0
+                ? (random.nextInt() % 5 < 2 ? TSEncoding.RLE : TSEncoding.GORILLA)
+                : TSEncoding.PLAIN,
+            random.nextInt() % 3 == 0
+                ? (random.nextInt() % 5 < 2 ? CompressionType.SNAPPY : CompressionType.LZ4)
+                : CompressionType.GZIP);
+        Map<String, Pair<Long, Long>> deletionMap = new HashMap<>();
+        deletionMap.put(deletedPath, new Pair<>(i * 2000L, (i + 1) * 2000L));
+        CompactionFileGeneratorUtils.generateMods(deletionMap, resource, false);
+      }
+      Map<PartialPath, List<TimeValuePair>> originData =
+          CompactionCheckerUtils.getDataByQuery(paths, schemaList, sourceFiles, new ArrayList<>());
+      TsFileResource targetResource =
+          TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles, true);
+      performer.setSourceFiles(sourceFiles);
+      performer.setTargetFiles(Collections.singletonList(targetResource));
+      performer.perform();
+
+      Map<String, List<List<Long>>> chunkPagePointsNumMerged = new HashMap<>();
+      // each element of the outer list is a chunk, the inner list is the point num of each page
+      List<List<Long>> chunkPointsArray = new ArrayList<>();
+      List<Long> pointsArray = new ArrayList<>();
+      long curPointNum = 0L;
+      for (int i = 0; i < fileNum; ++i) {
+        curPointNum += (i + 1L) * pointStep;
+        pointsArray.add((i + 1L) * pointStep);
+        if (curPointNum > testTargetChunkPointNum) {
+          chunkPointsArray.add(pointsArray);
+          pointsArray = new ArrayList<>();
+          curPointNum = 0;
+        }
+      }
+      if (curPointNum > 0) {
+        chunkPointsArray.add(pointsArray);
+      }
+      for (String path : fullPathSetWithDeleted) {
+        chunkPagePointsNumMerged.put(path, chunkPointsArray);
+      }
+      chunkPagePointsNumMerged.put(deletedPath, null);
+      Map<PartialPath, List<TimeValuePair>> compactedData =
+          CompactionCheckerUtils.getDataByQuery(
+              paths, schemaList, Collections.singletonList(targetResource), new ArrayList<>());
+      CompactionCheckerUtils.validDataByValueList(originData, compactedData);
+    } finally {
+      IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(originTargetChunkSize);
+      IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(originTargetChunkPointNum);
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setChunkSizeLowerBoundInCompaction(originChunkSizeLowerBound);
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setChunkPointNumLowerBoundInCompaction(originChunkPointNumLowerBound);
+    }
+  }
 }
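
All three tests above derive the same expected chunk layout from pointStep = 300 and testTargetChunkPointNum = 1000. The expectation-building loop, pulled out into a stand-alone sketch (the class name is illustrative only), groups the six source chunks into [[300, 600, 900], [1200], [1500], [1800]]:

    import java.util.ArrayList;
    import java.util.List;

    public class ExpectedChunkLayoutSketch {
      public static void main(String[] args) {
        long pointStep = 300L;
        long targetChunkPointNum = 1000L;
        int fileNum = 6;

        List<List<Long>> chunkPointsArray = new ArrayList<>();
        List<Long> pointsArray = new ArrayList<>();
        long curPointNum = 0L;
        for (int i = 0; i < fileNum; ++i) {
          // source file i contributes one chunk of (i + 1) * pointStep points
          curPointNum += (i + 1L) * pointStep;
          pointsArray.add((i + 1L) * pointStep);
          if (curPointNum > targetChunkPointNum) {
            // the target chunk is large enough: close it and start a new one
            chunkPointsArray.add(pointsArray);
            pointsArray = new ArrayList<>();
            curPointNum = 0;
          }
        }
        if (curPointNum > 0) {
          chunkPointsArray.add(pointsArray);
        }
        // prints [[300, 600, 900], [1200], [1500], [1800]]
        System.out.println(chunkPointsArray);
      }
    }
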
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionFileGeneratorUtils.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionFileGeneratorUtils.java
index ad23d081d2..a5cd18a297 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionFileGeneratorUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionFileGeneratorUtils.java
@@ -29,7 +29,9 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator.TsFileName;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -42,9 +44,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Random;
 import java.util.Set;
 
 public class CompactionFileGeneratorUtils {
+  private static final Random random = new Random();
 
   public static TsFileResource getTargetTsFileResourceFromSourceResource(
       TsFileResource sourceResource) throws IOException {
@@ -308,4 +312,60 @@ public class CompactionFileGeneratorUtils {
     }
     modificationFile.close();
   }
+
+  public static void writeTsFile(
+      Set<String> fullPaths,
+      List<List<Long>> chunkPagePointsNum,
+      long startTime,
+      TsFileResource newTsFileResource,
+      TSEncoding encoding,
+      CompressionType compressionType)
+      throws IOException, IllegalPathException {
+    // disable auto page seal and seal page manually
+    int prevMaxNumberOfPointsInPage =
+        TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(Integer.MAX_VALUE);
+
+    if (!newTsFileResource.getTsFile().getParentFile().exists()) {
+      newTsFileResource.getTsFile().getParentFile().mkdirs();
+    }
+    RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(newTsFileResource.getTsFile());
+    Map<String, List<String>> deviceMeasurementMap = new HashMap<>();
+    for (String fullPath : fullPaths) {
+      PartialPath partialPath = new PartialPath(fullPath);
+      List<String> sensors =
+          deviceMeasurementMap.computeIfAbsent(partialPath.getDevice(), (s) -> new ArrayList<>());
+      sensors.add(partialPath.getMeasurement());
+    }
+    for (Entry<String, List<String>> deviceMeasurementEntry : deviceMeasurementMap.entrySet()) {
+      String device = deviceMeasurementEntry.getKey();
+      writer.startChunkGroup(device);
+      for (String sensor : deviceMeasurementEntry.getValue()) {
+        long currTime = startTime;
+        for (List<Long> chunk : chunkPagePointsNum) {
+          ChunkWriterImpl chunkWriter =
+              new ChunkWriterImpl(
+                  new MeasurementSchema(sensor, TSDataType.INT64, encoding, compressionType), true);
+          for (Long page : chunk) {
+            for (long i = 0; i < page; i++) {
+              chunkWriter.write(currTime, random.nextLong());
+              newTsFileResource.updateStartTime(device, currTime);
+              newTsFileResource.updateEndTime(device, currTime);
+              currTime++;
+            }
+            chunkWriter.sealCurrentPage();
+          }
+          chunkWriter.writeToFileWriter(writer);
+        }
+      }
+      writer.endChunkGroup();
+    }
+    newTsFileResource.serialize();
+    writer.endFile();
+    newTsFileResource.close();
+
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setMaxNumberOfPointsInPage(prevMaxNumberOfPointsInPage);
+  }
 }
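
For reference, a minimal usage sketch of the new writeTsFile overload added above. The helper name and the series path are hypothetical, and the snippet assumes it lives inside ReadChunkCompactionPerformerNoAlignedTest, where SEQ_DIRS and the imports used here (plus java.util.Arrays) are available:

    private List<TsFileResource> writeTwoFilesWithDifferentFormats() throws Exception {
      Set<String> fullPaths = Collections.singleton("root.compactionTest.device0.s0");
      List<List<Long>> chunkPagePointsNum =
          Collections.singletonList(Collections.singletonList(100L));

      // first file: one 100-point chunk per series, PLAIN + SNAPPY, starting at time 0
      TsFileResource seq1 = new TsFileResource(new File(SEQ_DIRS, "1-1-0-0.tsfile"));
      CompactionFileGeneratorUtils.writeTsFile(
          fullPaths, chunkPagePointsNum, 0L, seq1, TSEncoding.PLAIN, CompressionType.SNAPPY);

      // second file: the same series written with RLE + GZIP, starting at time 2000
      TsFileResource seq2 = new TsFileResource(new File(SEQ_DIRS, "2-2-0-0.tsfile"));
      CompactionFileGeneratorUtils.writeTsFile(
          fullPaths, chunkPagePointsNum, 2000L, seq2, TSEncoding.RLE, CompressionType.GZIP);

      // the two files now hold chunks of the same series in different formats,
      // which forces the executor onto the deserialize-and-rewrite path instead
      // of the byte-level merge path
      return Arrays.asList(seq1, seq2);
    }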