You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/03 11:42:25 UTC

[iotdb] branch rel/0.11 updated: fix continuous compaction doesn't take effect when enablePartition (#3322)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new fa22e77  fix continuous compaction doesn't take effect when enablePartition (#3322)
fa22e77 is described below

commit fa22e7703870dcbd3c6389cd60497e2e42cb3794
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Thu Jun 3 06:41:09 2021 -0500

    fix continuous compaction doesn't take effect when enablePartition (#3322)
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  2 +-
 .../compaction/CompactionMergeTaskPoolManager.java | 13 +++-
 .../compaction/StorageGroupCompactionTask.java     | 48 ++++++++++++
 .../db/engine/compaction/TsFileManagement.java     |  8 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 91 +++++++++++++---------
 .../compaction/LevelCompactionCacheTest.java       |  1 +
 .../storagegroup/StorageGroupProcessorTest.java    |  2 +-
 7 files changed, 121 insertions(+), 44 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index f7d0f63..209f1c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -632,7 +632,7 @@ public class StorageEngine implements IService {
       throw new StorageEngineException("Current system mode is read only, does not support merge");
     }
     for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
-      storageGroupProcessor.merge(fullMerge);
+      storageGroupProcessor.merge();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 8abe359..268c92e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.engine.compaction;
 import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
 
 import java.io.File;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -47,6 +48,7 @@ public class CompactionMergeTaskPoolManager implements IService {
   private static final CompactionMergeTaskPoolManager INSTANCE = new CompactionMergeTaskPoolManager();
   private ExecutorService pool;
 
+  private static ConcurrentHashMap<String, Boolean> sgCompactionStatus = new ConcurrentHashMap<>();
   public static CompactionMergeTaskPoolManager getInstance() {
     return INSTANCE;
   }
@@ -137,10 +139,17 @@ public class CompactionMergeTaskPoolManager implements IService {
     return ServiceType.COMPACTION_SERVICE;
   }
 
-  public void submitTask(Runnable compactionMergeTask)
+  public synchronized void submitTask(StorageGroupCompactionTask storageGroupCompactionTask)
       throws RejectedExecutionException {
     if (pool != null && !pool.isTerminated()) {
-      pool.submit(compactionMergeTask);
+      String storageGroup = storageGroupCompactionTask.getStorageGroupName();
+      boolean isCompacting = sgCompactionStatus.computeIfAbsent(storageGroup, k -> false);
+      if (isCompacting) {
+        return;
+      }
+      storageGroupCompactionTask.setSgCompactionStatus(sgCompactionStatus);
+      sgCompactionStatus.put(storageGroup, true);
+      pool.submit(storageGroupCompactionTask);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/StorageGroupCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/StorageGroupCompactionTask.java
new file mode 100644
index 0000000..d3ce31e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/StorageGroupCompactionTask.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public abstract class StorageGroupCompactionTask implements Runnable{
+
+  private String storageGroupName;
+  private ConcurrentHashMap<String, Boolean> sgCompactionStatus;
+
+  public StorageGroupCompactionTask(String storageGroupName) {
+    this.storageGroupName = storageGroupName;
+  }
+
+  void setSgCompactionStatus(ConcurrentHashMap<String, Boolean> sgCompactionStatus) {
+    this.sgCompactionStatus = sgCompactionStatus;
+  }
+
+  public String getStorageGroupName() {
+    return storageGroupName;
+  }
+
+  protected void clearCompactionStatus() {
+    // the map is null when run() is invoked directly (e.g. in tests) rather than via submitTask
+    if (sgCompactionStatus == null) {
+      sgCompactionStatus = new ConcurrentHashMap<>();
+    }
+    sgCompactionStatus.put(storageGroupName, false);
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index f3e70ff..db611dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -152,13 +152,14 @@ public abstract class TsFileManagement {
 
   protected abstract void merge(long timePartition);
 
-  public class CompactionMergeTask implements Runnable {
+  public class CompactionMergeTask extends StorageGroupCompactionTask {
 
     private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
     private long timePartitionId;
 
     public CompactionMergeTask(
         CloseCompactionMergeCallBack closeCompactionMergeCallBack, long timePartitionId) {
+      super(storageGroupName);
       this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
       this.timePartitionId = timePartitionId;
     }
@@ -167,14 +168,16 @@ public abstract class TsFileManagement {
     public void run() {
       merge(timePartitionId);
       closeCompactionMergeCallBack.call(isMergeExecutedInCurrentTask, timePartitionId);
+      clearCompactionStatus();
     }
   }
 
-  public class CompactionRecoverTask implements Runnable {
+  public class CompactionRecoverTask extends StorageGroupCompactionTask {
 
     private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
 
     public CompactionRecoverTask(CloseCompactionMergeCallBack closeCompactionMergeCallBack) {
+      super(storageGroupName);
       this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
     }
 
@@ -184,6 +187,7 @@ public abstract class TsFileManagement {
       // in recover logic, we do not have to start next compaction task, and in this case the param
       // time partition is useless, we can just pass 0L
       closeCompactionMergeCallBack.call(false, 0L);
+      clearCompactionStatus();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index e5cdc81..3993cda 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.StorageGroupCompactionTask;
 import org.apache.iotdb.db.engine.compaction.TsFileManagement;
 import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
@@ -157,8 +158,6 @@ public class StorageGroupProcessor {
   private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = new TreeMap<>();
   /** time partition id in the storage group -> tsFileProcessor for this time partition */
   private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors = new TreeMap<>();
-  /** compactionMergeWorking is used to wait for last compaction to be done. */
-  private volatile boolean compactionMergeWorking = false;
   // upgrading sequence TsFile resource list
   private List<TsFileResource> upgradeSeqFileList = new LinkedList<>();
 
@@ -373,18 +372,30 @@ public class StorageGroupProcessor {
       globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
     }
 
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()
-        && seqTsFileResources.size() > 0) {
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
+      CompactionMergeTaskPoolManager.getInstance()
+          .submitTask(new CompactionAllPartitionTask(storageGroupName));
+    }
+  }
+
+  public class CompactionAllPartitionTask extends StorageGroupCompactionTask{
+
+    CompactionAllPartitionTask(String storageGroupName) {
+      super(storageGroupName);
+    }
+
+    @Override
+    public void run() {
       for (long timePartitionId : partitionLatestFlushedTimeForEachDevice.keySet()) {
-        executeCompaction(
+        syncCompactOnePartition(
             timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
       }
+      clearCompactionStatus();
     }
   }
 
   private void recoverCompaction() {
     if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
-      compactionMergeWorking = true;
       logger.info("{} submit a compaction merge task", storageGroupName);
       try {
         CompactionMergeTaskPoolManager.getInstance()
@@ -1863,37 +1874,47 @@ public class StorageGroupProcessor {
     }
     logger.info("signal closing storage group condition in {}", storageGroupName);
 
-    executeCompaction(
-        tsFileProcessor.getTimeRangeId(),
-        IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+    CompactionMergeTaskPoolManager.getInstance().submitTask(
+        new CompactionOnePartitionTask(storageGroupName, tsFileProcessor.getTimeRangeId()));
   }
 
-  private void executeCompaction(long timePartition, boolean fullMerge) {
-    if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
-      compactionMergeWorking = true;
-      logger.info("{} submit a compaction merge task", storageGroupName);
-      try {
-        // fork and filter current tsfile, then commit then to compaction merge
-        tsFileManagement.forkCurrentFileList(timePartition);
-        tsFileManagement.setForceFullMerge(fullMerge);
-        CompactionMergeTaskPoolManager.getInstance()
-            .submitTask(
-                tsFileManagement
-                .new CompactionMergeTask(this::closeCompactionMergeCallBack, timePartition));
-      } catch (IOException | RejectedExecutionException e) {
-        this.closeCompactionMergeCallBack(false, timePartition);
-        logger.error("{} compaction submit task failed", storageGroupName);
-      }
-    } else {
-      logger.info("{} last compaction merge task is working, skip current merge", storageGroupName);
+
+  public class CompactionOnePartitionTask extends StorageGroupCompactionTask{
+
+    private long partition;
+
+    CompactionOnePartitionTask(String storageGroupName, long partition) {
+      super(storageGroupName);
+      this.partition = partition;
+    }
+
+    @Override
+    public void run() {
+      syncCompactOnePartition(
+          partition,
+          IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+      clearCompactionStatus();
+    }
+  }
+
+  private void syncCompactOnePartition(long timePartition, boolean fullMerge) {
+    logger.info("{} submit a compaction merge task", storageGroupName);
+    try {
+      // fork and filter the current tsfiles, then commit them to compaction merge
+      tsFileManagement.forkCurrentFileList(timePartition);
+      tsFileManagement.setForceFullMerge(fullMerge);
+      tsFileManagement.new CompactionMergeTask(this::closeCompactionMergeCallBack, timePartition)
+          .run();
+    } catch (IOException e) {
+      this.closeCompactionMergeCallBack(false, timePartition);
+      logger.error("{} compaction submit task failed", storageGroupName);
     }
   }
 
   /** close compaction merge callback, to release some locks */
   private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
-    this.compactionMergeWorking = false;
     if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
-      executeCompaction(
+      syncCompactOnePartition(
           timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
     }
   }
@@ -1969,15 +1990,9 @@ public class StorageGroupProcessor {
     }
   }
 
-  public void merge(boolean fullMerge) {
-    writeLock();
-    try {
-      for (long timePartitionId : timePartitionIdVersionControllerMap.keySet()) {
-        executeCompaction(timePartitionId, fullMerge);
-      }
-    } finally {
-      writeUnlock();
-    }
+  public void merge() {
+    CompactionMergeTaskPoolManager.getInstance()
+        .submitTask(new CompactionAllPartitionTask(storageGroupName));
   }
 
   /**
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
index 6c03e4f..9c581cf 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.engine.compaction;
 
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 4a9f513..18f81bf 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -600,7 +600,7 @@ public class StorageGroupProcessorTest {
     }
 
     processor.syncCloseAllWorkingTsFileProcessors();
-    processor.merge(true);
+    processor.merge();
     while (processor.getTsFileManagement().isUnseqMerging) {
       // wait
     }