You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/01/08 13:15:48 UTC

[iotdb] branch master updated: Revert "Execute compaction after sync and load is called (#2439)"

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ca31fef  Revert "Execute compaction after sync and load is called (#2439)"
ca31fef is described below

commit ca31fefde1885289cb45c2ca98db343d7fad0811
Author: Xiangdong Huang <hx...@qq.com>
AuthorDate: Fri Jan 8 19:45:41 2021 +0800

    Revert "Execute compaction after sync and load is called (#2439)"
    
    This reverts commit 38eb730d735a3e1f3b0beaa956a50e4ea37f9d50.
---
 .../engine/storagegroup/StorageGroupProcessor.java | 69 ++++++++++------------
 1 file changed, 32 insertions(+), 37 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index bfbd87a..fdf623a 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -819,11 +819,11 @@ public class StorageGroupProcessor {
    * inserted are in the range [start, end)
    *
    * @param insertTabletPlan insert a tablet of a device
-   * @param sequence whether is sequence
-   * @param start start index of rows to be inserted in insertTabletPlan
-   * @param end end index of rows to be inserted in insertTabletPlan
-   * @param results result array
-   * @param timePartitionId time partition id
+   * @param sequence         whether is sequence
+   * @param start            start index of rows to be inserted in insertTabletPlan
+   * @param end              end index of rows to be inserted in insertTabletPlan
+   * @param results          result array
+   * @param timePartitionId  time partition id
    * @return false if any failure occurs when inserting the tablet, true otherwise
    */
   private boolean insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
@@ -982,9 +982,9 @@ public class StorageGroupProcessor {
   /**
    * get processor from hashmap, flush oldest processor if necessary
    *
-   * @param timeRangeId time partition range
+   * @param timeRangeId            time partition range
    * @param tsFileProcessorTreeMap tsFileProcessorTreeMap
-   * @param sequence whether is sequence or not
+   * @param sequence               whether is sequence or not
    */
   private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
       TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
@@ -1451,9 +1451,9 @@ public class StorageGroupProcessor {
    * Delete data whose timestamp <= 'timestamp' and belongs to the time series
    * deviceId.measurementId.
    *
-   * @param path the timeseries path of the to be deleted.
+   * @param path      the path of the timeseries to be deleted.
    * @param startTime the startTime of delete range.
-   * @param endTime the endTime of delete range.
+   * @param endTime   the endTime of delete range.
    */
   public void delete(PartialPath path, long startTime, long endTime, long planIndex)
       throws IOException {
@@ -1679,7 +1679,25 @@ public class StorageGroupProcessor {
     }
     logger.info("signal closing storage group condition in {}", storageGroupName);
 
-    executeCompaction(tsFileProcessor.getTimeRangeId());
+    if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance()
+        .isTerminated()) {
+      compactionMergeWorking = true;
+      logger.info("{} submit a compaction merge task", storageGroupName);
+      try {
+        // fork and filter current tsfile, then commit them to compaction merge
+        tsFileManagement.forkCurrentFileList(tsFileProcessor.getTimeRangeId());
+        CompactionMergeTaskPoolManager.getInstance()
+            .submitTask(
+                tsFileManagement.new CompactionMergeTask(this::closeCompactionMergeCallBack,
+                    tsFileProcessor.getTimeRangeId()));
+      } catch (IOException | RejectedExecutionException e) {
+        this.closeCompactionMergeCallBack();
+        logger.error("{} compaction submit task failed", storageGroupName);
+      }
+    } else {
+      logger.info("{} last compaction merge task is working, skip current merge",
+          storageGroupName);
+    }
   }
 
   /**
@@ -1862,29 +1880,6 @@ public class StorageGroupProcessor {
       tsFileManagement.writeUnlock();
       writeUnlock();
     }
-
-    executeCompaction(newFilePartitionId);
-  }
-
-  private void executeCompaction(long timePartition) {
-    if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance()
-        .isTerminated()) {
-      compactionMergeWorking = true;
-      logger.info("{} submit a compaction merge task", storageGroupName);
-      try {
-        // fork and filter current tsfile, then commit then to compaction merge
-        tsFileManagement.forkCurrentFileList(timePartition);
-        CompactionMergeTaskPoolManager.getInstance().submitTask(
-            tsFileManagement.new CompactionMergeTask(this::closeCompactionMergeCallBack,
-                timePartition));
-      } catch (IOException | RejectedExecutionException e) {
-        this.closeCompactionMergeCallBack();
-        logger.error("{} compaction submit task failed", storageGroupName);
-      }
-    } else {
-      logger.info("{} last compaction merge task is working, skip current merge",
-          storageGroupName);
-    }
   }
 
   /**
@@ -2086,9 +2081,9 @@ public class StorageGroupProcessor {
    * returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the
    * version number is the version number in the tsfile with a larger timestamp.
    *
-   * @param tsfileName origin tsfile name
+   * @param tsfileName  origin tsfile name
    * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
-   * 1]
+   *                    1]
    * @return appropriate filename
    */
   private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -2154,8 +2149,8 @@ public class StorageGroupProcessor {
   /**
    * Execute the loading process by the type.
    *
-   * @param type load type
-   * @param tsFileResource tsfile resource to be loaded
+   * @param type            load type
+   * @param tsFileResource  tsfile resource to be loaded
    * @param filePartitionId the partition id of the new file
    * @return load the file successfully
    * @UsedBy sync module, load external tsfile module.