You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/04 03:43:05 UTC

[iotdb] branch rel/0.12 updated: [IOTDB-1419][To rel/0.12] fix continuous compaction doesn't take effect when enablePartition (#3326)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new da162af  [IOTDB-1419][To rel/0.12] fix continuous compaction doesn't take effect when enablePartition (#3326)
da162af is described below

commit da162af63d775df934098bd02ee935e1a2250cd1
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Fri Jun 4 11:42:42 2021 +0800

    [IOTDB-1419][To rel/0.12] fix continuous compaction doesn't take effect when enablePartition (#3326)
---
 .../compaction/CompactionMergeTaskPoolManager.java |  31 +++---
 .../compaction/StorageGroupCompactionTask.java     |  48 +++++++++
 .../db/engine/compaction/TsFileManagement.java     |  28 +++---
 .../engine/storagegroup/StorageGroupProcessor.java | 107 +++++++++++----------
 .../virtualSg/VirtualStorageGroupManager.java      |   2 +-
 .../storagegroup/StorageGroupProcessorTest.java    |   2 +-
 6 files changed, 144 insertions(+), 74 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index cdf6f4a..c04661c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -36,7 +36,6 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
@@ -56,6 +55,8 @@ public class CompactionMergeTaskPoolManager implements IService {
   private ExecutorService pool;
   private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
 
+  private static ConcurrentHashMap<String, Boolean> sgCompactionStatus = new ConcurrentHashMap<>();
+
   public static CompactionMergeTaskPoolManager getInstance() {
     return INSTANCE;
   }
@@ -159,16 +160,6 @@ public class CompactionMergeTaskPoolManager implements IService {
     return ServiceType.COMPACTION_SERVICE;
   }
 
-  public void submitTask(String storageGroupName, Callable<Void> compactionMergeTask)
-      throws RejectedExecutionException {
-    if (pool != null && !pool.isTerminated()) {
-      Future<Void> future = pool.submit(compactionMergeTask);
-      storageGroupTasks
-          .computeIfAbsent(storageGroupName, k -> new ConcurrentSkipListSet<>())
-          .add(future);
-    }
-  }
-
   /**
    * Abort all compactions of a storage group. The caller must acquire the write lock of the
    * corresponding storage group.
@@ -181,11 +172,29 @@ public class CompactionMergeTaskPoolManager implements IService {
       Future<Void> next = subIterator.next();
       if (!next.isDone() && !next.isCancelled()) {
         next.cancel(true);
+        sgCompactionStatus.put(storageGroup, false);
       }
       subIterator.remove();
     }
   }
 
+  public synchronized void submitTask(StorageGroupCompactionTask storageGroupCompactionTask)
+      throws RejectedExecutionException {
+    if (pool != null && !pool.isTerminated()) {
+      String storageGroup = storageGroupCompactionTask.getStorageGroupName();
+      boolean isCompacting = sgCompactionStatus.computeIfAbsent(storageGroup, k -> false);
+      if (isCompacting) {
+        return;
+      }
+      storageGroupCompactionTask.setSgCompactionStatus(sgCompactionStatus);
+      sgCompactionStatus.put(storageGroup, true);
+      Future<Void> future = pool.submit(storageGroupCompactionTask);
+      storageGroupTasks
+          .computeIfAbsent(storageGroup, k -> new ConcurrentSkipListSet<>())
+          .add(future);
+    }
+  }
+
   public boolean isTerminated() {
     return pool == null || pool.isTerminated();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/StorageGroupCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/StorageGroupCompactionTask.java
new file mode 100644
index 0000000..78061ce
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/StorageGroupCompactionTask.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+
+public abstract class StorageGroupCompactionTask implements Callable<Void> {
+
+  private String storageGroupName;
+  private ConcurrentHashMap<String, Boolean> sgCompactionStatus;
+
+  public StorageGroupCompactionTask(String storageGroupName) {
+    this.storageGroupName = storageGroupName;
+  }
+
+  void setSgCompactionStatus(ConcurrentHashMap<String, Boolean> sgCompactionStatus) {
+    this.sgCompactionStatus = sgCompactionStatus;
+  }
+
+  public String getStorageGroupName() {
+    return storageGroupName;
+  }
+
+  protected void clearCompactionStatus() {
+    // map is only injected by submitTask; a task called directly (e.g. in tests) gets its own
+    if (sgCompactionStatus == null) {
+      sgCompactionStatus = new ConcurrentHashMap<>();
+    }
+    sgCompactionStatus.put(storageGroupName, false);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 595ddf8..7a7a30c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -44,7 +44,6 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -157,13 +156,14 @@ public abstract class TsFileManagement {
 
   protected abstract void merge(long timePartition);
 
-  public class CompactionMergeTask implements Callable<Void> {
+  public class CompactionMergeTask extends StorageGroupCompactionTask {
 
     private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
     private long timePartitionId;
 
     public CompactionMergeTask(
         CloseCompactionMergeCallBack closeCompactionMergeCallBack, long timePartitionId) {
+      super(storageGroupName);
       this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
       this.timePartitionId = timePartitionId;
     }
@@ -172,15 +172,17 @@ public abstract class TsFileManagement {
     public Void call() {
       merge(timePartitionId);
       closeCompactionMergeCallBack.call(isMergeExecutedInCurrentTask, timePartitionId);
+      clearCompactionStatus();
       return null;
     }
   }
 
-  public class CompactionRecoverTask implements Callable<Void> {
+  public class CompactionRecoverTask extends StorageGroupCompactionTask {
 
     private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
 
     public CompactionRecoverTask(CloseCompactionMergeCallBack closeCompactionMergeCallBack) {
+      super(storageGroupName);
       this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
     }
 
@@ -190,6 +192,7 @@ public abstract class TsFileManagement {
       // in recover logic, we do not have to start next compaction task, and in this case the param
       // time partition is useless, we can just pass 0L
       closeCompactionMergeCallBack.call(false, 0L);
+      clearCompactionStatus();
       return null;
     }
   }
@@ -199,15 +202,6 @@ public abstract class TsFileManagement {
       List<TsFileResource> seqMergeList,
       List<TsFileResource> unSeqMergeList,
       long dataTTL) {
-    if (isUnseqMerging) {
-      if (logger.isInfoEnabled()) {
-        logger.info(
-            "{} Last merge is ongoing, currently consumed time: {}ms",
-            storageGroupName,
-            (System.currentTimeMillis() - mergeStartTime));
-      }
-      return false;
-    }
     // wait until seq merge has finished
     while (isSeqMerging) {
       try {
@@ -288,6 +282,16 @@ public abstract class TsFileManagement {
             mergeFiles[0].size(),
             mergeFiles[1].size());
       }
+      // wait until unseq merge has finished
+      while (isUnseqMerging) {
+        try {
+          Thread.sleep(200);
+        } catch (InterruptedException e) {
+          logger.error("{} [Compaction] shutdown", storageGroupName, e);
+          Thread.currentThread().interrupt();
+          return false;
+        }
+      }
       return true;
     } catch (MergeException | IOException e) {
       logger.error("{} cannot select file for merge", storageGroupName, e);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 7427f98..e9fc64b 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.StorageGroupCompactionTask;
 import org.apache.iotdb.db.engine.compaction.TsFileManagement;
 import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
@@ -170,8 +171,6 @@ public class StorageGroupProcessor {
   private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = new TreeMap<>();
   /** time partition id in the storage group -> tsFileProcessor for this time partition */
   private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors = new TreeMap<>();
-  /** compactionMergeWorking is used to wait for last compaction to be done. */
-  private volatile boolean compactionMergeWorking = false;
   // upgrading sequence TsFile resource list
   private List<TsFileResource> upgradeSeqFileList = new LinkedList<>();
 
@@ -507,18 +506,31 @@ public class StorageGroupProcessor {
       globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
     }
 
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()
-        && seqTsFileResources.size() > 0) {
-      for (long timePartitionId : timePartitionIdVersionControllerMap.keySet()) {
-        executeCompaction(
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
+      CompactionMergeTaskPoolManager.getInstance()
+          .submitTask(new CompactionAllPartitionTask(logicalStorageGroupName));
+    }
+  }
+
+  public class CompactionAllPartitionTask extends StorageGroupCompactionTask {
+
+    CompactionAllPartitionTask(String storageGroupName) {
+      super(storageGroupName);
+    }
+
+    @Override
+    public Void call() {
+      for (long timePartitionId : partitionLatestFlushedTimeForEachDevice.keySet()) {
+        syncCompactOnePartition(
             timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
       }
+      clearCompactionStatus();
+      return null;
     }
   }
 
   private void recoverCompaction() {
     if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
-      compactionMergeWorking = true;
       logger.info(
           "{} - {} submit a compaction recover merge task",
           logicalStorageGroupName,
@@ -526,7 +538,6 @@ public class StorageGroupProcessor {
       try {
         CompactionMergeTaskPoolManager.getInstance()
             .submitTask(
-                logicalStorageGroupName,
                 tsFileManagement.new CompactionRecoverTask(this::closeCompactionMergeCallBack));
       } catch (RejectedExecutionException e) {
         this.closeCompactionMergeCallBack(false, 0);
@@ -1945,47 +1956,51 @@ public class StorageGroupProcessor {
         "signal closing storage group condition in {}",
         logicalStorageGroupName + "-" + virtualStorageGroupId);
 
-    executeCompaction(
-        tsFileProcessor.getTimeRangeId(),
-        IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+    CompactionMergeTaskPoolManager.getInstance()
+        .submitTask(
+            new CompactionOnePartitionTask(
+                logicalStorageGroupName, tsFileProcessor.getTimeRangeId()));
   }
 
-  private void executeCompaction(long timePartition, boolean fullMerge) {
-    if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
-      compactionMergeWorking = true;
-      logger.info(
-          "{} submit a compaction merge task",
-          logicalStorageGroupName + "-" + virtualStorageGroupId);
-      try {
-        // fork and filter current tsfile, then commit then to compaction merge
-        tsFileManagement.forkCurrentFileList(timePartition);
-        tsFileManagement.setForceFullMerge(fullMerge);
-        CompactionMergeTaskPoolManager.getInstance()
-            .submitTask(
-                logicalStorageGroupName,
-                tsFileManagement
-                .new CompactionMergeTask(this::closeCompactionMergeCallBack, timePartition));
-      } catch (IOException | RejectedExecutionException e) {
-        this.closeCompactionMergeCallBack(false, timePartition);
-        logger.error(
-            "{} compaction submit task failed",
-            logicalStorageGroupName + "-" + virtualStorageGroupId,
-            e);
-      }
-    } else {
-      logger.info(
-          "{} last compaction merge task is working, skip current merge",
-          logicalStorageGroupName + "-" + virtualStorageGroupId);
+  public class CompactionOnePartitionTask extends StorageGroupCompactionTask {
+
+    private long partition;
+
+    CompactionOnePartitionTask(String storageGroupName, long partition) {
+      super(storageGroupName);
+      this.partition = partition;
+    }
+
+    @Override
+    public Void call() {
+      syncCompactOnePartition(
+          partition, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+      clearCompactionStatus();
+      return null;
+    }
+  }
+
+  private void syncCompactOnePartition(long timePartition, boolean fullMerge) {
+    logger.info(
+        "{}-{} submit a compaction merge task", logicalStorageGroupName, virtualStorageGroupId);
+    try {
+      // fork and filter current tsfile, then commit them to compaction merge
+      tsFileManagement.forkCurrentFileList(timePartition);
+      tsFileManagement.setForceFullMerge(fullMerge);
+      tsFileManagement.new CompactionMergeTask(this::closeCompactionMergeCallBack, timePartition)
+          .call();
+    } catch (IOException e) {
+      this.closeCompactionMergeCallBack(false, timePartition);
+      logger.error(
+          "{}-{} compaction submit task failed", logicalStorageGroupName, virtualStorageGroupId);
     }
   }
 
   /** close compaction merge callback, to release some locks */
   private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
     if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
-      executeCompaction(
+      syncCompactOnePartition(
           timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
-    } else {
-      this.compactionMergeWorking = false;
     }
   }
 
@@ -2086,15 +2101,9 @@ public class StorageGroupProcessor {
     resources.clear();
   }
 
-  public void merge(boolean isFullMerge) {
-    writeLock();
-    try {
-      for (long timePartitionId : partitionLatestFlushedTimeForEachDevice.keySet()) {
-        executeCompaction(timePartitionId, isFullMerge);
-      }
-    } finally {
-      writeUnlock();
-    }
+  public void merge() {
+    CompactionMergeTaskPoolManager.getInstance()
+        .submitTask(new CompactionAllPartitionTask(logicalStorageGroupName));
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
index e8d1f77..fbbec79 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
@@ -298,7 +298,7 @@ public class VirtualStorageGroupManager {
   public void mergeAll(boolean isFullMerge) {
     for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
       if (storageGroupProcessor != null) {
-        storageGroupProcessor.merge(isFullMerge);
+        storageGroupProcessor.merge();
       }
     }
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index e68c4af..8cb27ac 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -600,7 +600,7 @@ public class StorageGroupProcessorTest {
     }
 
     processor.syncCloseAllWorkingTsFileProcessors();
-    processor.merge(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+    processor.merge();
     while (processor.getTsFileManagement().isUnseqMerging) {
       // wait
     }