You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2023/01/11 13:56:42 UTC

[iotdb] branch rel/1.0 updated: [To rel/1.0][IOTDB-5392] accelerate pointPriorityReader in fast compaction #8811

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.0 by this push:
     new ceb4276cd7 [To rel/1.0][IOTDB-5392] accelerate pointPriorityReader in fast compaction #8811
ceb4276cd7 is described below

commit ceb4276cd733214e8ee7e42f1c2c69797c75a05a
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Wed Jan 11 21:56:37 2023 +0800

    [To rel/1.0][IOTDB-5392] accelerate pointPriorityReader in fast compaction #8811
---
 .../execute/utils/reader/PointPriorityReader.java  |  14 +--
 .../compaction/FastAlignedCrossCompactionTest.java | 134 ++++++++++++++++++++
 .../FastNonAlignedCrossCompactionTest.java         | 138 +++++++++++++++++++++
 3 files changed, 279 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/reader/PointPriorityReader.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/reader/PointPriorityReader.java
index d1aa44d9cd..2bf063aafc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/reader/PointPriorityReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/reader/PointPriorityReader.java
@@ -48,7 +48,7 @@ public class PointPriorityReader {
 
   private boolean shouldReadPointFromQueue = true;
 
-  private long nextPageStartTime = Long.MAX_VALUE;
+  private long nextPointInOtherPage = Long.MAX_VALUE;
 
   private PointElement currentPointElement;
 
@@ -128,7 +128,7 @@ public class PointPriorityReader {
       if (pointReader.hasNextTimeValuePair()) {
         // get the point directly if it is not overlapped with other points
         currentPoint = pointReader.nextTimeValuePair();
-        if (currentPoint.getTimestamp() >= nextPageStartTime) {
+        if (currentPoint.getTimestamp() >= nextPointInOtherPage) {
           // if the point is overlapped with other points, then add it into priority queue
           currentPointElement.setPoint(currentPoint);
           pointQueue.add(currentPointElement);
@@ -152,9 +152,9 @@ public class PointPriorityReader {
           IPointReader pointReader = pointElement.pointReader;
           if (pointReader.hasNextTimeValuePair()) {
             pointElement.setPoint(pointReader.nextTimeValuePair());
-            nextPageStartTime =
-                pointQueue.size() > 0 ? pointQueue.peek().pageElement.startTime : Long.MAX_VALUE;
-            if (pointElement.timestamp < nextPageStartTime) {
+            nextPointInOtherPage =
+                pointQueue.size() > 0 ? pointQueue.peek().timestamp : Long.MAX_VALUE;
+            if (pointElement.timestamp < nextPointInOtherPage) {
               currentPointElement = pointElement;
               currentPoint = currentPointElement.timeValuePair;
             } else {
@@ -177,8 +177,8 @@ public class PointPriorityReader {
   /** Add a new overlapped page. */
   public void addNewPage(PageElement pageElement) throws IOException {
     if (currentPointElement != null) {
-      nextPageStartTime = Math.min(nextPageStartTime, pageElement.startTime);
-      if (currentPoint.getTimestamp() >= nextPageStartTime) {
+      nextPointInOtherPage = Math.min(nextPointInOtherPage, pageElement.startTime);
+      if (currentPoint.getTimestamp() >= nextPointInOtherPage) {
         currentPointElement.setPoint(currentPoint);
         pointQueue.add(currentPointElement);
         currentPointElement = null;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastAlignedCrossCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastAlignedCrossCompactionTest.java
index 2373948f14..016b5b0739 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastAlignedCrossCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastAlignedCrossCompactionTest.java
@@ -6678,6 +6678,140 @@ public class FastAlignedCrossCompactionTest extends AbstractCompactionTest {
     validateTargetDatas(sourceDatas, tsDataTypes);
   }
 
+  @Test
+  public void test21() throws IOException, IllegalPathException {
+    List<PartialPath> timeserisPathList = new ArrayList<>();
+    List<TSDataType> tsDataTypes = new ArrayList<>();
+    // seq file 1: 10 devices with 10 aligned measurements each
+    int deviceNum = 10;
+    int measurementNum = 10;
+    TsFileResource resource = createEmptyFileAndResource(true);
+    try (TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(resource.getTsFile())) {
+      // write one chunk group per device
+      for (int deviceIndex = 0; deviceIndex < deviceNum; deviceIndex++) {
+        tsFileIOWriter.startChunkGroup(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex);
+
+        List<TSDataType> dataTypes = createDataType(measurementNum);
+        List<Integer> measurementIndexes = new ArrayList<>();
+        for (int i = 0; i < measurementNum; i++) {
+          measurementIndexes.add(i);
+        }
+        List<PartialPath> timeseriesPath =
+            createTimeseries(deviceIndex, measurementIndexes, dataTypes, true);
+
+        // five time ranges handed to writeAlignedChunk as the pages of the chunk
+        List<TimeRange> pages = new ArrayList<>();
+        pages.add(new TimeRange(0, 200));
+        pages.add(new TimeRange(350, 450));
+        pages.add(new TimeRange(600, 800));
+        pages.add(new TimeRange(900, 1100));
+        pages.add(new TimeRange(1400, 1600));
+
+        List<IChunkWriter> iChunkWriters = createChunkWriter(timeseriesPath, dataTypes, true);
+        for (IChunkWriter iChunkWriter : iChunkWriters) {
+          writeAlignedChunk((AlignedChunkWriterImpl) iChunkWriter, tsFileIOWriter, pages, true);
+        }
+
+        tsFileIOWriter.endChunkGroup();
+        resource.updateStartTime(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex, 0);
+        resource.updateEndTime(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex, 1600);
+        timeserisPathList.addAll(timeseriesPath);
+        tsDataTypes.addAll(dataTypes);
+      }
+      tsFileIOWriter.endFile();
+    }
+    resource.serialize();
+    seqResources.add(resource);
+
+    // unseq file 1: 15 devices with 5 aligned measurements each
+    deviceNum = 15;
+    measurementNum = 5;
+    resource = createEmptyFileAndResource(false);
+    try (TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(resource.getTsFile())) {
+      // write one chunk group per device
+      for (int deviceIndex = 0; deviceIndex < deviceNum; deviceIndex++) {
+        tsFileIOWriter.startChunkGroup(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex);
+
+        List<TSDataType> dataTypes = createDataType(measurementNum);
+        List<Integer> measurementIndexes = new ArrayList<>();
+        for (int i = 0; i < measurementNum; i++) {
+          measurementIndexes.add(i);
+        }
+        List<PartialPath> timeseriesPath =
+            createTimeseries(deviceIndex, measurementIndexes, dataTypes, true);
+
+        // scratch list of time ranges, cleared and refilled before each page is written
+        List<TimeRange> timeRanges = new ArrayList<>();
+
+        List<IChunkWriter> iChunkWriters = createChunkWriter(timeseriesPath, dataTypes, true);
+        for (IChunkWriter iChunkWriter : iChunkWriters) {
+          // first page: time ranges [0, 0] and [200, 200]
+          timeRanges.clear();
+          timeRanges.add(new TimeRange(0, 0));
+          timeRanges.add(new TimeRange(200, 200));
+          writeOneAlignedPage((AlignedChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+          // second page: time ranges [300, 300] and [500, 500]
+          timeRanges.clear();
+          timeRanges.add(new TimeRange(300, 300));
+          timeRanges.add(new TimeRange(500, 500));
+          writeOneAlignedPage((AlignedChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+          // third page: time ranges [650, 650] and [750, 750]
+          timeRanges.clear();
+          timeRanges.add(new TimeRange(650, 650));
+          timeRanges.add(new TimeRange(750, 750));
+          writeOneAlignedPage((AlignedChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+          // fourth page: time ranges [1000, 1000] and [1200, 1200]
+          timeRanges.clear();
+          timeRanges.add(new TimeRange(1000, 1000));
+          timeRanges.add(new TimeRange(1200, 1200));
+          writeOneAlignedPage((AlignedChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+          // fifth page: time ranges [1300, 1300] and [1500, 1500]
+          timeRanges.clear();
+          timeRanges.add(new TimeRange(1300, 1300));
+          timeRanges.add(new TimeRange(1500, 1500));
+          writeOneAlignedPage((AlignedChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+          iChunkWriter.writeToFileWriter(tsFileIOWriter);
+        }
+
+        tsFileIOWriter.endChunkGroup();
+        resource.updateStartTime(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex, 0);
+        resource.updateEndTime(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex, 1500);
+        timeserisPathList.addAll(timeseriesPath);
+        tsDataTypes.addAll(dataTypes);
+      }
+      tsFileIOWriter.endFile();
+    }
+    resource.serialize();
+    unseqResources.add(resource);
+
+    Map<PartialPath, List<TimeValuePair>> sourceDatas =
+        readSourceFiles(timeserisPathList, tsDataTypes);
+
+    // run a cross-space compaction task that uses FastCompactionPerformer
+    tsFileManager.addAll(seqResources, true);
+    tsFileManager.addAll(unseqResources, false);
+    CrossSpaceCompactionTask task =
+        new CrossSpaceCompactionTask(
+            0,
+            tsFileManager,
+            seqResources,
+            unseqResources,
+            new FastCompactionPerformer(true),
+            new AtomicInteger(0),
+            0,
+            0);
+    task.start();
+
+    validateSeqFiles(true);
+
+    validateTargetDatas(sourceDatas, tsDataTypes);
+  }
+
   private List<TimeRange> createPages(long startTime, long endTime, int pagePointNum) {
     List<TimeRange> pages = new ArrayList<>();
     for (long i = startTime; i <= endTime; i += pagePointNum) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastNonAlignedCrossCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastNonAlignedCrossCompactionTest.java
index 6dcf685639..293687a8b0 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastNonAlignedCrossCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastNonAlignedCrossCompactionTest.java
@@ -6644,6 +6644,144 @@ public class FastNonAlignedCrossCompactionTest extends AbstractCompactionTest {
     validateTargetDatas(sourceDatas, tsDataTypes);
   }
 
+  @Test
+  public void test21() throws IOException, IllegalPathException {
+    List<PartialPath> timeserisPathList = new ArrayList<>();
+    List<TSDataType> tsDataTypes = new ArrayList<>();
+    // seq file 1: 10 devices with 10 non-aligned measurements each
+    int deviceNum = 10;
+    int measurementNum = 10;
+    TsFileResource resource = createEmptyFileAndResource(true);
+    try (TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(resource.getTsFile())) {
+      // write one chunk group per device
+      for (int deviceIndex = 0; deviceIndex < deviceNum; deviceIndex++) {
+        tsFileIOWriter.startChunkGroup(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex);
+
+        List<TSDataType> dataTypes = createDataType(measurementNum);
+        List<Integer> measurementIndexes = new ArrayList<>();
+        for (int i = 0; i < measurementNum; i++) {
+          measurementIndexes.add(i);
+        }
+        List<PartialPath> timeseriesPath =
+            createTimeseries(deviceIndex, measurementIndexes, dataTypes, false);
+
+        // five time ranges handed to writeNonAlignedChunk as the pages of each chunk
+        List<TimeRange> pages = new ArrayList<>();
+        pages.add(new TimeRange(0, 200));
+        pages.add(new TimeRange(350, 450));
+        pages.add(new TimeRange(600, 800));
+        pages.add(new TimeRange(900, 1100));
+        pages.add(new TimeRange(1400, 1600));
+
+        List<IChunkWriter> iChunkWriters = createChunkWriter(timeseriesPath, dataTypes, false);
+        for (IChunkWriter iChunkWriter : iChunkWriters) {
+          writeNonAlignedChunk((ChunkWriterImpl) iChunkWriter, tsFileIOWriter, pages, true);
+        }
+
+        tsFileIOWriter.endChunkGroup();
+        resource.updateStartTime(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex, 0);
+        resource.updateEndTime(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex, 1600);
+        timeserisPathList.addAll(timeseriesPath);
+        tsDataTypes.addAll(dataTypes);
+      }
+      tsFileIOWriter.endFile();
+    }
+    resource.serialize();
+    seqResources.add(resource);
+
+    // unseq file 1: 15 devices with 5 non-aligned measurements each
+    deviceNum = 15;
+    measurementNum = 5;
+    resource = createEmptyFileAndResource(false);
+    try (TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(resource.getTsFile())) {
+      // write one chunk group per device
+      for (int deviceIndex = 0; deviceIndex < deviceNum; deviceIndex++) {
+        tsFileIOWriter.startChunkGroup(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex);
+
+        List<TSDataType> dataTypes = createDataType(measurementNum);
+        List<Integer> measurementIndexes = new ArrayList<>();
+        for (int i = 0; i < measurementNum; i++) {
+          measurementIndexes.add(i);
+        }
+        List<PartialPath> timeseriesPath =
+            createTimeseries(deviceIndex, measurementIndexes, dataTypes, false);
+
+        // scratch list of time ranges, cleared and refilled before each page is written
+        List<TimeRange> timeRanges = new ArrayList<>();
+
+        List<IChunkWriter> iChunkWriters = createChunkWriter(timeseriesPath, dataTypes, false);
+        for (IChunkWriter iChunkWriter : iChunkWriters) {
+          // first page: time ranges [0, 0] and [200, 200]
+          timeRanges.clear();
+          timeRanges.add(new TimeRange(0, 0));
+          timeRanges.add(new TimeRange(200, 200));
+          writeOneNonAlignedPage((ChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+          // second page: time ranges [300, 300] and [500, 500]
+          timeRanges.clear();
+          timeRanges.add(new TimeRange(300, 300));
+          timeRanges.add(new TimeRange(500, 500));
+          writeOneNonAlignedPage((ChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+          // third page: time ranges [650, 650] and [750, 750]
+          timeRanges.clear();
+          timeRanges.add(new TimeRange(650, 650));
+          timeRanges.add(new TimeRange(750, 750));
+          writeOneNonAlignedPage((ChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+          // fourth page: time ranges [1000, 1000] and [1200, 1200]
+          timeRanges.clear();
+          timeRanges.add(new TimeRange(1000, 1000));
+          timeRanges.add(new TimeRange(1200, 1200));
+          writeOneNonAlignedPage((ChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+          // fifth page: time ranges [1300, 1300] and [1500, 1500]
+          timeRanges.clear();
+          timeRanges.add(new TimeRange(1300, 1300));
+          timeRanges.add(new TimeRange(1500, 1500));
+          writeOneNonAlignedPage((ChunkWriterImpl) iChunkWriter, timeRanges, false);
+
+          iChunkWriter.writeToFileWriter(tsFileIOWriter);
+        }
+
+        tsFileIOWriter.endChunkGroup();
+        resource.updateStartTime(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex, 0);
+        resource.updateEndTime(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex, 1500);
+        timeserisPathList.addAll(timeseriesPath);
+        tsDataTypes.addAll(dataTypes);
+      }
+      tsFileIOWriter.endFile();
+    }
+    resource.serialize();
+    unseqResources.add(resource);
+
+    Map<PartialPath, List<TimeValuePair>> sourceDatas =
+        readSourceFiles(timeserisPathList, tsDataTypes);
+
+    // run a cross-space compaction task that uses FastCompactionPerformer
+    tsFileManager.addAll(seqResources, true);
+    tsFileManager.addAll(unseqResources, false);
+    CrossSpaceCompactionTask task =
+        new CrossSpaceCompactionTask(
+            0,
+            tsFileManager,
+            seqResources,
+            unseqResources,
+            new FastCompactionPerformer(true),
+            new AtomicInteger(0),
+            0,
+            0);
+    task.start();
+
+    validateSeqFiles(true);
+
+    validateTargetDatas(sourceDatas, tsDataTypes);
+  }
+
+  /**
+   * Create pages from startTime to endTime and each page has pagePointNum points except the last
+   * page.
+   */
   private List<TimeRange> createPages(long startTime, long endTime, int pagePointNum) {
     List<TimeRange> pages = new ArrayList<>();
     for (long i = startTime; i <= endTime; i += pagePointNum) {