You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/03 09:07:00 UTC

[iotdb] branch fix_partition_merge_0.11 created (now 2f94737)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a change to branch fix_partition_merge_0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 2f94737  fix continus compaction bug

This branch includes the following new commits:

     new 2f94737  fix continus compaction bug

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: fix continus compaction bug

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_partition_merge_0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2f94737c855eb614d5d8fea3212f3b065e6c42f1
Author: qiaojialin <64...@qq.com>
AuthorDate: Thu Jun 3 17:05:01 2021 +0800

    fix continus compaction bug
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  2 +-
 .../compaction/CompactionMergeTaskPoolManager.java | 13 +++-
 .../compaction/StorageGroupCompactionTask.java     | 48 ++++++++++++
 .../db/engine/compaction/TsFileManagement.java     |  8 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 91 +++++++++++++---------
 .../compaction/LevelCompactionCacheTest.java       |  1 +
 .../storagegroup/StorageGroupProcessorTest.java    |  2 +-
 7 files changed, 121 insertions(+), 44 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index f7d0f63..209f1c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -632,7 +632,7 @@ public class StorageEngine implements IService {
       throw new StorageEngineException("Current system mode is read only, does not support merge");
     }
     for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
-      storageGroupProcessor.merge(fullMerge);
+      storageGroupProcessor.merge();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 8abe359..268c92e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.engine.compaction;
 import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
 
 import java.io.File;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -47,6 +48,7 @@ public class CompactionMergeTaskPoolManager implements IService {
   private static final CompactionMergeTaskPoolManager INSTANCE = new CompactionMergeTaskPoolManager();
   private ExecutorService pool;
 
+  private static ConcurrentHashMap<String, Boolean> sgCompactionStatus = new ConcurrentHashMap<>();
   public static CompactionMergeTaskPoolManager getInstance() {
     return INSTANCE;
   }
@@ -137,10 +139,17 @@ public class CompactionMergeTaskPoolManager implements IService {
     return ServiceType.COMPACTION_SERVICE;
   }
 
-  public void submitTask(Runnable compactionMergeTask)
+  public synchronized void submitTask(StorageGroupCompactionTask storageGroupCompactionTask)
       throws RejectedExecutionException {
     if (pool != null && !pool.isTerminated()) {
-      pool.submit(compactionMergeTask);
+      String storageGroup = storageGroupCompactionTask.getStorageGroupName();
+      boolean isCompacting = sgCompactionStatus.computeIfAbsent(storageGroup, k -> false);
+      if (isCompacting) {
+        return;
+      }
+      storageGroupCompactionTask.setSgCompactionStatus(sgCompactionStatus);
+      sgCompactionStatus.put(storageGroup, true);
+      pool.submit(storageGroupCompactionTask);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/StorageGroupCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/StorageGroupCompactionTask.java
new file mode 100644
index 0000000..d3ce31e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/StorageGroupCompactionTask.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public abstract class StorageGroupCompactionTask implements Runnable{
+
+  private String storageGroupName;
+  private ConcurrentHashMap<String, Boolean> sgCompactionStatus;
+
+  public StorageGroupCompactionTask(String storageGroupName) {
+    this.storageGroupName = storageGroupName;
+  }
+
+  void setSgCompactionStatus(ConcurrentHashMap<String, Boolean> sgCompactionStatus) {
+    this.sgCompactionStatus = sgCompactionStatus;
+  }
+
+  public String getStorageGroupName() {
+    return storageGroupName;
+  }
+
+  protected void clearCompactionStatus() {
+    // for test
+    if (sgCompactionStatus == null) {
+      sgCompactionStatus = new ConcurrentHashMap<>();
+    }
+    sgCompactionStatus.put(storageGroupName, false);
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index f3e70ff..db611dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -152,13 +152,14 @@ public abstract class TsFileManagement {
 
   protected abstract void merge(long timePartition);
 
-  public class CompactionMergeTask implements Runnable {
+  public class CompactionMergeTask extends StorageGroupCompactionTask {
 
     private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
     private long timePartitionId;
 
     public CompactionMergeTask(
         CloseCompactionMergeCallBack closeCompactionMergeCallBack, long timePartitionId) {
+      super(storageGroupName);
       this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
       this.timePartitionId = timePartitionId;
     }
@@ -167,14 +168,16 @@ public abstract class TsFileManagement {
     public void run() {
       merge(timePartitionId);
       closeCompactionMergeCallBack.call(isMergeExecutedInCurrentTask, timePartitionId);
+      clearCompactionStatus();
     }
   }
 
-  public class CompactionRecoverTask implements Runnable {
+  public class CompactionRecoverTask extends StorageGroupCompactionTask {
 
     private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
 
     public CompactionRecoverTask(CloseCompactionMergeCallBack closeCompactionMergeCallBack) {
+      super(storageGroupName);
       this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
     }
 
@@ -184,6 +187,7 @@ public abstract class TsFileManagement {
       // in recover logic, we do not have to start next compaction task, and in this case the param
       // time partition is useless, we can just pass 0L
       closeCompactionMergeCallBack.call(false, 0L);
+      clearCompactionStatus();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index e5cdc81..3993cda 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.StorageGroupCompactionTask;
 import org.apache.iotdb.db.engine.compaction.TsFileManagement;
 import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
@@ -157,8 +158,6 @@ public class StorageGroupProcessor {
   private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = new TreeMap<>();
   /** time partition id in the storage group -> tsFileProcessor for this time partition */
   private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors = new TreeMap<>();
-  /** compactionMergeWorking is used to wait for last compaction to be done. */
-  private volatile boolean compactionMergeWorking = false;
   // upgrading sequence TsFile resource list
   private List<TsFileResource> upgradeSeqFileList = new LinkedList<>();
 
@@ -373,18 +372,30 @@ public class StorageGroupProcessor {
       globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
     }
 
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()
-        && seqTsFileResources.size() > 0) {
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
+      CompactionMergeTaskPoolManager.getInstance()
+          .submitTask(new CompactionAllPartitionTask(storageGroupName));
+    }
+  }
+
+  public class CompactionAllPartitionTask extends StorageGroupCompactionTask{
+
+    CompactionAllPartitionTask(String storageGroupName) {
+      super(storageGroupName);
+    }
+
+    @Override
+    public void run() {
       for (long timePartitionId : partitionLatestFlushedTimeForEachDevice.keySet()) {
-        executeCompaction(
+        syncCompactOnePartition(
             timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
       }
+      clearCompactionStatus();
     }
   }
 
   private void recoverCompaction() {
     if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
-      compactionMergeWorking = true;
       logger.info("{} submit a compaction merge task", storageGroupName);
       try {
         CompactionMergeTaskPoolManager.getInstance()
@@ -1863,37 +1874,47 @@ public class StorageGroupProcessor {
     }
     logger.info("signal closing storage group condition in {}", storageGroupName);
 
-    executeCompaction(
-        tsFileProcessor.getTimeRangeId(),
-        IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+    CompactionMergeTaskPoolManager.getInstance().submitTask(
+        new CompactionOnePartitionTask(storageGroupName, tsFileProcessor.getTimeRangeId()));
   }
 
-  private void executeCompaction(long timePartition, boolean fullMerge) {
-    if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
-      compactionMergeWorking = true;
-      logger.info("{} submit a compaction merge task", storageGroupName);
-      try {
-        // fork and filter current tsfile, then commit then to compaction merge
-        tsFileManagement.forkCurrentFileList(timePartition);
-        tsFileManagement.setForceFullMerge(fullMerge);
-        CompactionMergeTaskPoolManager.getInstance()
-            .submitTask(
-                tsFileManagement
-                .new CompactionMergeTask(this::closeCompactionMergeCallBack, timePartition));
-      } catch (IOException | RejectedExecutionException e) {
-        this.closeCompactionMergeCallBack(false, timePartition);
-        logger.error("{} compaction submit task failed", storageGroupName);
-      }
-    } else {
-      logger.info("{} last compaction merge task is working, skip current merge", storageGroupName);
+
+  public class CompactionOnePartitionTask extends StorageGroupCompactionTask{
+
+    private long partition;
+
+    CompactionOnePartitionTask(String storageGroupName, long partition) {
+      super(storageGroupName);
+      this.partition = partition;
+    }
+
+    @Override
+    public void run() {
+      syncCompactOnePartition(
+          partition,
+          IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+      clearCompactionStatus();
+    }
+  }
+
+  private void syncCompactOnePartition(long timePartition, boolean fullMerge) {
+    logger.info("{} submit a compaction merge task", storageGroupName);
+    try {
+      // fork and filter current tsfile, then commit then to compaction merge
+      tsFileManagement.forkCurrentFileList(timePartition);
+      tsFileManagement.setForceFullMerge(fullMerge);
+      tsFileManagement.new CompactionMergeTask(this::closeCompactionMergeCallBack, timePartition)
+          .run();
+    } catch (IOException e) {
+      this.closeCompactionMergeCallBack(false, timePartition);
+      logger.error("{} compaction submit task failed", storageGroupName);
     }
   }
 
   /** close compaction merge callback, to release some locks */
   private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
-    this.compactionMergeWorking = false;
     if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
-      executeCompaction(
+      syncCompactOnePartition(
           timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
     }
   }
@@ -1969,15 +1990,9 @@ public class StorageGroupProcessor {
     }
   }
 
-  public void merge(boolean fullMerge) {
-    writeLock();
-    try {
-      for (long timePartitionId : timePartitionIdVersionControllerMap.keySet()) {
-        executeCompaction(timePartitionId, fullMerge);
-      }
-    } finally {
-      writeUnlock();
-    }
+  public void merge() {
+    CompactionMergeTaskPoolManager.getInstance()
+        .submitTask(new CompactionAllPartitionTask(storageGroupName));
   }
 
   /**
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
index 6c03e4f..9c581cf 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.engine.compaction;
 
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 4a9f513..18f81bf 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -600,7 +600,7 @@ public class StorageGroupProcessorTest {
     }
 
     processor.syncCloseAllWorkingTsFileProcessors();
-    processor.merge(true);
+    processor.merge();
     while (processor.getTsFileManagement().isUnseqMerging) {
       // wait
     }