You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2023/04/25 02:37:17 UTC
[iotdb] branch master updated: [IOTDB-5662] Fix BufferedUnderflowException occurs in inner space compaction (#9322)
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 29e7e1a851 [IOTDB-5662] Fix BufferedUnderflowException occurs in inner space compaction (#9322)
29e7e1a851 is described below
commit 29e7e1a8516d88a92ef811687142fd7bf5871fff
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Tue Apr 25 10:37:11 2023 +0800
[IOTDB-5662] Fix BufferedUnderflowException occurs in inner space compaction (#9322)
---
.../readchunk/SingleSeriesCompactionExecutor.java | 38 ++-
.../ReadChunkCompactionPerformerNoAlignedTest.java | 274 +++++++++++++++++++++
.../utils/CompactionFileGeneratorUtils.java | 60 +++++
3 files changed, 360 insertions(+), 12 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
index d8d3d71172..539945ac3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk;
import org.apache.iotdb.commons.path.PartialPath;
@@ -192,17 +193,20 @@ public class SingleSeriesCompactionExecutor {
}
private void processLargeChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws IOException {
- if (pointCountInChunkWriter != 0L) {
+ if (cachedChunk != null && canMerge(cachedChunk, chunk)) {
+ // if there is a cached chunk, merge it with current chunk, then flush it
+ summary.increaseMergedChunkNum(1);
+ mergeWithCachedChunk(chunk, chunkMetadata);
+ flushCachedChunkIfLargeEnough();
+ } else if (cachedChunk != null || pointCountInChunkWriter != 0L) {
// if there are points remaining in ChunkWriter
// deserialize current chunk and write to ChunkWriter, then flush the ChunkWriter
summary.increaseDeserializedChunkNum(1);
+ if (cachedChunk != null) {
+ writeCachedChunkIntoChunkWriter();
+ }
writeChunkIntoChunkWriter(chunk);
flushChunkWriterIfLargeEnough();
- } else if (cachedChunk != null) {
- // if there is a cached chunk, merge it with current chunk, then flush it
- summary.increaseMergedChunkNum(1);
- mergeWithCachedChunk(chunk, chunkMetadata);
- flushCachedChunkIfLargeEnough();
} else {
// there is no points remaining in ChunkWriter and no cached chunk
// flush it to file directly
@@ -213,17 +217,20 @@ public class SingleSeriesCompactionExecutor {
private void processMiddleChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws IOException {
// the chunk is not too large either too small
- if (pointCountInChunkWriter != 0L) {
+ if (cachedChunk != null && canMerge(cachedChunk, chunk)) {
+ // if there is a cached chunk, merge it with current chunk
+ summary.increaseMergedChunkNum(1);
+ mergeWithCachedChunk(chunk, chunkMetadata);
+ flushCachedChunkIfLargeEnough();
+ } else if (cachedChunk != null || pointCountInChunkWriter != 0L) {
// if there are points remaining in ChunkWriter
// deserialize current chunk and write to ChunkWriter
+ if (cachedChunk != null) {
+ writeCachedChunkIntoChunkWriter();
+ }
summary.increaseDeserializedChunkNum(1);
writeChunkIntoChunkWriter(chunk);
flushChunkWriterIfLargeEnough();
- } else if (cachedChunk != null) {
- // if there is a cached chunk, merge it with current chunk
- summary.increaseMergedChunkNum(1);
- mergeWithCachedChunk(chunk, chunkMetadata);
- flushCachedChunkIfLargeEnough();
} else {
// there is no points remaining in ChunkWriter and no cached chunk
// cached current chunk
@@ -246,6 +253,13 @@ public class SingleSeriesCompactionExecutor {
flushChunkWriterIfLargeEnough();
}
+ private boolean canMerge(Chunk chunk1, Chunk chunk2) {
+ ChunkHeader header1 = chunk1.getHeader();
+ ChunkHeader header2 = chunk2.getHeader();
+ return (header1.getEncodingType() == header2.getEncodingType())
+ && (header1.getCompressionType() == header2.getCompressionType());
+ }
+
/** Deserialize a chunk into points and write it to the chunkWriter */
private void writeChunkIntoChunkWriter(Chunk chunk) throws IOException {
IChunkReader chunkReader = new ChunkReader(chunk, null);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java
index 863a1e47bf..3b6021edc9 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -57,6 +58,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
/**
@@ -877,4 +879,276 @@ public class ReadChunkCompactionPerformerNoAlignedTest {
.setChunkPointNumLowerBoundInCompaction(originChunkPointNumLowerBound);
}
}
+
+ @Test
+ public void testMergeChunkWithDifferentEncoding() throws Exception {
+ long testTargetChunkPointNum = 1000L;
+ long originTargetChunkSize = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
+ long originTargetChunkPointNum =
+ IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum();
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(10240);
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(testTargetChunkPointNum);
+ long originChunkSizeLowerBound =
+ IoTDBDescriptor.getInstance().getConfig().getChunkSizeLowerBoundInCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setChunkSizeLowerBoundInCompaction(1);
+ long originChunkPointNumLowerBound =
+ IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setChunkPointNumLowerBoundInCompaction(1);
+ try {
+ List<TsFileResource> sourceFiles = new ArrayList();
+ Set<String> fullPathSetWithDeleted = new HashSet<>(fullPathSet);
+ // we add a deleted timeseries to simulate timeseries is deleted before compaction.
+ String deletedPath = "root.compactionTest.device999.s999";
+ fullPathSetWithDeleted.add(deletedPath);
+ int fileNum = 6;
+ long pointStep = 300L;
+ for (int i = 0; i < fileNum; ++i) {
+ List<List<Long>> chunkPagePointsNum = new ArrayList<>();
+ List<Long> pagePointsNum = new ArrayList<>();
+ pagePointsNum.add((i + 1L) * pointStep);
+ chunkPagePointsNum.add(pagePointsNum);
+ TsFileResource resource =
+ new TsFileResource(new File(SEQ_DIRS, String.format("%d-%d-0-0.tsfile", i + 1, i + 1)));
+ sourceFiles.add(resource);
+ CompactionFileGeneratorUtils.writeTsFile(
+ fullPathSetWithDeleted,
+ chunkPagePointsNum,
+ i * 2000L,
+ resource,
+ i % 2 == 0 ? TSEncoding.PLAIN : TSEncoding.RLE,
+ CompressionType.SNAPPY);
+ Map<String, Pair<Long, Long>> deletionMap = new HashMap<>();
+ deletionMap.put(deletedPath, new Pair<>(i * 2000L, (i + 1) * 2000L));
+ CompactionFileGeneratorUtils.generateMods(deletionMap, resource, false);
+ }
+ Map<PartialPath, List<TimeValuePair>> originData =
+ CompactionCheckerUtils.getDataByQuery(paths, schemaList, sourceFiles, new ArrayList<>());
+ TsFileResource targetResource =
+ TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles, true);
+ performer.setSourceFiles(sourceFiles);
+ performer.setTargetFiles(Collections.singletonList(targetResource));
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+
+ Map<String, List<List<Long>>> chunkPagePointsNumMerged = new HashMap<>();
+ // outer list is a chunk, inner list is point num in each page
+ List<List<Long>> chunkPointsArray = new ArrayList<>();
+ List<Long> pointsArray = new ArrayList<>();
+ long curPointNum = 0L;
+ for (int i = 0; i < fileNum; ++i) {
+ curPointNum += (i + 1L) * pointStep;
+ pointsArray.add((i + 1L) * pointStep);
+ if (curPointNum > testTargetChunkPointNum) {
+ chunkPointsArray.add(pointsArray);
+ pointsArray = new ArrayList<>();
+ curPointNum = 0;
+ }
+ }
+ if (curPointNum > 0) {
+ chunkPointsArray.add(pointsArray);
+ }
+ for (String path : fullPathSetWithDeleted) {
+ chunkPagePointsNumMerged.put(path, chunkPointsArray);
+ }
+ chunkPagePointsNumMerged.put(deletedPath, null);
+ CompactionUtils.moveTargetFile(
+ Collections.singletonList(targetResource), true, "root.compactionTest");
+ Map<PartialPath, List<TimeValuePair>> compactedData =
+ CompactionCheckerUtils.getDataByQuery(
+ paths, schemaList, Collections.singletonList(targetResource), new ArrayList<>());
+ CompactionCheckerUtils.validDataByValueList(originData, compactedData);
+ } finally {
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(originTargetChunkSize);
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(originTargetChunkPointNum);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setChunkSizeLowerBoundInCompaction(originChunkSizeLowerBound);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setChunkPointNumLowerBoundInCompaction(originChunkPointNumLowerBound);
+ }
+ }
+
+ @Test
+ public void testMergeChunkWithDifferentCompression() throws Exception {
+ long testTargetChunkPointNum = 1000L;
+ long originTargetChunkSize = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
+ long originTargetChunkPointNum =
+ IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum();
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(10240);
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(testTargetChunkPointNum);
+ long originChunkSizeLowerBound =
+ IoTDBDescriptor.getInstance().getConfig().getChunkSizeLowerBoundInCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setChunkSizeLowerBoundInCompaction(1);
+ long originChunkPointNumLowerBound =
+ IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setChunkPointNumLowerBoundInCompaction(1);
+ try {
+ List<TsFileResource> sourceFiles = new ArrayList();
+ Set<String> fullPathSetWithDeleted = new HashSet<>(fullPathSet);
+ // we add a deleted timeseries to simulate timeseries is deleted before compaction.
+ String deletedPath = "root.compactionTest.device999.s999";
+ fullPathSetWithDeleted.add(deletedPath);
+ int fileNum = 6;
+ long pointStep = 300L;
+ for (int i = 0; i < fileNum; ++i) {
+ List<List<Long>> chunkPagePointsNum = new ArrayList<>();
+ List<Long> pagePointsNum = new ArrayList<>();
+ pagePointsNum.add((i + 1L) * pointStep);
+ chunkPagePointsNum.add(pagePointsNum);
+ TsFileResource resource =
+ new TsFileResource(new File(SEQ_DIRS, String.format("%d-%d-0-0.tsfile", i + 1, i + 1)));
+ sourceFiles.add(resource);
+ CompactionFileGeneratorUtils.writeTsFile(
+ fullPathSetWithDeleted,
+ chunkPagePointsNum,
+ i * 2000L,
+ resource,
+ TSEncoding.PLAIN,
+ i % 2 == 0 ? CompressionType.SNAPPY : CompressionType.GZIP);
+ Map<String, Pair<Long, Long>> deletionMap = new HashMap<>();
+ deletionMap.put(deletedPath, new Pair<>(i * 2000L, (i + 1) * 2000L));
+ CompactionFileGeneratorUtils.generateMods(deletionMap, resource, false);
+ }
+ Map<PartialPath, List<TimeValuePair>> originData =
+ CompactionCheckerUtils.getDataByQuery(paths, schemaList, sourceFiles, new ArrayList<>());
+ TsFileResource targetResource =
+ TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles, true);
+ performer.setSourceFiles(sourceFiles);
+ performer.setTargetFiles(Collections.singletonList(targetResource));
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+
+ Map<String, List<List<Long>>> chunkPagePointsNumMerged = new HashMap<>();
+ // outer list is a chunk, inner list is point num in each page
+ List<List<Long>> chunkPointsArray = new ArrayList<>();
+ List<Long> pointsArray = new ArrayList<>();
+ long curPointNum = 0L;
+ for (int i = 0; i < fileNum; ++i) {
+ curPointNum += (i + 1L) * pointStep;
+ pointsArray.add((i + 1L) * pointStep);
+ if (curPointNum > testTargetChunkPointNum) {
+ chunkPointsArray.add(pointsArray);
+ pointsArray = new ArrayList<>();
+ curPointNum = 0;
+ }
+ }
+ if (curPointNum > 0) {
+ chunkPointsArray.add(pointsArray);
+ }
+ for (String path : fullPathSetWithDeleted) {
+ chunkPagePointsNumMerged.put(path, chunkPointsArray);
+ }
+ chunkPagePointsNumMerged.put(deletedPath, null);
+ CompactionUtils.moveTargetFile(
+ Collections.singletonList(targetResource), true, "root.compactionTest");
+ Map<PartialPath, List<TimeValuePair>> compactedData =
+ CompactionCheckerUtils.getDataByQuery(
+ paths, schemaList, Collections.singletonList(targetResource), new ArrayList<>());
+ CompactionCheckerUtils.validDataByValueList(originData, compactedData);
+ } finally {
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(originTargetChunkSize);
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(originTargetChunkPointNum);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setChunkSizeLowerBoundInCompaction(originChunkSizeLowerBound);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setChunkPointNumLowerBoundInCompaction(originChunkPointNumLowerBound);
+ }
+ }
+
+ @Test
+ public void testMergeChunkWithDifferentCompressionAndEncoding() throws Exception {
+ long testTargetChunkPointNum = 1000L;
+ long originTargetChunkSize = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
+ long originTargetChunkPointNum =
+ IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum();
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(10240);
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(testTargetChunkPointNum);
+ long originChunkSizeLowerBound =
+ IoTDBDescriptor.getInstance().getConfig().getChunkSizeLowerBoundInCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setChunkSizeLowerBoundInCompaction(1);
+ long originChunkPointNumLowerBound =
+ IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setChunkPointNumLowerBoundInCompaction(1);
+ try {
+ List<TsFileResource> sourceFiles = new ArrayList();
+ Set<String> fullPathSetWithDeleted = new HashSet<>(fullPathSet);
+ // we add a deleted timeseries to simulate timeseries is deleted before compaction.
+ String deletedPath = "root.compactionTest.device999.s999";
+ fullPathSetWithDeleted.add(deletedPath);
+ int fileNum = 6;
+ long pointStep = 300L;
+ Random random = new Random();
+ for (int i = 0; i < fileNum; ++i) {
+ List<List<Long>> chunkPagePointsNum = new ArrayList<>();
+ List<Long> pagePointsNum = new ArrayList<>();
+ pagePointsNum.add((i + 1L) * pointStep);
+ chunkPagePointsNum.add(pagePointsNum);
+ TsFileResource resource =
+ new TsFileResource(new File(SEQ_DIRS, String.format("%d-%d-0-0.tsfile", i + 1, i + 1)));
+ sourceFiles.add(resource);
+ CompactionFileGeneratorUtils.writeTsFile(
+ fullPathSetWithDeleted,
+ chunkPagePointsNum,
+ i * 2000L,
+ resource,
+ random.nextInt() % 2 == 0
+ ? (random.nextInt() % 5 < 2 ? TSEncoding.RLE : TSEncoding.GORILLA)
+ : TSEncoding.PLAIN,
+ random.nextInt() % 3 == 0
+ ? (random.nextInt() % 5 < 2 ? CompressionType.SNAPPY : CompressionType.LZ4)
+ : CompressionType.GZIP);
+ Map<String, Pair<Long, Long>> deletionMap = new HashMap<>();
+ deletionMap.put(deletedPath, new Pair<>(i * 2000L, (i + 1) * 2000L));
+ CompactionFileGeneratorUtils.generateMods(deletionMap, resource, false);
+ }
+ Map<PartialPath, List<TimeValuePair>> originData =
+ CompactionCheckerUtils.getDataByQuery(paths, schemaList, sourceFiles, new ArrayList<>());
+ TsFileResource targetResource =
+ TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles, true);
+ performer.setSourceFiles(sourceFiles);
+ performer.setTargetFiles(Collections.singletonList(targetResource));
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+
+ Map<String, List<List<Long>>> chunkPagePointsNumMerged = new HashMap<>();
+ // outer list is a chunk, inner list is point num in each page
+ List<List<Long>> chunkPointsArray = new ArrayList<>();
+ List<Long> pointsArray = new ArrayList<>();
+ long curPointNum = 0L;
+ for (int i = 0; i < fileNum; ++i) {
+ curPointNum += (i + 1L) * pointStep;
+ pointsArray.add((i + 1L) * pointStep);
+ if (curPointNum > testTargetChunkPointNum) {
+ chunkPointsArray.add(pointsArray);
+ pointsArray = new ArrayList<>();
+ curPointNum = 0;
+ }
+ }
+ if (curPointNum > 0) {
+ chunkPointsArray.add(pointsArray);
+ }
+ for (String path : fullPathSetWithDeleted) {
+ chunkPagePointsNumMerged.put(path, chunkPointsArray);
+ }
+ chunkPagePointsNumMerged.put(deletedPath, null);
+ CompactionUtils.moveTargetFile(
+ Collections.singletonList(targetResource), true, "root.compactionTest");
+ Map<PartialPath, List<TimeValuePair>> compactedData =
+ CompactionCheckerUtils.getDataByQuery(
+ paths, schemaList, Collections.singletonList(targetResource), new ArrayList<>());
+ CompactionCheckerUtils.validDataByValueList(originData, compactedData);
+ } finally {
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(originTargetChunkSize);
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(originTargetChunkPointNum);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setChunkSizeLowerBoundInCompaction(originChunkSizeLowerBound);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setChunkPointNumLowerBoundInCompaction(originChunkPointNumLowerBound);
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionFileGeneratorUtils.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionFileGeneratorUtils.java
index ad23d081d2..a5cd18a297 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionFileGeneratorUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionFileGeneratorUtils.java
@@ -29,7 +29,9 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator.TsFileName;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -42,9 +44,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Random;
import java.util.Set;
public class CompactionFileGeneratorUtils {
+ private static Random random = new Random();
public static TsFileResource getTargetTsFileResourceFromSourceResource(
TsFileResource sourceResource) throws IOException {
@@ -308,4 +312,60 @@ public class CompactionFileGeneratorUtils {
}
modificationFile.close();
}
+
+ public static void writeTsFile(
+ Set<String> fullPaths,
+ List<List<Long>> chunkPagePointsNum,
+ long startTime,
+ TsFileResource newTsFileResource,
+ TSEncoding encoding,
+ CompressionType compressionType)
+ throws IOException, IllegalPathException {
+ // disable auto page seal and seal page manually
+ int prevMaxNumberOfPointsInPage =
+ TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(Integer.MAX_VALUE);
+
+ if (!newTsFileResource.getTsFile().getParentFile().exists()) {
+ newTsFileResource.getTsFile().getParentFile().mkdirs();
+ }
+ RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(newTsFileResource.getTsFile());
+ Map<String, List<String>> deviceMeasurementMap = new HashMap<>();
+ for (String fullPath : fullPaths) {
+ PartialPath partialPath = new PartialPath(fullPath);
+ List<String> sensors =
+ deviceMeasurementMap.computeIfAbsent(partialPath.getDevice(), (s) -> new ArrayList<>());
+ sensors.add(partialPath.getMeasurement());
+ }
+ for (Entry<String, List<String>> deviceMeasurementEntry : deviceMeasurementMap.entrySet()) {
+ String device = deviceMeasurementEntry.getKey();
+ writer.startChunkGroup(device);
+ for (String sensor : deviceMeasurementEntry.getValue()) {
+ long currTime = startTime;
+ for (List<Long> chunk : chunkPagePointsNum) {
+ ChunkWriterImpl chunkWriter =
+ new ChunkWriterImpl(
+ new MeasurementSchema(sensor, TSDataType.INT64, encoding, compressionType), true);
+ for (Long page : chunk) {
+ for (long i = 0; i < page; i++) {
+ chunkWriter.write(currTime, random.nextLong());
+ newTsFileResource.updateStartTime(device, currTime);
+ newTsFileResource.updateEndTime(device, currTime);
+ currTime++;
+ }
+ chunkWriter.sealCurrentPage();
+ }
+ chunkWriter.writeToFileWriter(writer);
+ }
+ }
+ writer.endChunkGroup();
+ }
+ newTsFileResource.serialize();
+ writer.endFile();
+ newTsFileResource.close();
+
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setMaxNumberOfPointsInPage(prevMaxNumberOfPointsInPage);
+ }
}