You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/03/21 12:37:40 UTC
[iotdb] 03/06: [To rel/1.1][IOTDB-5686] Fix error in inner seq compaction when devices have the same name but different alignment properties
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch rc/1.1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7dcba8085413b95e3c4f398ed91652248f19bf36
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Tue Mar 21 10:38:07 2023 +0800
[To rel/1.1][IOTDB-5686] Fix error in inner seq compaction when devices have the same name but different alignment properties
---
.../impl/ReadChunkCompactionPerformer.java | 7 +-
.../engine/compaction/AbstractCompactionTest.java | 7 +-
.../compaction/FastAlignedCrossCompactionTest.java | 139 ++--
.../FastNonAlignedCrossCompactionTest.java | 138 ++--
.../utils/MultiTsFileDeviceIteratorTest.java | 734 +++++++++++++++++++++
5 files changed, 883 insertions(+), 142 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
index e97f2e5fe3..58d2a9e25e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
@@ -170,8 +170,13 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
deviceIterator.iterateNotAlignedSeries(device, true);
while (seriesIterator.hasNextSeries()) {
checkThreadInterrupted();
+ String seriesID = seriesIterator.nextSeries();
+ if (seriesID.equals("")) {
+ // encountered a deleted aligned device, so return
+ return;
+ }
// TODO: we can provide a configuration item to enable concurrent between each series
- PartialPath p = new PartialPath(device, seriesIterator.nextSeries());
+ PartialPath p = new PartialPath(device, seriesID);
// TODO: seriesIterator needs to be refactor.
// This statement must be called before next hasNextSeries() called, or it may be trapped in a
// dead-loop.
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
index 049bc778fb..c3c8e6366e 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
@@ -449,8 +449,8 @@ public class AbstractCompactionTest {
path,
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
- seqResources,
- unseqResources,
+ tsFileManager.getTsFileList(true),
+ tsFileManager.getTsFileList(false),
true);
while (tsBlockReader.hasNextBatch()) {
TsBlock block = tsBlockReader.nextBatch();
@@ -490,7 +490,8 @@ public class AbstractCompactionTest {
}
}
if (timeseriesData.size() > 0) {
- // there are still data points left, which are not in the target file
+ // there are still data points left that are not in the target file, i.e. data was lost
+ // during compaction.
fail();
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastAlignedCrossCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastAlignedCrossCompactionTest.java
index 08cd53cacd..06e7f59697 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastAlignedCrossCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastAlignedCrossCompactionTest.java
@@ -223,12 +223,13 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
+
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -462,12 +463,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -700,12 +701,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -955,12 +956,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -1303,12 +1304,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -1659,12 +1660,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -2025,12 +2026,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -2375,12 +2376,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -2726,12 +2727,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -3087,12 +3088,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
validateSeqFiles(true);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
@@ -3501,12 +3502,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -3917,12 +3918,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -4333,12 +4334,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -4798,12 +4799,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -5211,12 +5212,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -5417,12 +5418,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -5682,12 +5683,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -6065,12 +6066,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -6416,12 +6417,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -6655,12 +6656,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -6789,12 +6790,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -6971,12 +6972,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -7187,12 +7188,12 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastNonAlignedCrossCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastNonAlignedCrossCompactionTest.java
index afb10bafeb..a91cab2062 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastNonAlignedCrossCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastNonAlignedCrossCompactionTest.java
@@ -244,12 +244,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -479,12 +479,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -713,12 +713,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -965,12 +965,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -1310,12 +1310,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -1662,12 +1662,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -2024,12 +2024,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -2371,12 +2371,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -2719,12 +2719,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -3077,12 +3077,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -3487,12 +3487,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -3900,12 +3900,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -4313,12 +4313,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -4775,12 +4775,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -5185,12 +5185,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -5391,12 +5391,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -5656,12 +5656,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -6039,12 +6039,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -6387,12 +6387,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -6626,12 +6626,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -6760,12 +6760,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -6942,12 +6942,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
@@ -7151,12 +7151,12 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
resource.serialize();
unseqResources.add(resource);
- Map<PartialPath, List<TimeValuePair>> sourceDatas =
- readSourceFiles(timeserisPathList, tsDataTypes);
-
// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0,
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/MultiTsFileDeviceIteratorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/MultiTsFileDeviceIteratorTest.java
index 9ab358f6e4..5e4c60e4ed 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/MultiTsFileDeviceIteratorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/MultiTsFileDeviceIteratorTest.java
@@ -19,16 +19,30 @@
package org.apache.iotdb.db.engine.compaction.utils;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.AbstractCompactionTest;
+import org.apache.iotdb.db.engine.compaction.execute.performer.impl.FastCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.execute.performer.impl.ReadPointCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
+import org.apache.iotdb.db.engine.compaction.execute.task.InnerSpaceCompactionTask;
+import org.apache.iotdb.db.engine.compaction.execute.task.subtask.FastCompactionTaskSummary;
+import org.apache.iotdb.db.engine.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.engine.compaction.execute.utils.MultiTsFileDeviceIterator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
@@ -37,17 +51,22 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR;
public class MultiTsFileDeviceIteratorTest extends AbstractCompactionTest {
+ private final String oldThreadName = Thread.currentThread().getName();
@Before
public void setUp()
throws IOException, WriteProcessException, MetadataException, InterruptedException {
super.setUp();
IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(1024);
+ Thread.currentThread().setName("pool-1-IoTDB-Compaction-1"); // previous name is kept in oldThreadName; NOTE(review): presumably the compaction code expects a compaction-pool thread name - confirm
}
@After
@@ -59,6 +78,7 @@ public class MultiTsFileDeviceIteratorTest extends AbstractCompactionTest {
for (TsFileResource tsFileResource : unseqResources) {
FileReaderManager.getInstance().closeFileAndRemoveReader(tsFileResource.getTsFilePath());
}
+ Thread.currentThread().setName(oldThreadName);
}
@Test
@@ -297,4 +317,718 @@ public class MultiTsFileDeviceIteratorTest extends AbstractCompactionTest {
Assert.assertEquals(30, deviceNum);
TsFileGeneratorUtils.alignDeviceOffset = oldAlignedDeviceOffset;
}
+
+ /**
+ * Create device with nonAligned property. Delete it and create new device with same deviceID but
+ * aligned property. Compact it. Then delete it and create new device with same deviceID but
+ * nonAligned property. Check whether the deviceID and its property can be obtained correctly.
+ */
+ @Test
+ public void getDeletedDevicesWithSameNameFromSeqFilesByReadChunkPerformer()
+ throws MetadataException, IOException, WriteProcessException, StorageEngineException,
+ InterruptedException {
+ TSFileDescriptor.getInstance().getConfig().setMaxDegreeOfIndexNode(3);
+ int oldAlignedDeviceOffset = TsFileGeneratorUtils.alignDeviceOffset;
+ TsFileGeneratorUtils.alignDeviceOffset = 0;
+ registerTimeseriesInMManger(30, 5, false);
+ createFiles(3, 10, 5, 100, 0, 0, 50, 50, false, true);
+ createFiles(4, 30, 5, 100, 1000, 0, 50, 50, false, true);
+
+ // generate mods file, delete d0 ~ d9 with nonAligned property
+ List<String> seriesPaths = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 5; j++) {
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j);
+ }
+ }
+ generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ deleteTimeseriesInMManager(seriesPaths);
+
+ // generate d0 ~ d9 with aligned property
+ createFiles(2, 10, 15, 100, 2000, 2000, 50, 50, true, true);
+ tsFileManager.addAll(seqResources, true);
+
+ List<PartialPath> timeseriesPaths = new ArrayList<>();
+ for (int i = 0; i < 30; i++) {
+ for (int j = 0; j < 15; j++) {
+ if (i < 10) {
+ timeseriesPaths.add(
+ new AlignedPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ Collections.singletonList("s" + j),
+ Collections.singletonList(new MeasurementSchema("s" + j, TSDataType.INT64))));
+ } else {
+ timeseriesPaths.add(
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+ TSDataType.INT64));
+ }
+ }
+ }
+ Map<PartialPath, List<TimeValuePair>> sourceData =
+ readSourceFiles(timeseriesPaths, Collections.emptyList());
+
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ tsFileManager.getTsFileList(true),
+ true,
+ new ReadChunkCompactionPerformer(),
+ new AtomicInteger(),
+ 0L);
+ task.start();
+
+ validateSeqFiles(true);
+ validateTargetDatas(sourceData, Collections.emptyList());
+
+ // generate mods file, delete d0 ~ d9 with aligned property
+ seriesPaths.clear();
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 15; j++) {
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j);
+ }
+ }
+ generateModsFile(
+ seriesPaths, tsFileManager.getTsFileList(true), Long.MIN_VALUE, Long.MAX_VALUE);
+
+ deleteTimeseriesInMManager(seriesPaths);
+
+ // generate d0 ~ d9 with nonAligned property
+ createFiles(1, 10, 5, 100, 2000, 2000, 50, 50, false, true);
+ tsFileManager.add(seqResources.get(seqResources.size() - 1), true);
+
+ sourceData = readSourceFiles(timeseriesPaths, Collections.emptyList());
+
+ // sort the deviceId in lexicographical order from small to large
+ List<String> deviceIds = new ArrayList<>();
+ for (int i = 0; i < 30; i++) {
+ deviceIds.add("root.testsg.d" + (i + TsFileGeneratorUtils.getAlignDeviceOffset()));
+ }
+ deviceIds.sort(String::compareTo);
+
+ int deviceNum = 0;
+ try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
+ new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true))) {
+ while (multiTsFileDeviceIterator.hasNextDevice()) {
+ Pair<String, Boolean> deviceInfo = multiTsFileDeviceIterator.nextDevice();
+ Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
+ Assert.assertFalse(deviceInfo.right);
+ deviceNum++;
+ }
+ }
+ Assert.assertEquals(30, deviceNum);
+ TsFileGeneratorUtils.alignDeviceOffset = oldAlignedDeviceOffset; // NOTE(review): skipped if an assertion above fails; consider try/finally
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(
+ tsFileManager.getTsFileList(true), true);
+ ReadChunkCompactionPerformer performer =
+ new ReadChunkCompactionPerformer(tsFileManager.getTsFileList(true), targetResources.get(0));
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+
+ CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
+ tsFileManager.replace(
+ tsFileManager.getTsFileList(true), Collections.emptyList(), targetResources, 0, true);
+ tsFileManager.getTsFileList(true).get(0).setStatus(TsFileResourceStatus.CLOSED);
+
+ validateSeqFiles(true);
+ validateTargetDatas(sourceData, Collections.emptyList());
+ }
+
+ /**
+ * Create device with aligned property. Delete it and create new device with same deviceID but
+ * nonAligned property. Compact it. Then delete it and create new device with same deviceID but
+ * aligned property. Check whether the deviceID and its property can be obtained correctly.
+ */
+ @Test
+ public void getDeletedDevicesWithSameNameFromSeqFilesByReadChunkPerformer2()
+ throws MetadataException, IOException, WriteProcessException, StorageEngineException,
+ InterruptedException {
+ TSFileDescriptor.getInstance().getConfig().setMaxDegreeOfIndexNode(3);
+ int oldAlignedDeviceOffset = TsFileGeneratorUtils.alignDeviceOffset;
+ TsFileGeneratorUtils.alignDeviceOffset = 0;
+ registerTimeseriesInMManger(30, 5, true);
+ createFiles(3, 10, 5, 100, 0, 0, 50, 50, true, true);
+ createFiles(4, 30, 5, 100, 1000, 0, 50, 50, true, true);
+
+ // generate mods file, delete d0 ~ d9 with aligned property
+ List<String> seriesPaths = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 5; j++) {
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j);
+ }
+ }
+ generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ deleteTimeseriesInMManager(seriesPaths);
+
+ // generate d0 ~ d9 with nonAligned property
+ createFiles(2, 10, 15, 100, 2000, 2000, 50, 50, false, true);
+ tsFileManager.addAll(seqResources, true);
+
+ List<PartialPath> timeseriesPaths = new ArrayList<>();
+ for (int i = 0; i < 30; i++) {
+ for (int j = 0; j < 15; j++) {
+ if (i < 10) {
+ timeseriesPaths.add(
+ new AlignedPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ Collections.singletonList("s" + j),
+ Collections.singletonList(new MeasurementSchema("s" + j, TSDataType.INT64))));
+ } else {
+ timeseriesPaths.add(
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+ TSDataType.INT64));
+ }
+ }
+ }
+ Map<PartialPath, List<TimeValuePair>> sourceData =
+ readSourceFiles(timeseriesPaths, Collections.emptyList());
+
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ tsFileManager.getTsFileList(true),
+ true,
+ new ReadChunkCompactionPerformer(),
+ new AtomicInteger(),
+ 0L);
+ task.start();
+
+ validateSeqFiles(true);
+ validateTargetDatas(sourceData, Collections.emptyList());
+
+ // generate mods file, delete d0 ~ d9 with nonAligned property
+ seriesPaths.clear();
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 15; j++) {
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j);
+ }
+ }
+ generateModsFile(
+ seriesPaths, tsFileManager.getTsFileList(true), Long.MIN_VALUE, Long.MAX_VALUE);
+
+ deleteTimeseriesInMManager(seriesPaths);
+
+ // generate d0 ~ d9 with aligned property
+ createFiles(1, 10, 5, 100, 2000, 2000, 50, 50, true, true);
+ tsFileManager.add(seqResources.get(seqResources.size() - 1), true);
+
+ sourceData = readSourceFiles(timeseriesPaths, Collections.emptyList());
+
+ // sort the deviceId in lexicographical order from small to large
+ List<String> deviceIds = new ArrayList<>();
+ for (int i = 0; i < 30; i++) {
+ deviceIds.add("root.testsg.d" + (i + TsFileGeneratorUtils.getAlignDeviceOffset()));
+ }
+ deviceIds.sort(String::compareTo);
+
+ int deviceNum = 0;
+ try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
+ new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true))) {
+ while (multiTsFileDeviceIterator.hasNextDevice()) {
+ Pair<String, Boolean> deviceInfo = multiTsFileDeviceIterator.nextDevice();
+ Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
+ Assert.assertTrue(deviceInfo.right);
+ deviceNum++;
+ }
+ }
+ Assert.assertEquals(30, deviceNum);
+ TsFileGeneratorUtils.alignDeviceOffset = oldAlignedDeviceOffset; // NOTE(review): skipped if an assertion above fails; consider try/finally
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(
+ tsFileManager.getTsFileList(true), true);
+ ReadChunkCompactionPerformer performer =
+ new ReadChunkCompactionPerformer(tsFileManager.getTsFileList(true), targetResources.get(0));
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+
+ CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
+ tsFileManager.replace(
+ tsFileManager.getTsFileList(true), Collections.emptyList(), targetResources, 0, true);
+ tsFileManager.getTsFileList(true).get(0).setStatus(TsFileResourceStatus.CLOSED);
+
+ validateSeqFiles(true);
+ validateTargetDatas(sourceData, Collections.emptyList());
+ }
+
+ /**
+ * Create device with nonAligned property. Delete it and create new device with same deviceID but
+ * aligned property. Compact it. Then delete it and create new device with same deviceID but
+ * nonAligned property. Check whether the deviceID and its property can be obtained correctly.
+ */
+ @Test
+ public void getDeletedDevicesWithSameNameFromSeqFilesByReadPointPerformer() throws Exception {
+ TSFileDescriptor.getInstance().getConfig().setMaxDegreeOfIndexNode(3);
+ int oldAlignedDeviceOffset = TsFileGeneratorUtils.alignDeviceOffset;
+ TsFileGeneratorUtils.alignDeviceOffset = 0;
+ registerTimeseriesInMManger(30, 5, false);
+ createFiles(3, 10, 5, 100, 0, 0, 50, 50, false, true);
+ createFiles(4, 30, 5, 100, 1000, 0, 50, 50, false, true);
+
+ // generate mods file, delete d0 ~ d9 with nonAligned property
+ List<String> seriesPaths = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 5; j++) {
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j);
+ }
+ }
+ generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ deleteTimeseriesInMManager(seriesPaths);
+
+ // generate d0 ~ d9 with aligned property
+ createFiles(2, 10, 15, 100, 2000, 2000, 50, 50, true, true);
+ tsFileManager.addAll(seqResources, true);
+
+ List<PartialPath> timeseriesPaths = new ArrayList<>();
+ for (int i = 0; i < 30; i++) {
+ for (int j = 0; j < 15; j++) {
+ if (i < 10) {
+ timeseriesPaths.add(
+ new AlignedPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ Collections.singletonList("s" + j),
+ Collections.singletonList(new MeasurementSchema("s" + j, TSDataType.INT64))));
+ } else {
+ timeseriesPaths.add(
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+ TSDataType.INT64));
+ }
+ }
+ }
+ Map<PartialPath, List<TimeValuePair>> sourceData =
+ readSourceFiles(timeseriesPaths, Collections.emptyList());
+
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ tsFileManager.getTsFileList(true),
+ true,
+ new ReadPointCompactionPerformer(),
+ new AtomicInteger(),
+ 0L);
+ task.start();
+
+ validateSeqFiles(true);
+ validateTargetDatas(sourceData, Collections.emptyList());
+
+ // generate mods file, delete d0 ~ d9 with aligned property
+ seriesPaths.clear();
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 15; j++) {
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j);
+ }
+ }
+ generateModsFile(
+ seriesPaths, tsFileManager.getTsFileList(true), Long.MIN_VALUE, Long.MAX_VALUE);
+
+ deleteTimeseriesInMManager(seriesPaths);
+
+ // generate d0 ~ d9 with nonAligned property
+ createFiles(1, 10, 5, 100, 2000, 2000, 50, 50, false, true);
+ tsFileManager.add(seqResources.get(seqResources.size() - 1), true);
+
+ sourceData = readSourceFiles(timeseriesPaths, Collections.emptyList());
+
+ // sort the deviceId in lexicographical order from small to large
+ List<String> deviceIds = new ArrayList<>();
+ for (int i = 0; i < 30; i++) {
+ deviceIds.add("root.testsg.d" + (i + TsFileGeneratorUtils.getAlignDeviceOffset()));
+ }
+ deviceIds.sort(String::compareTo);
+
+ int deviceNum = 0;
+ try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
+ new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true))) {
+ while (multiTsFileDeviceIterator.hasNextDevice()) {
+ Pair<String, Boolean> deviceInfo = multiTsFileDeviceIterator.nextDevice();
+ Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
+ Assert.assertFalse(deviceInfo.right);
+ deviceNum++;
+ }
+ }
+ Assert.assertEquals(30, deviceNum);
+ TsFileGeneratorUtils.alignDeviceOffset = oldAlignedDeviceOffset; // NOTE(review): skipped if an assertion above fails; consider try/finally
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(
+ tsFileManager.getTsFileList(true), true);
+ ReadPointCompactionPerformer performer =
+ new ReadPointCompactionPerformer(
+ tsFileManager.getTsFileList(true), Collections.emptyList(), targetResources);
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+
+ CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
+ tsFileManager.replace(
+ tsFileManager.getTsFileList(true), Collections.emptyList(), targetResources, 0, true);
+ tsFileManager.getTsFileList(true).get(0).setStatus(TsFileResourceStatus.CLOSED);
+
+ validateSeqFiles(true);
+ validateTargetDatas(sourceData, Collections.emptyList());
+ }
+
+ /**
+ * Create device with aligned property. Delete it and create new device with same deviceID but
+ * nonAligned property. Compact it. Then delete it and create new device with same deviceID but
+ * aligned property. Check whether the deviceID and its property can be obtained correctly.
+ */
+ @Test
+ public void getDeletedDevicesWithSameNameFromSeqFilesByReadPointPerformer2() throws Exception {
+ TSFileDescriptor.getInstance().getConfig().setMaxDegreeOfIndexNode(3);
+ int oldAlignedDeviceOffset = TsFileGeneratorUtils.alignDeviceOffset;
+ TsFileGeneratorUtils.alignDeviceOffset = 0;
+ registerTimeseriesInMManger(30, 5, true);
+ createFiles(3, 10, 5, 100, 0, 0, 50, 50, true, true);
+ createFiles(4, 30, 5, 100, 1000, 0, 50, 50, true, true);
+
+ // generate mods file, delete d0 ~ d9 with aligned property
+ List<String> seriesPaths = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 5; j++) {
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j);
+ }
+ }
+ generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ deleteTimeseriesInMManager(seriesPaths);
+
+ // generate d0 ~ d9 with nonAligned property
+ createFiles(2, 10, 15, 100, 2000, 2000, 50, 50, false, true);
+ tsFileManager.addAll(seqResources, true);
+
+ List<PartialPath> timeseriesPaths = new ArrayList<>();
+ for (int i = 0; i < 30; i++) {
+ for (int j = 0; j < 15; j++) {
+ if (i < 10) {
+ timeseriesPaths.add(
+ new AlignedPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ Collections.singletonList("s" + j),
+ Collections.singletonList(new MeasurementSchema("s" + j, TSDataType.INT64))));
+ } else {
+ timeseriesPaths.add(
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+ TSDataType.INT64));
+ }
+ }
+ }
+ Map<PartialPath, List<TimeValuePair>> sourceData =
+ readSourceFiles(timeseriesPaths, Collections.emptyList());
+
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ tsFileManager.getTsFileList(true),
+ true,
+ new ReadPointCompactionPerformer(),
+ new AtomicInteger(),
+ 0L);
+ task.start();
+
+ validateSeqFiles(true);
+ validateTargetDatas(sourceData, Collections.emptyList());
+
+ // generate mods file, delete d0 ~ d9 with nonAligned property
+ seriesPaths.clear();
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 15; j++) {
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j);
+ }
+ }
+ generateModsFile(
+ seriesPaths, tsFileManager.getTsFileList(true), Long.MIN_VALUE, Long.MAX_VALUE);
+
+ deleteTimeseriesInMManager(seriesPaths);
+
+ // generate d0 ~ d9 with aligned property
+ createFiles(1, 10, 5, 100, 2000, 2000, 50, 50, true, true);
+ tsFileManager.add(seqResources.get(seqResources.size() - 1), true);
+
+ sourceData = readSourceFiles(timeseriesPaths, Collections.emptyList());
+
+ // sort the deviceId in lexicographical order from small to large
+ List<String> deviceIds = new ArrayList<>();
+ for (int i = 0; i < 30; i++) {
+ deviceIds.add("root.testsg.d" + (i + TsFileGeneratorUtils.getAlignDeviceOffset()));
+ }
+ deviceIds.sort(String::compareTo);
+
+ int deviceNum = 0;
+ try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
+ new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true))) {
+ while (multiTsFileDeviceIterator.hasNextDevice()) {
+ Pair<String, Boolean> deviceInfo = multiTsFileDeviceIterator.nextDevice();
+ Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
+ Assert.assertTrue(deviceInfo.right);
+ deviceNum++;
+ }
+ }
+ Assert.assertEquals(30, deviceNum);
+ TsFileGeneratorUtils.alignDeviceOffset = oldAlignedDeviceOffset; // NOTE(review): skipped if an assertion above fails; consider try/finally
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(
+ tsFileManager.getTsFileList(true), true);
+ ReadPointCompactionPerformer performer =
+ new ReadPointCompactionPerformer(
+ tsFileManager.getTsFileList(true), Collections.emptyList(), targetResources);
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+
+ CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
+ tsFileManager.replace(
+ tsFileManager.getTsFileList(true), Collections.emptyList(), targetResources, 0, true);
+ tsFileManager.getTsFileList(true).get(0).setStatus(TsFileResourceStatus.CLOSED);
+
+ validateSeqFiles(true);
+ validateTargetDatas(sourceData, Collections.emptyList());
+ }
+
+ /**
+ * Create device with nonAligned property. Delete it and create new device with same deviceID but
+ * aligned property. Compact it. Then delete it and create new device with same deviceID but
+ * nonAligned property. Check whether the deviceID and its property can be obtained correctly.
+ */
+ @Test
+ public void getDeletedDevicesWithSameNameFromSeqFilesByFastPerformer()
+ throws MetadataException, IOException, WriteProcessException, StorageEngineException,
+ InterruptedException {
+ TSFileDescriptor.getInstance().getConfig().setMaxDegreeOfIndexNode(3);
+ int oldAlignedDeviceOffset = TsFileGeneratorUtils.alignDeviceOffset;
+ TsFileGeneratorUtils.alignDeviceOffset = 0;
+ registerTimeseriesInMManger(30, 5, false);
+ createFiles(3, 10, 5, 100, 0, 0, 50, 50, false, true);
+ createFiles(4, 30, 5, 100, 1000, 0, 50, 50, false, true);
+
+ // generate mods file, delete d0 ~ d9 with nonAligned property
+ List<String> seriesPaths = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 5; j++) {
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j);
+ }
+ }
+ generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ deleteTimeseriesInMManager(seriesPaths);
+
+ // generate d0 ~ d9 with aligned property
+ createFiles(2, 10, 15, 100, 2000, 2000, 50, 50, true, true);
+ tsFileManager.addAll(seqResources, true);
+
+ List<PartialPath> timeseriesPaths = new ArrayList<>();
+ for (int i = 0; i < 30; i++) {
+ for (int j = 0; j < 15; j++) {
+ if (i < 10) {
+ timeseriesPaths.add(
+ new AlignedPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ Collections.singletonList("s" + j),
+ Collections.singletonList(new MeasurementSchema("s" + j, TSDataType.INT64))));
+ } else {
+ timeseriesPaths.add(
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+ TSDataType.INT64));
+ }
+ }
+ }
+ Map<PartialPath, List<TimeValuePair>> sourceData =
+ readSourceFiles(timeseriesPaths, Collections.emptyList());
+
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ tsFileManager.getTsFileList(true),
+ true,
+ new FastCompactionPerformer(false),
+ new AtomicInteger(),
+ 0L);
+ task.start();
+
+ validateSeqFiles(true);
+ validateTargetDatas(sourceData, Collections.emptyList());
+
+ // generate mods file, delete d0 ~ d9 with aligned property
+ seriesPaths.clear();
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 15; j++) {
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j);
+ }
+ }
+ generateModsFile(
+ seriesPaths, tsFileManager.getTsFileList(true), Long.MIN_VALUE, Long.MAX_VALUE);
+
+ deleteTimeseriesInMManager(seriesPaths);
+
+ // generate d0 ~ d9 with nonAligned property
+ createFiles(1, 10, 5, 100, 2000, 2000, 50, 50, false, true);
+ tsFileManager.add(seqResources.get(seqResources.size() - 1), true);
+
+ sourceData = readSourceFiles(timeseriesPaths, Collections.emptyList());
+
+ // sort the deviceId in lexicographical order from small to large
+ List<String> deviceIds = new ArrayList<>();
+ for (int i = 0; i < 30; i++) {
+ deviceIds.add("root.testsg.d" + (i + TsFileGeneratorUtils.getAlignDeviceOffset()));
+ }
+ deviceIds.sort(String::compareTo);
+
+ int deviceNum = 0;
+ try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
+ new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true))) {
+ while (multiTsFileDeviceIterator.hasNextDevice()) {
+ Pair<String, Boolean> deviceInfo = multiTsFileDeviceIterator.nextDevice();
+ Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
+ Assert.assertFalse(deviceInfo.right);
+ deviceNum++;
+ }
+ }
+ Assert.assertEquals(30, deviceNum);
+ TsFileGeneratorUtils.alignDeviceOffset = oldAlignedDeviceOffset; // NOTE(review): skipped if an assertion above fails; consider try/finally
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(
+ tsFileManager.getTsFileList(true), true);
+ FastCompactionPerformer performer =
+ new FastCompactionPerformer(
+ tsFileManager.getTsFileList(true), Collections.emptyList(), targetResources);
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+
+ CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
+ tsFileManager.replace(
+ tsFileManager.getTsFileList(true), Collections.emptyList(), targetResources, 0, true);
+ tsFileManager.getTsFileList(true).get(0).setStatus(TsFileResourceStatus.CLOSED);
+
+ validateSeqFiles(true);
+ validateTargetDatas(sourceData, Collections.emptyList());
+ }
+
+ /**
+ * Create device with aligned property. Delete it and create new device with same deviceID but
+ * nonAligned property. Compact it. Then delete it and create new device with same deviceID but
+ * aligned property. Check whether the deviceID and its property can be obtained correctly.
+ */
+ @Test
+ public void getDeletedDevicesWithSameNameFromSeqFilesByFastPerformer2()
+ throws MetadataException, IOException, WriteProcessException, StorageEngineException,
+ InterruptedException {
+ TSFileDescriptor.getInstance().getConfig().setMaxDegreeOfIndexNode(3);
+ int oldAlignedDeviceOffset = TsFileGeneratorUtils.alignDeviceOffset;
+ TsFileGeneratorUtils.alignDeviceOffset = 0;
+ registerTimeseriesInMManger(30, 5, true);
+ createFiles(3, 10, 5, 100, 0, 0, 50, 50, true, true);
+ createFiles(4, 30, 5, 100, 1000, 0, 50, 50, true, true);
+
+ // generate mods file, delete d0 ~ d9 with aligned property
+ List<String> seriesPaths = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 5; j++) {
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j);
+ }
+ }
+ generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ deleteTimeseriesInMManager(seriesPaths);
+
+ // generate d0 ~ d9 with nonAligned property
+ createFiles(2, 10, 15, 100, 2000, 2000, 50, 50, false, true);
+ tsFileManager.addAll(seqResources, true);
+
+ List<PartialPath> timeseriesPaths = new ArrayList<>();
+ for (int i = 0; i < 30; i++) {
+ for (int j = 0; j < 15; j++) {
+ if (i < 10) {
+ timeseriesPaths.add(
+ new AlignedPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ Collections.singletonList("s" + j),
+ Collections.singletonList(new MeasurementSchema("s" + j, TSDataType.INT64))));
+ } else {
+ timeseriesPaths.add(
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+ TSDataType.INT64));
+ }
+ }
+ }
+ Map<PartialPath, List<TimeValuePair>> sourceData =
+ readSourceFiles(timeseriesPaths, Collections.emptyList());
+
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ tsFileManager.getTsFileList(true),
+ true,
+ new FastCompactionPerformer(false),
+ new AtomicInteger(),
+ 0L);
+ task.start();
+
+ validateSeqFiles(true);
+ validateTargetDatas(sourceData, Collections.emptyList());
+
+ // generate mods file, delete d0 ~ d9 with nonAligned property
+ seriesPaths.clear();
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 15; j++) {
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j);
+ }
+ }
+ generateModsFile(
+ seriesPaths, tsFileManager.getTsFileList(true), Long.MIN_VALUE, Long.MAX_VALUE);
+
+ deleteTimeseriesInMManager(seriesPaths);
+
+ // generate d0 ~ d9 with aligned property
+ createFiles(1, 10, 5, 100, 2000, 2000, 50, 50, true, true);
+ tsFileManager.add(seqResources.get(seqResources.size() - 1), true);
+
+ sourceData = readSourceFiles(timeseriesPaths, Collections.emptyList());
+
+ // sort the deviceId in lexicographical order from small to large
+ List<String> deviceIds = new ArrayList<>();
+ for (int i = 0; i < 30; i++) {
+ deviceIds.add("root.testsg.d" + (i + TsFileGeneratorUtils.getAlignDeviceOffset()));
+ }
+ deviceIds.sort(String::compareTo);
+
+ int deviceNum = 0;
+ try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
+ new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true))) {
+ while (multiTsFileDeviceIterator.hasNextDevice()) {
+ Pair<String, Boolean> deviceInfo = multiTsFileDeviceIterator.nextDevice();
+ Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
+ Assert.assertTrue(deviceInfo.right);
+ deviceNum++;
+ }
+ }
+ Assert.assertEquals(30, deviceNum);
+ TsFileGeneratorUtils.alignDeviceOffset = oldAlignedDeviceOffset; // NOTE(review): skipped if an assertion above fails; consider try/finally
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(
+ tsFileManager.getTsFileList(true), true);
+ FastCompactionPerformer performer =
+ new FastCompactionPerformer(
+ tsFileManager.getTsFileList(true), Collections.emptyList(), targetResources);
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+
+ CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
+ tsFileManager.replace(
+ tsFileManager.getTsFileList(true), Collections.emptyList(), targetResources, 0, true);
+ tsFileManager.getTsFileList(true).get(0).setStatus(TsFileResourceStatus.CLOSED);
+
+ validateSeqFiles(true);
+ validateTargetDatas(sourceData, Collections.emptyList());
+ }
}