You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/10/25 07:09:29 UTC

[GitHub] [iotdb] choubenson commented on a diff in pull request #7621: [IOTDB-3928][IOTDB-4097]New Compaction Performer —— Fast Compaction

choubenson commented on code in PR #7621:
URL: https://github.com/apache/iotdb/pull/7621#discussion_r1004085989


##########
server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/FastCompactionPerformerSubTask.java:
##########
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.cross.rewrite.task;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.compaction.cross.utils.ChunkMetadataElement;
+import org.apache.iotdb.db.engine.compaction.cross.utils.FileElement;
+import org.apache.iotdb.db.engine.compaction.cross.utils.PageElement;
+import org.apache.iotdb.db.engine.compaction.reader.PointPriorityReader;
+import org.apache.iotdb.db.engine.compaction.writer.FastCrossCompactionWriter;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
+
+public abstract class FastCompactionPerformerSubTask implements Callable<Void> {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+
+  /**
+   * Single-method callback that is handed a {@link PageElement} to remove. Presumably this is the
+   * type of the {@code this::removePage} reference passed to {@link PointPriorityReader} when the
+   * reader field of this class is created (confirm against that class).
+   */
+  @FunctionalInterface
+  public interface RemovePage {
+    void call(PageElement pageElement)
+        throws WriteProcessException, IOException, IllegalPathException;
+  }
+
+  // Source files sorted by the start time of the device. compactFiles() keeps looping until this
+  // list is empty.
+  protected List<FileElement> fileList;
+
+  // Chunk metadata waiting to be compacted. Ordered by start time ascending; for equal start times
+  // the element with the larger priority comes first (see the comparator in the constructor).
+  protected final PriorityQueue<ChunkMetadataElement> chunkMetadataQueue;
+
+  // Pages waiting to be compacted, using the same ordering rule as the chunk metadata queue.
+  protected final PriorityQueue<PageElement> pageQueue;
+
+  // Writer that receives the flushed chunks, flushed pages and single data points.
+  protected FastCrossCompactionWriter compactionWriter;
+
+  // Passed as the last argument of every compactionWriter call made by this class.
+  protected int subTaskId;
+
+  // measurement -> tsfile resource -> timeseries metadata <startOffset, endOffset>
+  // Used to get the chunk metadata from the tsfile directly according to the timeseries metadata
+  // offset.
+  protected Map<String, Map<TsFileResource, Pair<Long, Long>>> timeseriesMetadataOffsetMap;
+
+  // Reader per source file; looked up by resource when a chunk is flushed to the file writer.
+  protected Map<TsFileResource, TsFileSequenceReader> readerCacheMap;
+
+  // Modifications per source file. Presumably consulted by the isChunkModified / isPageModified
+  // helpers, which are not part of the visible code (confirm).
+  private final Map<TsFileResource, List<Modification>> modificationCacheMap;
+
+  // Source files sorted by the start time of the current device from old to new. Notice: if the
+  // type of timeIndex is FileTimeIndex, the list may contain resources in which the current
+  // device does not exist.
+  protected List<TsFileResource> sortedSourceFiles;
+
+  // Point-level reader that pages are added to whenever they have to be deserialized; it is
+  // constructed with this::removePage as its callback.
+  private final PointPriorityReader pointPriorityReader = new PointPriorityReader(this::removePage);
+
+  // When false, compactFiles() calls startMeasurement() after each batch of files is deserialized.
+  private final boolean isAligned;
+
+  // Presumably the device whose series this sub task compacts (confirm against subclasses).
+  protected String deviceId;
+
+  /**
+   * Stores the collaborators handed in by the caller and creates the empty working collections
+   * (file list, chunk metadata queue and page queue) used while compacting.
+   *
+   * <p>Both queues poll the element with the smallest start time first; when start times are
+   * equal, the element with the larger priority is polled first.
+   */
+  public FastCompactionPerformerSubTask(
+      FastCrossCompactionWriter compactionWriter,
+      Map<String, Map<TsFileResource, Pair<Long, Long>>> timeseriesMetadataOffsetMap,
+      Map<TsFileResource, TsFileSequenceReader> readerCacheMap,
+      Map<TsFileResource, List<Modification>> modificationCacheMap,
+      List<TsFileResource> sortedSourceFiles,
+      String deviceId,
+      boolean isAligned,
+      int subTaskId) {
+    // identity of this sub task
+    this.deviceId = deviceId;
+    this.isAligned = isAligned;
+    this.subTaskId = subTaskId;
+
+    // collaborators and caches supplied by the caller
+    this.compactionWriter = compactionWriter;
+    this.timeseriesMetadataOffsetMap = timeseriesMetadataOffsetMap;
+    this.readerCacheMap = readerCacheMap;
+    this.modificationCacheMap = modificationCacheMap;
+    this.sortedSourceFiles = sortedSourceFiles;
+
+    // working state, filled while compacting
+    this.fileList = new ArrayList<>();
+    this.chunkMetadataQueue =
+        new PriorityQueue<>(
+            Comparator.<ChunkMetadataElement>comparingLong(element -> element.startTime)
+                .thenComparing((left, right) -> Integer.compare(right.priority, left.priority)));
+    this.pageQueue =
+        new PriorityQueue<>(
+            Comparator.<PageElement>comparingLong(element -> element.startTime)
+                .thenComparing((left, right) -> Integer.compare(right.priority, left.priority)));
+  }
+
+  /**
+   * Works through {@code fileList} batch by batch: takes the files returned by
+   * {@code findOverlapFiles} for the first remaining file, loads their chunk metadata into the
+   * chunk metadata queue and compacts the queued chunks. Repeats until {@code fileList} is empty.
+   */
+  protected void compactFiles()
+      throws PageException, IOException, WriteProcessException, IllegalPathException {
+    while (!fileList.isEmpty()) {
+      // next batch: whatever overlaps with the first file that is still in the list
+      List<FileElement> fileBatch = findOverlapFiles(fileList.get(0));
+
+      // load the chunk metadata of the batch into the chunk metadata queue
+      deserializeFileIntoQueue(fileBatch);
+
+      // Non-aligned sensors can only create their schema once chunk metadata has been read, so
+      // the measurement is started here, once per batch. Aligned sensors were started in an
+      // earlier step, because all value schemas of the device are needed and a single batch of
+      // overlapped files may not contain every sensor.
+      if (!isAligned) {
+        startMeasurement();
+      }
+
+      compactChunks();
+    }
+  }
+
+  /**
+   * Hook implemented by subclasses. {@code compactFiles()} invokes it for non-aligned series after
+   * each batch of overlapped files has been deserialized into the chunk metadata queue.
+   */
+  protected abstract void startMeasurement() throws IOException;
+
+  /**
+   * Drains the chunk metadata queue. The head chunk is flushed directly when it neither overlaps
+   * another queued chunk nor is modified; otherwise the whole group of overlapping chunks is
+   * deserialized and compacted page by page.
+   *
+   * <p>All declared exceptions are propagated unchanged from the helper methods called here.
+   */
+  private void compactChunks()
+      throws IOException, PageException, WriteProcessException, IllegalPathException {
+    while (!chunkMetadataQueue.isEmpty()) {
+      ChunkMetadataElement headChunk = chunkMetadataQueue.peek();
+      List<ChunkMetadataElement> overlapGroup = findOverlapChunkMetadatas(headChunk);
+      boolean headModified = isChunkModified(headChunk);
+      boolean canFlushDirectly = overlapGroup.size() <= 1 && !headModified;
+
+      if (canFlushDirectly) {
+        // neither overlapped nor modified: hand the chunk to the file writer as it is
+        compactWithNonOverlapChunks(headChunk);
+      } else {
+        // overlapped or modified: the chunks have to be deserialized
+        compactWithOverlapChunks(overlapGroup);
+      }
+    }
+  }
+
+  /**
+   * Deserializes every chunk of the given group and then compacts the resulting pages. The group
+   * only has to contain the chunks overlapping the first one. Eg: chunk 1 overlaps only chunk 2,
+   * chunk 2 overlaps chunk 3, chunk 3 overlaps chunk 4, and so on for 10 chunks in total; called
+   * with chunk 1 and chunk 2, this method ends up merging all 10 chunks.
+   */
+  private void compactWithOverlapChunks(List<ChunkMetadataElement> overlappedChunkMetadatas)
+      throws IOException, PageException, WriteProcessException, IllegalPathException {
+    Iterator<ChunkMetadataElement> chunkIterator = overlappedChunkMetadatas.iterator();
+    while (chunkIterator.hasNext()) {
+      deserializeChunkIntoQueue(chunkIterator.next());
+    }
+    compactPages();
+  }
+
+  /**
+   * Tries to hand the chunk to the target file writer without deserializing it. When the writer
+   * refuses (the unsealed chunk is too small, or the chunk ends after the file's end time), the
+   * chunk is deserialized and compacted page by page instead.
+   */
+  private void compactWithNonOverlapChunks(ChunkMetadataElement chunkMetadataElement)
+      throws IOException, PageException, WriteProcessException, IllegalPathException {
+    boolean flushed =
+        compactionWriter.flushChunkToFileWriter(
+            chunkMetadataElement.chunkMetadata,
+            readerCacheMap.get(chunkMetadataElement.fileElement.resource),
+            subTaskId);
+
+    if (!flushed) {
+      // the writer did not accept the chunk as a whole: fall back to page-level compaction
+      deserializeChunkIntoQueue(chunkMetadataElement);
+      compactPages();
+      return;
+    }
+
+    // the chunk was written as a whole, so the head of the chunk metadata queue is done
+    removeChunk(chunkMetadataQueue.peek());
+  }
+
+  /**
+   * Deserialize the given chunk. Presumably the resulting pages are put into the page queue, since
+   * callers in this class run {@code compactPages()} right afterwards (confirm in subclasses).
+   */
+  abstract void deserializeChunkIntoQueue(ChunkMetadataElement chunkMetadataElement)
+      throws IOException;
+
+  /** Deserialize files into chunk metadatas and put them into the chunk metadata queue. */
+  abstract void deserializeFileIntoQueue(List<FileElement> fileElements)
+      throws IOException, IllegalPathException;
+
+  /**
+   * Drains the page queue. A fully deleted head page (modified status 1) is dropped. Otherwise the
+   * head page is flushed directly when it overlaps no other page and its modified status is not 0;
+   * in every other case it is compacted together with its overlapping pages.
+   */
+  private void compactPages()
+      throws IOException, PageException, WriteProcessException, IllegalPathException {
+    while (!pageQueue.isEmpty()) {
+      PageElement headPage = pageQueue.peek();
+      int headStatus = isPageModified(headPage);
+
+      if (headStatus == 1) {
+        // every point of this page has been deleted, so the page is simply dropped
+        removePage(headPage);
+      } else {
+        List<PageElement> overlapGroup = findOverlapPages(headPage);
+        boolean canFlushDirectly = overlapGroup.size() <= 1 && headStatus != 0;
+
+        if (canFlushDirectly) {
+          // neither overlapped nor modified: hand the page to the chunk writer as it is
+          compactWithNonOverlapPage(headPage);
+        } else {
+          // overlapped or modified: the pages have to be deserialized
+          compactWithOverlapPages(overlapGroup);
+        }
+      }
+    }
+  }
+
+  /**
+   * Tries to hand the page to the chunk writer without deserializing it, using the aligned flush
+   * when the page comes from an {@link AlignedChunkReader}. When the writer refuses, the page is
+   * added to the point reader and its points are written one by one, up to and including the
+   * page's end time.
+   */
+  private void compactWithNonOverlapPage(PageElement pageElement)
+      throws PageException, IOException, WriteProcessException, IllegalPathException {
+    boolean flushed =
+        pageElement.iChunkReader instanceof AlignedChunkReader
+            ? compactionWriter.flushAlignedPageToChunkWriter(
+                pageElement.pageData,
+                pageElement.pageHeader,
+                pageElement.valuePageDatas,
+                pageElement.valuePageHeaders,
+                subTaskId)
+            : compactionWriter.flushPageToChunkWriter(
+                pageElement.pageData, pageElement.pageHeader, subTaskId);
+
+    if (flushed) {
+      // the page was written as a whole, so it is finished
+      removePage(pageElement);
+      return;
+    }
+
+    // The unsealed page is not large enough or page.endTime > file.endTime: deserialize the page
+    // and write its data points to the chunk writer one at a time.
+    pointPriorityReader.addNewPage(pageElement);
+    while (pointPriorityReader.hasNext()) {
+      if (pointPriorityReader.currentPoint().left > pageElement.pageHeader.getEndTime()) {
+        // the reader has moved past the end of this page
+        break;
+      }
+      compactionWriter.write(
+          pointPriorityReader.currentPoint().left,
+          pointPriorityReader.currentPoint().right,
+          subTaskId);
+      pointPriorityReader.next();
+    }
+  }
+
+  /**
+   * Compact a series of pages that overlap with each other. Eg: The parameters are page 1 and page
+   * 2, that is, page 1 only overlaps with page 2, while page 2 overlaps with page 3, page 3
+   * overlaps with page 4, and so on, there are 10 pages in total. This method will merge all 10
+   * pages.
+   *
+   * <p>The first page of the list is deserialized into the point reader right away. The remaining
+   * pages are handled in list order: points earlier than the next page's start time are written
+   * first, then the next page is either dropped (fully deleted), deserialized (overlapped or
+   * modified) or flushed directly.
+   *
+   * <p>The list is also handed to the point reader via {@code updateNewOverlappedPages}; the
+   * size checks below rely on the list growing while points are being written.
+   *
+   * @param overlappedPages pages to compact, ordered by start time; the list is consumed (elements
+   *     are removed) by this method
+   */
+  private void compactWithOverlapPages(List<PageElement> overlappedPages)
+      throws IOException, PageException, WriteProcessException, IllegalPathException {
+    // the first page always has to be deserialized; it is taken out of the list here
+    pointPriorityReader.addNewPage(overlappedPages.remove(0));
+    pointPriorityReader.updateNewOverlappedPages(overlappedPages);
+    while (pointPriorityReader.hasNext()) {
+      // write point.time < the last overlapped page.startTime
+      while (overlappedPages.size() > 0) {
+        PageElement nextPageElement = overlappedPages.get(0);
+
+        int oldSize = overlappedPages.size();
+        // write currentPage.point.time < nextPage.startTime to chunk writer
+        // NOTE(review): currentPoint() is read here without a preceding hasNext() check; this
+        // looks like it relies on the reader still holding a point at or after
+        // nextPage.startTime - confirm.
+        while (pointPriorityReader.currentPoint().left < nextPageElement.startTime) {
+          // write data point to chunk writer
+          compactionWriter.write(
+              pointPriorityReader.currentPoint().left,
+              pointPriorityReader.currentPoint().right,
+              subTaskId);
+          pointPriorityReader.next();
+          if (overlappedPages.size() > oldSize) {
+            // during the process of writing overlapped points, if the first page is compacted
+            // completely or a new chunk is deserialized, there may be new pages overlapped with the
+            // first page in page queue which are added into the list. If so, the next overlapped
+            // page in the list may be changed, so we should re-get next overlap page here.
+            oldSize = overlappedPages.size();
+            nextPageElement = overlappedPages.get(0);
+          }
+        }
+
+        // 1 means the page is fully deleted, 0 means it is modified (see the branches below)
+        int nextPageModifiedStatus = isPageModified(nextPageElement);
+
+        if (nextPageModifiedStatus == 1) {
+          // all data on next page has been deleted, remove it
+          removePage(nextPageElement);
+        } else {
+          // the next page counts as overlapped when the current point falls inside its time range
+          // or when isPageOverlap() reports an overlap for it
+          boolean isNextPageOverlap =
+              pointPriorityReader.currentPoint().left <= nextPageElement.pageHeader.getEndTime()
+                  || isPageOverlap(nextPageElement);
+
+          if (isNextPageOverlap || nextPageModifiedStatus == 0) {
+            // has overlap or modified pages, then deserialize it
+            pointPriorityReader.addNewPage(nextPageElement);
+          } else {
+            // has none overlap or modified pages, flush it to chunk writer directly
+            compactWithNonOverlapPage(nextPageElement);
+          }
+        }
+        // the page at the head of the list has been dealt with
+        overlappedPages.remove(0);
+      }
+
+      // write remaining data points, of which point.time >= the last overlapped page.startTime
+      while (pointPriorityReader.hasNext()) {
+        // write data point to chunk writer
+        compactionWriter.write(
+            pointPriorityReader.currentPoint().left,
+            pointPriorityReader.currentPoint().right,
+            subTaskId);
+        pointPriorityReader.next();
+        if (overlappedPages.size() > 0) {
+          // finish compacting the first page or there are new chunks being deserialized and find
+          // the new overlapped pages, then start compacting them
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Collects the queued pages that start no later than the end time of the given page and have not
+   * been selected before, marking each of them as selected. Notice: the returned list is sorted by
+   * page start time from small to large, so that the pages can be compacted in order.
+   */
+  private List<PageElement> findOverlapPages(PageElement page) {
+    long pageEndTime = page.pageHeader.getEndTime();
+    List<PageElement> selected = new ArrayList<>();
+    for (PageElement candidate : pageQueue) {
+      if (candidate.isOverlaped || candidate.startTime > pageEndTime) {
+        // already selected earlier, or starts after the given page ends
+        continue;
+      }
+      candidate.isOverlaped = true;
+      selected.add(candidate);
+    }
+    // iterating a priority queue gives no particular order, so sort explicitly
+    selected.sort((left, right) -> Long.compare(left.startTime, right.startTime));
+    return selected;
+  }
+
+  /**
+   * Find overlapped chunks which have not been selected. Notice: We must ensure that the returned
+   * list is ordered according to the startTime of the chunk from small to large, so that each chunk
+   * can be compacted in order.
+   */
+  private List<ChunkMetadataElement> findOverlapChunkMetadatas(ChunkMetadataElement chunkMetadata) {
+    List<ChunkMetadataElement> elements = new ArrayList<>();
+    long endTime = chunkMetadata.chunkMetadata.getEndTime();

Review Comment:
   Resolved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org