You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/04 03:43:05 UTC
[iotdb] branch rel/0.12 updated: [IOTDB-1419][To rel/0.12] fix
continuous compaction doesn't take effect when enablePartition (#3326)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new da162af [IOTDB-1419][To rel/0.12] fix continuous compaction doesn't take effect when enablePartition (#3326)
da162af is described below
commit da162af63d775df934098bd02ee935e1a2250cd1
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Fri Jun 4 11:42:42 2021 +0800
[IOTDB-1419][To rel/0.12] fix continuous compaction doesn't take effect when enablePartition (#3326)
---
.../compaction/CompactionMergeTaskPoolManager.java | 31 +++---
.../compaction/StorageGroupCompactionTask.java | 48 +++++++++
.../db/engine/compaction/TsFileManagement.java | 28 +++---
.../engine/storagegroup/StorageGroupProcessor.java | 107 +++++++++++----------
.../virtualSg/VirtualStorageGroupManager.java | 2 +-
.../storagegroup/StorageGroupProcessorTest.java | 2 +-
6 files changed, 144 insertions(+), 74 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index cdf6f4a..c04661c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -36,7 +36,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
@@ -56,6 +55,8 @@ public class CompactionMergeTaskPoolManager implements IService {
private ExecutorService pool;
private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
+ private static ConcurrentHashMap<String, Boolean> sgCompactionStatus = new ConcurrentHashMap<>();
+
public static CompactionMergeTaskPoolManager getInstance() {
return INSTANCE;
}
@@ -159,16 +160,6 @@ public class CompactionMergeTaskPoolManager implements IService {
return ServiceType.COMPACTION_SERVICE;
}
- public void submitTask(String storageGroupName, Callable<Void> compactionMergeTask)
- throws RejectedExecutionException {
- if (pool != null && !pool.isTerminated()) {
- Future<Void> future = pool.submit(compactionMergeTask);
- storageGroupTasks
- .computeIfAbsent(storageGroupName, k -> new ConcurrentSkipListSet<>())
- .add(future);
- }
- }
-
/**
* Abort all compactions of a storage group. The caller must acquire the write lock of the
* corresponding storage group.
@@ -181,11 +172,29 @@ public class CompactionMergeTaskPoolManager implements IService {
Future<Void> next = subIterator.next();
if (!next.isDone() && !next.isCancelled()) {
next.cancel(true);
+ sgCompactionStatus.put(storageGroup, false);
}
subIterator.remove();
}
}
+ public synchronized void submitTask(StorageGroupCompactionTask storageGroupCompactionTask)
+ throws RejectedExecutionException {
+ if (pool != null && !pool.isTerminated()) {
+ String storageGroup = storageGroupCompactionTask.getStorageGroupName();
+ boolean isCompacting = sgCompactionStatus.computeIfAbsent(storageGroup, k -> false);
+ if (isCompacting) {
+ return;
+ }
+ storageGroupCompactionTask.setSgCompactionStatus(sgCompactionStatus);
+ sgCompactionStatus.put(storageGroup, true);
+ Future<Void> future = pool.submit(storageGroupCompactionTask);
+ storageGroupTasks
+ .computeIfAbsent(storageGroup, k -> new ConcurrentSkipListSet<>())
+ .add(future);
+ }
+ }
+
public boolean isTerminated() {
return pool == null || pool.isTerminated();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/StorageGroupCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/StorageGroupCompactionTask.java
new file mode 100644
index 0000000..78061ce
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/StorageGroupCompactionTask.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+
+public abstract class StorageGroupCompactionTask implements Callable<Void> {
+
+ private String storageGroupName;
+ private ConcurrentHashMap<String, Boolean> sgCompactionStatus;
+
+ public StorageGroupCompactionTask(String storageGroupName) {
+ this.storageGroupName = storageGroupName;
+ }
+
+ void setSgCompactionStatus(ConcurrentHashMap<String, Boolean> sgCompactionStatus) {
+ this.sgCompactionStatus = sgCompactionStatus;
+ }
+
+ public String getStorageGroupName() {
+ return storageGroupName;
+ }
+
+ protected void clearCompactionStatus() {
+ // for test
+ if (sgCompactionStatus == null) {
+ sgCompactionStatus = new ConcurrentHashMap<>();
+ }
+ sgCompactionStatus.put(storageGroupName, false);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 595ddf8..7a7a30c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -44,7 +44,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -157,13 +156,14 @@ public abstract class TsFileManagement {
protected abstract void merge(long timePartition);
- public class CompactionMergeTask implements Callable<Void> {
+ public class CompactionMergeTask extends StorageGroupCompactionTask {
private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
private long timePartitionId;
public CompactionMergeTask(
CloseCompactionMergeCallBack closeCompactionMergeCallBack, long timePartitionId) {
+ super(storageGroupName);
this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
this.timePartitionId = timePartitionId;
}
@@ -172,15 +172,17 @@ public abstract class TsFileManagement {
public Void call() {
merge(timePartitionId);
closeCompactionMergeCallBack.call(isMergeExecutedInCurrentTask, timePartitionId);
+ clearCompactionStatus();
return null;
}
}
- public class CompactionRecoverTask implements Callable<Void> {
+ public class CompactionRecoverTask extends StorageGroupCompactionTask {
private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
public CompactionRecoverTask(CloseCompactionMergeCallBack closeCompactionMergeCallBack) {
+ super(storageGroupName);
this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
}
@@ -190,6 +192,7 @@ public abstract class TsFileManagement {
// in recover logic, we do not have to start next compaction task, and in this case the param
// time partition is useless, we can just pass 0L
closeCompactionMergeCallBack.call(false, 0L);
+ clearCompactionStatus();
return null;
}
}
@@ -199,15 +202,6 @@ public abstract class TsFileManagement {
List<TsFileResource> seqMergeList,
List<TsFileResource> unSeqMergeList,
long dataTTL) {
- if (isUnseqMerging) {
- if (logger.isInfoEnabled()) {
- logger.info(
- "{} Last merge is ongoing, currently consumed time: {}ms",
- storageGroupName,
- (System.currentTimeMillis() - mergeStartTime));
- }
- return false;
- }
// wait until seq merge has finished
while (isSeqMerging) {
try {
@@ -288,6 +282,16 @@ public abstract class TsFileManagement {
mergeFiles[0].size(),
mergeFiles[1].size());
}
+ // wait until unseq merge has finished
+ while (isUnseqMerging) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ logger.error("{} [Compaction] shutdown", storageGroupName, e);
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
return true;
} catch (MergeException | IOException e) {
logger.error("{} cannot select file for merge", storageGroupName, e);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 7427f98..e9fc64b 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.StorageGroupCompactionTask;
import org.apache.iotdb.db.engine.compaction.TsFileManagement;
import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
@@ -170,8 +171,6 @@ public class StorageGroupProcessor {
private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = new TreeMap<>();
/** time partition id in the storage group -> tsFileProcessor for this time partition */
private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors = new TreeMap<>();
- /** compactionMergeWorking is used to wait for last compaction to be done. */
- private volatile boolean compactionMergeWorking = false;
// upgrading sequence TsFile resource list
private List<TsFileResource> upgradeSeqFileList = new LinkedList<>();
@@ -507,18 +506,31 @@ public class StorageGroupProcessor {
globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
}
- if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()
- && seqTsFileResources.size() > 0) {
- for (long timePartitionId : timePartitionIdVersionControllerMap.keySet()) {
- executeCompaction(
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
+ CompactionMergeTaskPoolManager.getInstance()
+ .submitTask(new CompactionAllPartitionTask(logicalStorageGroupName));
+ }
+ }
+
+ public class CompactionAllPartitionTask extends StorageGroupCompactionTask {
+
+ CompactionAllPartitionTask(String storageGroupName) {
+ super(storageGroupName);
+ }
+
+ @Override
+ public Void call() {
+ for (long timePartitionId : partitionLatestFlushedTimeForEachDevice.keySet()) {
+ syncCompactOnePartition(
timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
}
+ clearCompactionStatus();
+ return null;
}
}
private void recoverCompaction() {
if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
- compactionMergeWorking = true;
logger.info(
"{} - {} submit a compaction recover merge task",
logicalStorageGroupName,
@@ -526,7 +538,6 @@ public class StorageGroupProcessor {
try {
CompactionMergeTaskPoolManager.getInstance()
.submitTask(
- logicalStorageGroupName,
tsFileManagement.new CompactionRecoverTask(this::closeCompactionMergeCallBack));
} catch (RejectedExecutionException e) {
this.closeCompactionMergeCallBack(false, 0);
@@ -1945,47 +1956,51 @@ public class StorageGroupProcessor {
"signal closing storage group condition in {}",
logicalStorageGroupName + "-" + virtualStorageGroupId);
- executeCompaction(
- tsFileProcessor.getTimeRangeId(),
- IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+ CompactionMergeTaskPoolManager.getInstance()
+ .submitTask(
+ new CompactionOnePartitionTask(
+ logicalStorageGroupName, tsFileProcessor.getTimeRangeId()));
}
- private void executeCompaction(long timePartition, boolean fullMerge) {
- if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
- compactionMergeWorking = true;
- logger.info(
- "{} submit a compaction merge task",
- logicalStorageGroupName + "-" + virtualStorageGroupId);
- try {
- // fork and filter current tsfile, then commit then to compaction merge
- tsFileManagement.forkCurrentFileList(timePartition);
- tsFileManagement.setForceFullMerge(fullMerge);
- CompactionMergeTaskPoolManager.getInstance()
- .submitTask(
- logicalStorageGroupName,
- tsFileManagement
- .new CompactionMergeTask(this::closeCompactionMergeCallBack, timePartition));
- } catch (IOException | RejectedExecutionException e) {
- this.closeCompactionMergeCallBack(false, timePartition);
- logger.error(
- "{} compaction submit task failed",
- logicalStorageGroupName + "-" + virtualStorageGroupId,
- e);
- }
- } else {
- logger.info(
- "{} last compaction merge task is working, skip current merge",
- logicalStorageGroupName + "-" + virtualStorageGroupId);
+ public class CompactionOnePartitionTask extends StorageGroupCompactionTask {
+
+ private long partition;
+
+ CompactionOnePartitionTask(String storageGroupName, long partition) {
+ super(storageGroupName);
+ this.partition = partition;
+ }
+
+ @Override
+ public Void call() {
+ syncCompactOnePartition(
+ partition, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+ clearCompactionStatus();
+ return null;
+ }
+ }
+
+ private void syncCompactOnePartition(long timePartition, boolean fullMerge) {
+ logger.info(
+ "{}-{} submit a compaction merge task", logicalStorageGroupName, virtualStorageGroupId);
+ try {
+ // fork and filter current tsfile, then commit then to compaction merge
+ tsFileManagement.forkCurrentFileList(timePartition);
+ tsFileManagement.setForceFullMerge(fullMerge);
+ tsFileManagement.new CompactionMergeTask(this::closeCompactionMergeCallBack, timePartition)
+ .call();
+ } catch (IOException e) {
+ this.closeCompactionMergeCallBack(false, timePartition);
+ logger.error(
+ "{}-{} compaction submit task failed", logicalStorageGroupName, virtualStorageGroupId);
}
}
/** close compaction merge callback, to release some locks */
private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
- executeCompaction(
+ syncCompactOnePartition(
timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
- } else {
- this.compactionMergeWorking = false;
}
}
@@ -2086,15 +2101,9 @@ public class StorageGroupProcessor {
resources.clear();
}
- public void merge(boolean isFullMerge) {
- writeLock();
- try {
- for (long timePartitionId : partitionLatestFlushedTimeForEachDevice.keySet()) {
- executeCompaction(timePartitionId, isFullMerge);
- }
- } finally {
- writeUnlock();
- }
+ public void merge() {
+ CompactionMergeTaskPoolManager.getInstance()
+ .submitTask(new CompactionAllPartitionTask(logicalStorageGroupName));
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
index e8d1f77..fbbec79 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
@@ -298,7 +298,7 @@ public class VirtualStorageGroupManager {
public void mergeAll(boolean isFullMerge) {
for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
if (storageGroupProcessor != null) {
- storageGroupProcessor.merge(isFullMerge);
+ storageGroupProcessor.merge();
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index e68c4af..8cb27ac 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -600,7 +600,7 @@ public class StorageGroupProcessorTest {
}
processor.syncCloseAllWorkingTsFileProcessors();
- processor.merge(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+ processor.merge();
while (processor.getTsFileManagement().isUnseqMerging) {
// wait
}