You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2023/01/11 13:56:42 UTC
[iotdb] branch rel/1.0 updated: [To rel/1.0][IOTDB-5392] accelerate pointPriorityReader in fast compaction #8811
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.0 by this push:
new ceb4276cd7 [To rel/1.0][IOTDB-5392] accelerate pointPriorityReader in fast compaction #8811
ceb4276cd7 is described below
commit ceb4276cd733214e8ee7e42f1c2c69797c75a05a
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Wed Jan 11 21:56:37 2023 +0800
[To rel/1.0][IOTDB-5392] accelerate pointPriorityReader in fast compaction #8811
---
.../execute/utils/reader/PointPriorityReader.java | 14 +--
.../compaction/FastAlignedCrossCompactionTest.java | 134 ++++++++++++++++++++
.../FastNonAlignedCrossCompactionTest.java | 138 +++++++++++++++++++++
3 files changed, 279 insertions(+), 7 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/reader/PointPriorityReader.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/reader/PointPriorityReader.java
index d1aa44d9cd..2bf063aafc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/reader/PointPriorityReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/reader/PointPriorityReader.java
@@ -48,7 +48,7 @@ public class PointPriorityReader {
private boolean shouldReadPointFromQueue = true;
- private long nextPageStartTime = Long.MAX_VALUE;
+ private long nextPointInOtherPage = Long.MAX_VALUE;
private PointElement currentPointElement;
@@ -128,7 +128,7 @@ public class PointPriorityReader {
if (pointReader.hasNextTimeValuePair()) {
// get the point directly if it is not overlapped with other points
currentPoint = pointReader.nextTimeValuePair();
- if (currentPoint.getTimestamp() >= nextPageStartTime) {
+ if (currentPoint.getTimestamp() >= nextPointInOtherPage) {
// if the point is overlapped with other points, then add it into priority queue
currentPointElement.setPoint(currentPoint);
pointQueue.add(currentPointElement);
@@ -152,9 +152,9 @@ public class PointPriorityReader {
IPointReader pointReader = pointElement.pointReader;
if (pointReader.hasNextTimeValuePair()) {
pointElement.setPoint(pointReader.nextTimeValuePair());
- nextPageStartTime =
- pointQueue.size() > 0 ? pointQueue.peek().pageElement.startTime : Long.MAX_VALUE;
- if (pointElement.timestamp < nextPageStartTime) {
+ nextPointInOtherPage =
+ pointQueue.size() > 0 ? pointQueue.peek().timestamp : Long.MAX_VALUE;
+ if (pointElement.timestamp < nextPointInOtherPage) {
currentPointElement = pointElement;
currentPoint = currentPointElement.timeValuePair;
} else {
@@ -177,8 +177,8 @@ public class PointPriorityReader {
/** Add a new overlapped page. */
public void addNewPage(PageElement pageElement) throws IOException {
if (currentPointElement != null) {
- nextPageStartTime = Math.min(nextPageStartTime, pageElement.startTime);
- if (currentPoint.getTimestamp() >= nextPageStartTime) {
+ nextPointInOtherPage = Math.min(nextPointInOtherPage, pageElement.startTime);
+ if (currentPoint.getTimestamp() >= nextPointInOtherPage) {
currentPointElement.setPoint(currentPoint);
pointQueue.add(currentPointElement);
currentPointElement = null;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastAlignedCrossCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastAlignedCrossCompactionTest.java
index 2373948f14..016b5b0739 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastAlignedCrossCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastAlignedCrossCompactionTest.java
@@ -6678,6 +6678,140 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
validateTargetDatas(sourceDatas, tsDataTypes);
}
+ @Test
+ public void test21() throws IOException, IllegalPathException {
+ List<PartialPath> timeserisPathList = new ArrayList<>();
+ List<TSDataType> tsDataTypes = new ArrayList<>();
+ // seq file 1
+ int deviceNum = 10;
+ int measurementNum = 10;
+ TsFileResource resource = createEmptyFileAndResource(true);
+ try (TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(resource.getTsFile())) {
+ // write the data in device
+ for (int deviceIndex = 0; deviceIndex < deviceNum; deviceIndex++) {
+ tsFileIOWriter.startChunkGroup(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex);
+
+ List<TSDataType> dataTypes = createDataType(measurementNum);
+ List<Integer> measurementIndexes = new ArrayList<>();
+ for (int i = 0; i < measurementNum; i++) {
+ measurementIndexes.add(i);
+ }
+ List<PartialPath> timeseriesPath =
+ createTimeseries(deviceIndex, measurementIndexes, dataTypes, true);
+
+ // write first chunk
+ List<TimeRange> pages = new ArrayList<>();
+ pages.add(new TimeRange(0, 200));
+ pages.add(new TimeRange(350, 450));
+ pages.add(new TimeRange(600, 800));
+ pages.add(new TimeRange(900, 1100));
+ pages.add(new TimeRange(1400, 1600));
+
+ List<IChunkWriter> iChunkWriters = createChunkWriter(timeseriesPath, dataTypes, true);
+ for (IChunkWriter iChunkWriter : iChunkWriters) {
+ writeAlignedChunk((AlignedChunkWriterImpl) iChunkWriter, tsFileIOWriter, pages, true);
+ }
+
+ tsFileIOWriter.endChunkGroup();
+ resource.updateStartTime(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex, 0);
+ resource.updateEndTime(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex, 1600);
+ timeserisPathList.addAll(timeseriesPath);
+ tsDataTypes.addAll(dataTypes);
+ }
+ tsFileIOWriter.endFile();
+ }
+ resource.serialize();
+ seqResources.add(resource);
+
+ // unseq file 1
+ deviceNum = 15;
+ measurementNum = 5;
+ resource = createEmptyFileAndResource(false);
+ try (TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(resource.getTsFile())) {
+ // write the data in device
+ for (int deviceIndex = 0; deviceIndex < deviceNum; deviceIndex++) {
+ tsFileIOWriter.startChunkGroup(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex);
+
+ List<TSDataType> dataTypes = createDataType(measurementNum);
+ List<Integer> measurementIndexes = new ArrayList<>();
+ for (int i = 0; i < measurementNum; i++) {
+ measurementIndexes.add(i);
+ }
+ List<PartialPath> timeseriesPath =
+ createTimeseries(deviceIndex, measurementIndexes, dataTypes, true);
+
+ // write first chunk
+ List<TimeRange> timeRanges = new ArrayList<>();
+
+ List<IChunkWriter> iChunkWriters = createChunkWriter(timeseriesPath, dataTypes, true);
+ for (IChunkWriter iChunkWriter : iChunkWriters) {
+ // first page
+ timeRanges.clear();
+ timeRanges.add(new TimeRange(0, 0));
+ timeRanges.add(new TimeRange(200, 200));
+ writeOneAlignedPage((AlignedChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+ // second page
+ timeRanges.clear();
+ timeRanges.add(new TimeRange(300, 300));
+ timeRanges.add(new TimeRange(500, 500));
+ writeOneAlignedPage((AlignedChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+ // third page
+ timeRanges.clear();
+ timeRanges.add(new TimeRange(650, 650));
+ timeRanges.add(new TimeRange(750, 750));
+ writeOneAlignedPage((AlignedChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+ // fourth page
+ timeRanges.clear();
+ timeRanges.add(new TimeRange(1000, 1000));
+ timeRanges.add(new TimeRange(1200, 1200));
+ writeOneAlignedPage((AlignedChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+ // fifth page
+ timeRanges.clear();
+ timeRanges.add(new TimeRange(1300, 1300));
+ timeRanges.add(new TimeRange(1500, 1500));
+ writeOneAlignedPage((AlignedChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+ iChunkWriter.writeToFileWriter(tsFileIOWriter);
+ }
+
+ tsFileIOWriter.endChunkGroup();
+ resource.updateStartTime(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex, 0);
+ resource.updateEndTime(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex, 1500);
+ timeserisPathList.addAll(timeseriesPath);
+ tsDataTypes.addAll(dataTypes);
+ }
+ tsFileIOWriter.endFile();
+ }
+ resource.serialize();
+ unseqResources.add(resource);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
+
+ // start compacting
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ CrossSpaceCompactionTask task =
+ new CrossSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ seqResources,
+ unseqResources,
+ new FastCompactionPerformer(true),
+ new AtomicInteger(0),
+ 0,
+ 0);
+ task.start();
+
+ validateSeqFiles(true);
+
+ validateTargetDatas(sourceDatas, tsDataTypes);
+ }
+
private List<TimeRange> createPages(long startTime, long endTime, int pagePointNum) {
List<TimeRange> pages = new ArrayList<>();
for (long i = startTime; i <= endTime; i += pagePointNum) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastNonAlignedCrossCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastNonAlignedCrossCompactionTest.java
index 6dcf685639..293687a8b0 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastNonAlignedCrossCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastNonAlignedCrossCompactionTest.java
@@ -6644,6 +6644,144 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
validateTargetDatas(sourceDatas, tsDataTypes);
}
+ @Test
+ public void test21() throws IOException, IllegalPathException {
+ List<PartialPath> timeserisPathList = new ArrayList<>();
+ List<TSDataType> tsDataTypes = new ArrayList<>();
+ // seq file 1
+ int deviceNum = 10;
+ int measurementNum = 10;
+ TsFileResource resource = createEmptyFileAndResource(true);
+ try (TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(resource.getTsFile())) {
+ // write the data in device
+ for (int deviceIndex = 0; deviceIndex < deviceNum; deviceIndex++) {
+ tsFileIOWriter.startChunkGroup(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex);
+
+ List<TSDataType> dataTypes = createDataType(measurementNum);
+ List<Integer> measurementIndexes = new ArrayList<>();
+ for (int i = 0; i < measurementNum; i++) {
+ measurementIndexes.add(i);
+ }
+ List<PartialPath> timeseriesPath =
+ createTimeseries(deviceIndex, measurementIndexes, dataTypes, false);
+
+ // write first chunk
+ List<TimeRange> pages = new ArrayList<>();
+ pages.add(new TimeRange(0, 200));
+ pages.add(new TimeRange(350, 450));
+ pages.add(new TimeRange(600, 800));
+ pages.add(new TimeRange(900, 1100));
+ pages.add(new TimeRange(1400, 1600));
+
+ List<IChunkWriter> iChunkWriters = createChunkWriter(timeseriesPath, dataTypes, false);
+ for (IChunkWriter iChunkWriter : iChunkWriters) {
+ writeNonAlignedChunk((ChunkWriterImpl) iChunkWriter, tsFileIOWriter, pages, true);
+ }
+
+ tsFileIOWriter.endChunkGroup();
+ resource.updateStartTime(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex, 0);
+ resource.updateEndTime(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex, 1600);
+ timeserisPathList.addAll(timeseriesPath);
+ tsDataTypes.addAll(dataTypes);
+ }
+ tsFileIOWriter.endFile();
+ }
+ resource.serialize();
+ seqResources.add(resource);
+
+ // unseq file 1
+ deviceNum = 15;
+ measurementNum = 5;
+ resource = createEmptyFileAndResource(false);
+ try (TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(resource.getTsFile())) {
+ // write the data in device
+ for (int deviceIndex = 0; deviceIndex < deviceNum; deviceIndex++) {
+ tsFileIOWriter.startChunkGroup(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex);
+
+ List<TSDataType> dataTypes = createDataType(measurementNum);
+ List<Integer> measurementIndexes = new ArrayList<>();
+ for (int i = 0; i < measurementNum; i++) {
+ measurementIndexes.add(i);
+ }
+ List<PartialPath> timeseriesPath =
+ createTimeseries(deviceIndex, measurementIndexes, dataTypes, false);
+
+ // write first chunk
+ List<TimeRange> timeRanges = new ArrayList<>();
+
+ List<IChunkWriter> iChunkWriters = createChunkWriter(timeseriesPath, dataTypes, false);
+ for (IChunkWriter iChunkWriter : iChunkWriters) {
+ // first page
+ timeRanges.clear();
+ timeRanges.add(new TimeRange(0, 0));
+ timeRanges.add(new TimeRange(200, 200));
+ writeOneNonAlignedPage((ChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+ // second page
+ timeRanges.clear();
+ timeRanges.add(new TimeRange(300, 300));
+ timeRanges.add(new TimeRange(500, 500));
+ writeOneNonAlignedPage((ChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+ // third page
+ timeRanges.clear();
+ timeRanges.add(new TimeRange(650, 650));
+ timeRanges.add(new TimeRange(750, 750));
+ writeOneNonAlignedPage((ChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+ // fourth page
+ timeRanges.clear();
+ timeRanges.add(new TimeRange(1000, 1000));
+ timeRanges.add(new TimeRange(1200, 1200));
+ writeOneNonAlignedPage((ChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+ // fifth page
+ timeRanges.clear();
+ timeRanges.add(new TimeRange(1300, 1300));
+ timeRanges.add(new TimeRange(1500, 1500));
+ writeOneNonAlignedPage((ChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+ iChunkWriter.writeToFileWriter(tsFileIOWriter);
+ }
+
+ tsFileIOWriter.endChunkGroup();
+ resource.updateStartTime(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex, 0);
+ resource.updateEndTime(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex, 1500);
+ timeserisPathList.addAll(timeseriesPath);
+ tsDataTypes.addAll(dataTypes);
+ }
+ tsFileIOWriter.endFile();
+ }
+ resource.serialize();
+ unseqResources.add(resource);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(timeserisPathList, tsDataTypes);
+
+ // start compacting
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ CrossSpaceCompactionTask task =
+ new CrossSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ seqResources,
+ unseqResources,
+ new FastCompactionPerformer(true),
+ new AtomicInteger(0),
+ 0,
+ 0);
+ task.start();
+
+ validateSeqFiles(true);
+
+ validateTargetDatas(sourceDatas, tsDataTypes);
+ }
+
+ /**
+ * Create pages from startTime to endTime and each page has pagePointNum points except the last
+ * page.
+ */
private List<TimeRange> createPages(long startTime, long endTime, int pagePointNum) {
List<TimeRange> pages = new ArrayList<>();
for (long i = startTime; i <= endTime; i += pagePointNum) {