You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/03 09:07:01 UTC
[iotdb] 01/01: fix continus compaction bug
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch fix_partition_merge_0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2f94737c855eb614d5d8fea3212f3b065e6c42f1
Author: qiaojialin <64...@qq.com>
AuthorDate: Thu Jun 3 17:05:01 2021 +0800
fix continus compaction bug
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 2 +-
.../compaction/CompactionMergeTaskPoolManager.java | 13 +++-
.../compaction/StorageGroupCompactionTask.java | 48 ++++++++++++
.../db/engine/compaction/TsFileManagement.java | 8 +-
.../engine/storagegroup/StorageGroupProcessor.java | 91 +++++++++++++---------
.../compaction/LevelCompactionCacheTest.java | 1 +
.../storagegroup/StorageGroupProcessorTest.java | 2 +-
7 files changed, 121 insertions(+), 44 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index f7d0f63..209f1c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -632,7 +632,7 @@ public class StorageEngine implements IService {
throw new StorageEngineException("Current system mode is read only, does not support merge");
}
for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
- storageGroupProcessor.merge(fullMerge);
+ storageGroupProcessor.merge();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 8abe359..268c92e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.engine.compaction;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
import java.io.File;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@@ -47,6 +48,7 @@ public class CompactionMergeTaskPoolManager implements IService {
private static final CompactionMergeTaskPoolManager INSTANCE = new CompactionMergeTaskPoolManager();
private ExecutorService pool;
+ private static ConcurrentHashMap<String, Boolean> sgCompactionStatus = new ConcurrentHashMap<>();
public static CompactionMergeTaskPoolManager getInstance() {
return INSTANCE;
}
@@ -137,10 +139,17 @@ public class CompactionMergeTaskPoolManager implements IService {
return ServiceType.COMPACTION_SERVICE;
}
- public void submitTask(Runnable compactionMergeTask)
+ public synchronized void submitTask(StorageGroupCompactionTask storageGroupCompactionTask)
throws RejectedExecutionException {
if (pool != null && !pool.isTerminated()) {
- pool.submit(compactionMergeTask);
+ String storageGroup = storageGroupCompactionTask.getStorageGroupName();
+ boolean isCompacting = sgCompactionStatus.computeIfAbsent(storageGroup, k -> false);
+ if (isCompacting) {
+ return;
+ }
+ storageGroupCompactionTask.setSgCompactionStatus(sgCompactionStatus);
+ sgCompactionStatus.put(storageGroup, true);
+ pool.submit(storageGroupCompactionTask);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/StorageGroupCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/StorageGroupCompactionTask.java
new file mode 100644
index 0000000..d3ce31e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/StorageGroupCompactionTask.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public abstract class StorageGroupCompactionTask implements Runnable{
+
+ private String storageGroupName;
+ private ConcurrentHashMap<String, Boolean> sgCompactionStatus;
+
+ public StorageGroupCompactionTask(String storageGroupName) {
+ this.storageGroupName = storageGroupName;
+ }
+
+ void setSgCompactionStatus(ConcurrentHashMap<String, Boolean> sgCompactionStatus) {
+ this.sgCompactionStatus = sgCompactionStatus;
+ }
+
+ public String getStorageGroupName() {
+ return storageGroupName;
+ }
+
+ protected void clearCompactionStatus() {
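+ // the status map is injected by CompactionMergeTaskPoolManager.submitTask; a task run directly (e.g. in tests) has none, so fall back to a private map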
+ if (sgCompactionStatus == null) {
+ sgCompactionStatus = new ConcurrentHashMap<>();
+ }
+ sgCompactionStatus.put(storageGroupName, false);
+ }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index f3e70ff..db611dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -152,13 +152,14 @@ public abstract class TsFileManagement {
protected abstract void merge(long timePartition);
- public class CompactionMergeTask implements Runnable {
+ public class CompactionMergeTask extends StorageGroupCompactionTask {
private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
private long timePartitionId;
public CompactionMergeTask(
CloseCompactionMergeCallBack closeCompactionMergeCallBack, long timePartitionId) {
+ super(storageGroupName);
this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
this.timePartitionId = timePartitionId;
}
@@ -167,14 +168,16 @@ public abstract class TsFileManagement {
public void run() {
merge(timePartitionId);
closeCompactionMergeCallBack.call(isMergeExecutedInCurrentTask, timePartitionId);
+ clearCompactionStatus();
}
}
- public class CompactionRecoverTask implements Runnable {
+ public class CompactionRecoverTask extends StorageGroupCompactionTask {
private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
public CompactionRecoverTask(CloseCompactionMergeCallBack closeCompactionMergeCallBack) {
+ super(storageGroupName);
this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
}
@@ -184,6 +187,7 @@ public abstract class TsFileManagement {
// in recover logic, we do not have to start next compaction task, and in this case the param
// time partition is useless, we can just pass 0L
closeCompactionMergeCallBack.call(false, 0L);
+ clearCompactionStatus();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index e5cdc81..3993cda 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.StorageGroupCompactionTask;
import org.apache.iotdb.db.engine.compaction.TsFileManagement;
import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
@@ -157,8 +158,6 @@ public class StorageGroupProcessor {
private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = new TreeMap<>();
/** time partition id in the storage group -> tsFileProcessor for this time partition */
private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors = new TreeMap<>();
- /** compactionMergeWorking is used to wait for last compaction to be done. */
- private volatile boolean compactionMergeWorking = false;
// upgrading sequence TsFile resource list
private List<TsFileResource> upgradeSeqFileList = new LinkedList<>();
@@ -373,18 +372,30 @@ public class StorageGroupProcessor {
globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
}
- if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()
- && seqTsFileResources.size() > 0) {
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
+ CompactionMergeTaskPoolManager.getInstance()
+ .submitTask(new CompactionAllPartitionTask(storageGroupName));
+ }
+ }
+
+ public class CompactionAllPartitionTask extends StorageGroupCompactionTask{
+
+ CompactionAllPartitionTask(String storageGroupName) {
+ super(storageGroupName);
+ }
+
+ @Override
+ public void run() {
for (long timePartitionId : partitionLatestFlushedTimeForEachDevice.keySet()) {
- executeCompaction(
+ syncCompactOnePartition(
timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
}
+ clearCompactionStatus();
}
}
private void recoverCompaction() {
if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
- compactionMergeWorking = true;
logger.info("{} submit a compaction merge task", storageGroupName);
try {
CompactionMergeTaskPoolManager.getInstance()
@@ -1863,37 +1874,47 @@ public class StorageGroupProcessor {
}
logger.info("signal closing storage group condition in {}", storageGroupName);
- executeCompaction(
- tsFileProcessor.getTimeRangeId(),
- IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+ CompactionMergeTaskPoolManager.getInstance().submitTask(
+ new CompactionOnePartitionTask(storageGroupName, tsFileProcessor.getTimeRangeId()));
}
- private void executeCompaction(long timePartition, boolean fullMerge) {
- if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
- compactionMergeWorking = true;
- logger.info("{} submit a compaction merge task", storageGroupName);
- try {
- // fork and filter current tsfile, then commit then to compaction merge
- tsFileManagement.forkCurrentFileList(timePartition);
- tsFileManagement.setForceFullMerge(fullMerge);
- CompactionMergeTaskPoolManager.getInstance()
- .submitTask(
- tsFileManagement
- .new CompactionMergeTask(this::closeCompactionMergeCallBack, timePartition));
- } catch (IOException | RejectedExecutionException e) {
- this.closeCompactionMergeCallBack(false, timePartition);
- logger.error("{} compaction submit task failed", storageGroupName);
- }
- } else {
- logger.info("{} last compaction merge task is working, skip current merge", storageGroupName);
+
+ public class CompactionOnePartitionTask extends StorageGroupCompactionTask{
+
+ private long partition;
+
+ CompactionOnePartitionTask(String storageGroupName, long partition) {
+ super(storageGroupName);
+ this.partition = partition;
+ }
+
+ @Override
+ public void run() {
+ syncCompactOnePartition(
+ partition,
+ IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+ clearCompactionStatus();
+ }
+ }
+
+ private void syncCompactOnePartition(long timePartition, boolean fullMerge) {
+ logger.info("{} submit a compaction merge task", storageGroupName);
+ try {
+ // fork and filter current tsfile, then commit them to compaction merge
+ tsFileManagement.forkCurrentFileList(timePartition);
+ tsFileManagement.setForceFullMerge(fullMerge);
+ tsFileManagement.new CompactionMergeTask(this::closeCompactionMergeCallBack, timePartition)
+ .run();
+ } catch (IOException e) {
+ this.closeCompactionMergeCallBack(false, timePartition);
+ logger.error("{} compaction submit task failed", storageGroupName);
}
}
/** close compaction merge callback, to release some locks */
private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
- this.compactionMergeWorking = false;
if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
- executeCompaction(
+ syncCompactOnePartition(
timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
}
}
@@ -1969,15 +1990,9 @@ public class StorageGroupProcessor {
}
}
- public void merge(boolean fullMerge) {
- writeLock();
- try {
- for (long timePartitionId : timePartitionIdVersionControllerMap.keySet()) {
- executeCompaction(timePartitionId, fullMerge);
- }
- } finally {
- writeUnlock();
- }
+ public void merge() {
+ CompactionMergeTaskPoolManager.getInstance()
+ .submitTask(new CompactionAllPartitionTask(storageGroupName));
}
/**
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
index 6c03e4f..9c581cf 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine.compaction;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 4a9f513..18f81bf 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -600,7 +600,7 @@ public class StorageGroupProcessorTest {
}
processor.syncCloseAllWorkingTsFileProcessors();
- processor.merge(true);
+ processor.merge();
while (processor.getTsFileManagement().isUnseqMerging) {
// wait
}