Posted to commits@iotdb.apache.org by ma...@apache.org on 2023/03/11 08:56:13 UTC
[iotdb] branch IOTDB-5662-0.13 updated: add test
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-5662-0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-5662-0.13 by this push:
new ff8c20d962 add test
ff8c20d962 is described below
commit ff8c20d9624b4f6c8d55871461d44721a353a8c1
Author: LiuXuxin <li...@outlook.com>
AuthorDate: Sat Mar 11 16:56:06 2023 +0800
add test
---
.../InnerSpaceCompactionUtilsNoAlignedTest.java | 261 +++++++++++++++++++++
.../utils/CompactionFileGeneratorUtils.java | 60 +++++
2 files changed, 321 insertions(+)
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsNoAlignedTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsNoAlignedTest.java
index 98b943e904..6271928d83 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsNoAlignedTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsNoAlignedTest.java
@@ -55,6 +55,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
/**
@@ -470,6 +471,266 @@ public class InnerSpaceCompactionUtilsNoAlignedTest {
}
}
+ @Test
+ public void testMergeChunkWithDifferentEncoding() throws Exception {
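+ // override the target chunk size/point number and the compaction lower bounds for this test;
+ // the original values are restored in the finally block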
+ long testTargetChunkPointNum = 1000L;
+ long originTargetChunkSize = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
+ long originTargetChunkPointNum =
+ IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum();
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(10240);
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(testTargetChunkPointNum);
+ long originChunkSizeLowerBound =
+ IoTDBDescriptor.getInstance().getConfig().getChunkSizeLowerBoundInCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setChunkSizeLowerBoundInCompaction(1);
+ long originChunkPointNumLowerBound =
+ IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setChunkPointNumLowerBoundInCompaction(1);
+ try {
+ List<TsFileResource> sourceFiles = new ArrayList<>();
+ Set<String> fullPathSetWithDeleted = new HashSet<>(fullPathSet);
+ // add a deleted timeseries to simulate a timeseries that was deleted before compaction.
+ String deletedPath = "root.compactionTest.device999.s999";
+ fullPathSetWithDeleted.add(deletedPath);
+ int fileNum = 6;
+ long pointStep = 300L;
+ for (int i = 0; i < fileNum; ++i) {
+ List<List<Long>> chunkPagePointsNum = new ArrayList<>();
+ List<Long> pagePointsNum = new ArrayList<>();
+ pagePointsNum.add((i + 1L) * pointStep);
+ chunkPagePointsNum.add(pagePointsNum);
+ TsFileResource resource =
+ new TsFileResource(new File(SEQ_DIRS, String.format("%d-%d-0-0.tsfile", i + 1, i + 1)));
+ sourceFiles.add(resource);
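+ // alternate PLAIN and RLE encodings across the source files so that chunks with different encodings are merged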
+ CompactionFileGeneratorUtils.writeTsFile(
+ fullPathSetWithDeleted,
+ chunkPagePointsNum,
+ i * 2000L,
+ resource,
+ i % 2 == 0 ? TSEncoding.PLAIN : TSEncoding.RLE,
+ CompressionType.SNAPPY);
+ Map<String, Pair<Long, Long>> deletionMap = new HashMap<>();
+ deletionMap.put(deletedPath, new Pair<>(i * 2000L, (i + 1) * 2000L));
+ CompactionFileGeneratorUtils.generateMods(deletionMap, resource, false);
+ }
+ Map<PartialPath, List<TimeValuePair>> originData =
+ CompactionCheckerUtils.getDataByQuery(paths, schemaList, sourceFiles, new ArrayList<>());
+ TsFileResource targetResource =
+ TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles, true);
+ InnerSpaceCompactionUtils.compact(targetResource, sourceFiles);
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, "");
+
+ Map<String, List<List<Long>>> chunkPagePointsNumMerged = new HashMap<>();
+ // each element of the outer list is a chunk; the inner list holds the point count of each page
+ List<List<Long>> chunkPointsArray = new ArrayList<>();
+ List<Long> pointsArray = new ArrayList<>();
+ long curPointNum = 0L;
+ for (int i = 0; i < fileNum; ++i) {
+ curPointNum += (i + 1L) * pointStep;
+ pointsArray.add((i + 1L) * pointStep);
+ if (curPointNum > testTargetChunkPointNum) {
+ chunkPointsArray.add(pointsArray);
+ pointsArray = new ArrayList<>();
+ curPointNum = 0;
+ }
+ }
+ if (curPointNum > 0) {
+ chunkPointsArray.add(pointsArray);
+ }
+ for (String path : fullPathSetWithDeleted) {
+ chunkPagePointsNumMerged.put(path, chunkPointsArray);
+ }
+ chunkPagePointsNumMerged.put(deletedPath, null);
+ Map<PartialPath, List<TimeValuePair>> compactedData =
+ CompactionCheckerUtils.getDataByQuery(
+ paths, schemaList, Collections.singletonList(targetResource), new ArrayList<>());
+ CompactionCheckerUtils.validDataByValueList(originData, compactedData);
+ } finally {
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(originTargetChunkSize);
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(originTargetChunkPointNum);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setChunkSizeLowerBoundInCompaction(originChunkSizeLowerBound);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setChunkPointNumLowerBoundInCompaction(originChunkPointNumLowerBound);
+ }
+ }
+
+ @Test
+ public void testMergeChunkWithDifferentCompression() throws Exception {
+ long testTargetChunkPointNum = 1000L;
+ long originTargetChunkSize = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
+ long originTargetChunkPointNum =
+ IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum();
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(10240);
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(testTargetChunkPointNum);
+ long originChunkSizeLowerBound =
+ IoTDBDescriptor.getInstance().getConfig().getChunkSizeLowerBoundInCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setChunkSizeLowerBoundInCompaction(1);
+ long originChunkPointNumLowerBound =
+ IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setChunkPointNumLowerBoundInCompaction(1);
+ try {
+ List<TsFileResource> sourceFiles = new ArrayList<>();
+ Set<String> fullPathSetWithDeleted = new HashSet<>(fullPathSet);
+ // add a deleted timeseries to simulate a timeseries that was deleted before compaction.
+ String deletedPath = "root.compactionTest.device999.s999";
+ fullPathSetWithDeleted.add(deletedPath);
+ int fileNum = 6;
+ long pointStep = 300L;
+ for (int i = 0; i < fileNum; ++i) {
+ List<List<Long>> chunkPagePointsNum = new ArrayList<>();
+ List<Long> pagePointsNum = new ArrayList<>();
+ pagePointsNum.add((i + 1L) * pointStep);
+ chunkPagePointsNum.add(pagePointsNum);
+ TsFileResource resource =
+ new TsFileResource(new File(SEQ_DIRS, String.format("%d-%d-0-0.tsfile", i + 1, i + 1)));
+ sourceFiles.add(resource);
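+ // alternate SNAPPY and GZIP compression across the source files so that chunks with different compression types are merged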
+ CompactionFileGeneratorUtils.writeTsFile(
+ fullPathSetWithDeleted,
+ chunkPagePointsNum,
+ i * 2000L,
+ resource,
+ TSEncoding.PLAIN,
+ i % 2 == 0 ? CompressionType.SNAPPY : CompressionType.GZIP);
+ Map<String, Pair<Long, Long>> deletionMap = new HashMap<>();
+ deletionMap.put(deletedPath, new Pair<>(i * 2000L, (i + 1) * 2000L));
+ CompactionFileGeneratorUtils.generateMods(deletionMap, resource, false);
+ }
+ Map<PartialPath, List<TimeValuePair>> originData =
+ CompactionCheckerUtils.getDataByQuery(paths, schemaList, sourceFiles, new ArrayList<>());
+ TsFileResource targetResource =
+ TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles, true);
+ InnerSpaceCompactionUtils.compact(targetResource, sourceFiles);
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, "");
+
+ Map<String, List<List<Long>>> chunkPagePointsNumMerged = new HashMap<>();
+ // each element of the outer list is a chunk; the inner list holds the point count of each page
+ List<List<Long>> chunkPointsArray = new ArrayList<>();
+ List<Long> pointsArray = new ArrayList<>();
+ long curPointNum = 0L;
+ for (int i = 0; i < fileNum; ++i) {
+ curPointNum += (i + 1L) * pointStep;
+ pointsArray.add((i + 1L) * pointStep);
+ if (curPointNum > testTargetChunkPointNum) {
+ chunkPointsArray.add(pointsArray);
+ pointsArray = new ArrayList<>();
+ curPointNum = 0;
+ }
+ }
+ if (curPointNum > 0) {
+ chunkPointsArray.add(pointsArray);
+ }
+ for (String path : fullPathSetWithDeleted) {
+ chunkPagePointsNumMerged.put(path, chunkPointsArray);
+ }
+ chunkPagePointsNumMerged.put(deletedPath, null);
+ Map<PartialPath, List<TimeValuePair>> compactedData =
+ CompactionCheckerUtils.getDataByQuery(
+ paths, schemaList, Collections.singletonList(targetResource), new ArrayList<>());
+ CompactionCheckerUtils.validDataByValueList(originData, compactedData);
+ } finally {
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(originTargetChunkSize);
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(originTargetChunkPointNum);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setChunkSizeLowerBoundInCompaction(originChunkSizeLowerBound);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setChunkPointNumLowerBoundInCompaction(originChunkPointNumLowerBound);
+ }
+ }
+
+ @Test
+ public void testMergeChunkWithDifferentCompressionAndEncoding() throws Exception {
+ long testTargetChunkPointNum = 1000L;
+ long originTargetChunkSize = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
+ long originTargetChunkPointNum =
+ IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum();
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(10240);
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(testTargetChunkPointNum);
+ long originChunkSizeLowerBound =
+ IoTDBDescriptor.getInstance().getConfig().getChunkSizeLowerBoundInCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setChunkSizeLowerBoundInCompaction(1);
+ long originChunkPointNumLowerBound =
+ IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setChunkPointNumLowerBoundInCompaction(1);
+ try {
+ List<TsFileResource> sourceFiles = new ArrayList<>();
+ Set<String> fullPathSetWithDeleted = new HashSet<>(fullPathSet);
+ // add a deleted timeseries to simulate a timeseries that was deleted before compaction.
+ String deletedPath = "root.compactionTest.device999.s999";
+ fullPathSetWithDeleted.add(deletedPath);
+ int fileNum = 6;
+ long pointStep = 300L;
+ Random random = new Random();
+ for (int i = 0; i < fileNum; ++i) {
+ List<List<Long>> chunkPagePointsNum = new ArrayList<>();
+ List<Long> pagePointsNum = new ArrayList<>();
+ pagePointsNum.add((i + 1L) * pointStep);
+ chunkPagePointsNum.add(pagePointsNum);
+ TsFileResource resource =
+ new TsFileResource(new File(SEQ_DIRS, String.format("%d-%d-0-0.tsfile", i + 1, i + 1)));
+ sourceFiles.add(resource);
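+ // pick a pseudo-random encoding (PLAIN, RLE or GORILLA) and compression (SNAPPY, LZ4 or GZIP) for each source file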
+ CompactionFileGeneratorUtils.writeTsFile(
+ fullPathSetWithDeleted,
+ chunkPagePointsNum,
+ i * 2000L,
+ resource,
+ random.nextInt() % 2 == 0
+ ? (random.nextInt() % 5 < 2 ? TSEncoding.RLE : TSEncoding.GORILLA)
+ : TSEncoding.PLAIN,
+ random.nextInt() % 3 == 0
+ ? (random.nextInt() % 5 < 2 ? CompressionType.SNAPPY : CompressionType.LZ4)
+ : CompressionType.GZIP);
+ Map<String, Pair<Long, Long>> deletionMap = new HashMap<>();
+ deletionMap.put(deletedPath, new Pair<>(i * 2000L, (i + 1) * 2000L));
+ CompactionFileGeneratorUtils.generateMods(deletionMap, resource, false);
+ }
+ Map<PartialPath, List<TimeValuePair>> originData =
+ CompactionCheckerUtils.getDataByQuery(paths, schemaList, sourceFiles, new ArrayList<>());
+ TsFileResource targetResource =
+ TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles, true);
+ InnerSpaceCompactionUtils.compact(targetResource, sourceFiles);
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, "");
+
+ Map<String, List<List<Long>>> chunkPagePointsNumMerged = new HashMap<>();
+ // each element of the outer list is a chunk; the inner list holds the point count of each page
+ List<List<Long>> chunkPointsArray = new ArrayList<>();
+ List<Long> pointsArray = new ArrayList<>();
+ long curPointNum = 0L;
+ for (int i = 0; i < fileNum; ++i) {
+ curPointNum += (i + 1L) * pointStep;
+ pointsArray.add((i + 1L) * pointStep);
+ if (curPointNum > testTargetChunkPointNum) {
+ chunkPointsArray.add(pointsArray);
+ pointsArray = new ArrayList<>();
+ curPointNum = 0;
+ }
+ }
+ if (curPointNum > 0) {
+ chunkPointsArray.add(pointsArray);
+ }
+ for (String path : fullPathSetWithDeleted) {
+ chunkPagePointsNumMerged.put(path, chunkPointsArray);
+ }
+ chunkPagePointsNumMerged.put(deletedPath, null);
+ Map<PartialPath, List<TimeValuePair>> compactedData =
+ CompactionCheckerUtils.getDataByQuery(
+ paths, schemaList, Collections.singletonList(targetResource), new ArrayList<>());
+ CompactionCheckerUtils.validDataByValueList(originData, compactedData);
+ } finally {
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(originTargetChunkSize);
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(originTargetChunkPointNum);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setChunkSizeLowerBoundInCompaction(originChunkSizeLowerBound);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setChunkPointNumLowerBoundInCompaction(originChunkPointNumLowerBound);
+ }
+ }
+
/**
* Generate chunks whose sizes are less than the lower bound; they will be deserialized and written
* into the chunk writer. Then generate a middle-sized chunk, which will be deserialized and written
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionFileGeneratorUtils.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionFileGeneratorUtils.java
index dc9d345292..29c324fc57 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionFileGeneratorUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionFileGeneratorUtils.java
@@ -29,7 +29,9 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -42,9 +44,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Random;
import java.util.Set;
public class CompactionFileGeneratorUtils {
+ private static final Random random = new Random();
public static TsFileResource getTargetTsFileResourceFromSourceResource(
TsFileResource sourceResource) throws IOException {
@@ -216,6 +220,62 @@ public class CompactionFileGeneratorUtils {
.setMaxNumberOfPointsInPage(prevMaxNumberOfPointsInPage);
}
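+ /**
+ * Write a test TsFile containing the given time series, using the specified encoding and
+ * compression type for every chunk. Each element of chunkPagePointsNum describes one chunk;
+ * its inner list holds the point count of each page. Timestamps start at startTime and
+ * increase by one per point, and values are random longs.
+ */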
+ public static void writeTsFile(
+ Set<String> fullPaths,
+ List<List<Long>> chunkPagePointsNum,
+ long startTime,
+ TsFileResource newTsFileResource,
+ TSEncoding encoding,
+ CompressionType compressionType)
+ throws IOException, IllegalPathException {
+ // disable auto page seal and seal page manually
+ int prevMaxNumberOfPointsInPage =
+ TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(Integer.MAX_VALUE);
+
+ if (!newTsFileResource.getTsFile().getParentFile().exists()) {
+ newTsFileResource.getTsFile().getParentFile().mkdirs();
+ }
+ RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(newTsFileResource.getTsFile());
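+ // group the requested time series by device so that each device is written as a single chunk group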
+ Map<String, List<String>> deviceMeasurementMap = new HashMap<>();
+ for (String fullPath : fullPaths) {
+ PartialPath partialPath = new PartialPath(fullPath);
+ List<String> sensors =
+ deviceMeasurementMap.computeIfAbsent(partialPath.getDevice(), (s) -> new ArrayList<>());
+ sensors.add(partialPath.getMeasurement());
+ }
+ for (Entry<String, List<String>> deviceMeasurementEntry : deviceMeasurementMap.entrySet()) {
+ String device = deviceMeasurementEntry.getKey();
+ writer.startChunkGroup(device);
+ for (String sensor : deviceMeasurementEntry.getValue()) {
+ long currTime = startTime;
+ for (List<Long> chunk : chunkPagePointsNum) {
+ ChunkWriterImpl chunkWriter =
+ new ChunkWriterImpl(
+ new MeasurementSchema(sensor, TSDataType.INT64, encoding, compressionType), true);
+ for (Long page : chunk) {
+ for (long i = 0; i < page; i++) {
+ chunkWriter.write(currTime, random.nextLong());
+ newTsFileResource.updateStartTime(device, currTime);
+ newTsFileResource.updateEndTime(device, currTime);
+ currTime++;
+ }
+ chunkWriter.sealCurrentPage();
+ }
+ chunkWriter.writeToFileWriter(writer);
+ }
+ }
+ writer.endChunkGroup();
+ }
+ newTsFileResource.serialize();
+ writer.endFile();
+ newTsFileResource.close();
+
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setMaxNumberOfPointsInPage(prevMaxNumberOfPointsInPage);
+ }
+
/**
* Generate a new file. For each time series, insert a point (time +1 for each point, time =
* value) into the file from the start time to the end time; the value is also equal to the time