You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2021/06/04 11:19:43 UTC
[iotdb] branch rel/0.12 updated: [IOTDB-1419][To rel/0.12] remove
redundant clearCompactionStatus (#3338)
This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 347c047 [IOTDB-1419][To rel/0.12] remove redundant clearCompactionStatus (#3338)
347c047 is described below
commit 347c0470e9180aa1dd6b2c17a75efb9c8bbe2613
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Fri Jun 4 19:19:20 2021 +0800
[IOTDB-1419][To rel/0.12] remove redundant clearCompactionStatus (#3338)
remove redundant clearCompactionStatus
Co-authored-by: zhanglingzhe <su...@foxmail.com>
---
.../db/engine/compaction/TsFileManagement.java | 5 +-
.../level/LevelCompactionTsFileManagement.java | 38 +++-----
.../engine/compaction/utils/CompactionUtils.java | 3 +-
.../engine/storagegroup/StorageGroupProcessor.java | 108 +++++++++++++--------
.../db/engine/storagegroup/TsFileResource.java | 5 +
.../db/sync/receiver/load/FileLoaderTest.java | 4 +-
.../iotdb/db/utils/TsFileRewriteToolTest.java | 2 +-
7 files changed, 91 insertions(+), 74 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 7a7a30c..fffbd3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -156,23 +156,20 @@ public abstract class TsFileManagement {
protected abstract void merge(long timePartition);
- public class CompactionMergeTask extends StorageGroupCompactionTask {
+ public class CompactionMergeTask {
private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
private long timePartitionId;
public CompactionMergeTask(
CloseCompactionMergeCallBack closeCompactionMergeCallBack, long timePartitionId) {
- super(storageGroupName);
this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
this.timePartitionId = timePartitionId;
}
- @Override
public Void call() {
merge(timePartitionId);
closeCompactionMergeCallBack.call(isMergeExecutedInCurrentTask, timePartitionId);
- clearCompactionStatus();
return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 46a964f..4f4aa15 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -52,11 +52,9 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
-import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.SOURCE_NAME;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.TARGET_NAME;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
/** The TsFileManagement for LEVEL_COMPACTION, use level struct to manage TsFile list */
public class LevelCompactionTsFileManagement extends TsFileManagement {
@@ -265,7 +263,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
writeLock();
try {
long timePartitionId = tsFileResource.getTimePartition();
- int level = getMergeLevel(tsFileResource.getTsFile());
+ int level = TsFileResource.getMergeLevel(tsFileResource.getTsFile().getName());
if (sequence) {
if (level <= seqLevelNum - 1) {
// current file has normal level
@@ -480,7 +478,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
// get tsfile resource from list, as they have been recovered in StorageGroupProcessor
sourceTsFileResources.add(getTsFileResource(file, isSeq));
}
- int level = getMergeLevel(new File(sourceFileList.get(0)));
+ int level = TsFileResource.getMergeLevel(new File(sourceFileList.get(0)).getName());
RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target);
// if not complete compaction, resume merge
if (writer.hasCrashed()) {
@@ -506,7 +504,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
throw new InterruptedException(
String.format("%s [Compaction] abort", storageGroupName));
}
- int targetLevel = getMergeLevel(targetResource.getTsFile());
+ int targetLevel = TsFileResource.getMergeLevel(targetResource.getTsFile().getName());
if (isSeq) {
sequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
sequenceRecoverTsFileResources.clear();
@@ -602,18 +600,20 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
&& forkedUnSequenceTsFileResources.get(0).size() > 0) {
isMergeExecutedInCurrentTask =
merge(
- isForceFullMerge,
- getTsFileListByTimePartition(true, timePartition),
- forkedUnSequenceTsFileResources.get(0),
- Long.MAX_VALUE);
+ isForceFullMerge,
+ getTsFileListByTimePartition(true, timePartition),
+ forkedUnSequenceTsFileResources.get(0),
+ Long.MAX_VALUE)
+ || isMergeExecutedInCurrentTask;
} else {
isMergeExecutedInCurrentTask =
merge(
- forkedUnSequenceTsFileResources,
- false,
- timePartition,
- unseqLevelNum,
- unseqFileNumInEachLevel);
+ forkedUnSequenceTsFileResources,
+ false,
+ timePartition,
+ unseqLevelNum,
+ unseqFileNumInEachLevel)
+ || isMergeExecutedInCurrentTask;
}
}
@@ -640,7 +640,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
boolean isMergeExecutedInCurrentTask = false;
CompactionLogger compactionLogger = null;
try {
- logger.info("{} start to filter compaction condition", storageGroupName);
+ logger.debug("{} start to filter compaction condition", storageGroupName);
for (int i = 0; i < currMaxLevel - 1; i++) {
List<TsFileResource> currLevelTsFileResource = mergeResources.get(i);
if (currMaxFileNumInEachLevel <= currLevelTsFileResource.size()) {
@@ -779,14 +779,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
return newUnSequenceTsFileResources;
}
- public static int getMergeLevel(File file) {
- String mergeLevelStr =
- file.getPath()
- .substring(file.getPath().lastIndexOf(FILE_NAME_SEPARATOR) + 1)
- .replaceAll(TSFILE_SUFFIX, "");
- return Integer.parseInt(mergeLevelStr);
- }
-
private TsFileResource getRecoverTsFileResource(String filePath, boolean isSeq)
throws IOException {
if (isSeq) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 2d3d6a7..4f81329 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -443,7 +443,8 @@ public class CompactionUtils {
logger.info("{} tsfile does not exist", path);
return null;
}
- } catch (IOException e) {
+ } catch (Exception e) {
+ logger.warn("{} tsfile may be destroyed", path);
logger.error(
"Storage group {}, flush recover meets error. reader create failed.",
storageGroup,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 55c277d..8c2d95d 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
import org.apache.iotdb.db.engine.compaction.StorageGroupCompactionTask;
import org.apache.iotdb.db.engine.compaction.TsFileManagement;
-import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.flush.CloseFileListener;
import org.apache.iotdb.db.engine.flush.FlushListener;
@@ -443,6 +442,42 @@ public class StorageGroupProcessor {
recoverTsFiles(value, false);
}
+ for (TsFileResource resource : tsFileManagement.getTsFileList(true)) {
+ long partitionNum = resource.getTimePartition();
+ updatePartitionFileVersion(partitionNum, resource.getVersion());
+ }
+ for (TsFileResource resource : tsFileManagement.getTsFileList(false)) {
+ long partitionNum = resource.getTimePartition();
+ updatePartitionFileVersion(partitionNum, resource.getVersion());
+ }
+ for (TsFileResource resource : upgradeSeqFileList) {
+ long partitionNum = resource.getTimePartition();
+ updatePartitionFileVersion(partitionNum, resource.getVersion());
+ }
+ for (TsFileResource resource : upgradeUnseqFileList) {
+ long partitionNum = resource.getTimePartition();
+ updatePartitionFileVersion(partitionNum, resource.getVersion());
+ }
+ updateLatestFlushedTime();
+
+ List<TsFileResource> seqTsFileResources = tsFileManagement.getTsFileList(true);
+ for (TsFileResource resource : seqTsFileResources) {
+ long timePartitionId = resource.getTimePartition();
+ Map<String, Long> endTimeMap = new HashMap<>();
+ for (String deviceId : resource.getDevices()) {
+ long endTime = resource.getEndTime(deviceId);
+ endTimeMap.put(deviceId, endTime);
+ }
+ latestTimeForEachDevice
+ .computeIfAbsent(timePartitionId, l -> new HashMap<>())
+ .putAll(endTimeMap);
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+ .putAll(endTimeMap);
+ globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
+ }
+
+ // leave it in the end
String taskName =
logicalStorageGroupName + "-" + virtualStorageGroupId + "-" + System.currentTimeMillis();
File mergingMods =
@@ -450,6 +485,7 @@ public class StorageGroupProcessor {
if (mergingMods.exists()) {
this.tsFileManagement.mergingModification = new ModificationFile(mergingMods.getPath());
}
+
RecoverMergeTask recoverMergeTask =
new RecoverMergeTask(
new ArrayList<>(tsFileManagement.getTsFileList(true)),
@@ -470,48 +506,9 @@ public class StorageGroupProcessor {
mergingMods.delete();
}
recoverCompaction();
- for (TsFileResource resource : tsFileManagement.getTsFileList(true)) {
- long partitionNum = resource.getTimePartition();
- updatePartitionFileVersion(partitionNum, resource.getVersion());
- }
- for (TsFileResource resource : tsFileManagement.getTsFileList(false)) {
- long partitionNum = resource.getTimePartition();
- updatePartitionFileVersion(partitionNum, resource.getVersion());
- }
- for (TsFileResource resource : upgradeSeqFileList) {
- long partitionNum = resource.getTimePartition();
- updatePartitionFileVersion(partitionNum, resource.getVersion());
- }
- for (TsFileResource resource : upgradeUnseqFileList) {
- long partitionNum = resource.getTimePartition();
- updatePartitionFileVersion(partitionNum, resource.getVersion());
- }
- updateLatestFlushedTime();
} catch (IOException | MetadataException e) {
throw new StorageGroupProcessorException(e);
}
-
- List<TsFileResource> seqTsFileResources = tsFileManagement.getTsFileList(true);
- for (TsFileResource resource : seqTsFileResources) {
- long timePartitionId = resource.getTimePartition();
- Map<String, Long> endTimeMap = new HashMap<>();
- for (String deviceId : resource.getDevices()) {
- long endTime = resource.getEndTime(deviceId);
- endTimeMap.put(deviceId, endTime);
- }
- latestTimeForEachDevice
- .computeIfAbsent(timePartitionId, l -> new HashMap<>())
- .putAll(endTimeMap);
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(timePartitionId, id -> new HashMap<>())
- .putAll(endTimeMap);
- globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
- }
-
- if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
- CompactionMergeTaskPoolManager.getInstance()
- .submitTask(new CompactionAllPartitionTask(logicalStorageGroupName));
- }
}
public class CompactionAllPartitionTask extends StorageGroupCompactionTask {
@@ -522,6 +519,10 @@ public class StorageGroupProcessor {
@Override
public Void call() {
+ logger.info(
+ "all partition in storage group {}: {}",
+ logicalStorageGroupName,
+ partitionLatestFlushedTimeForEachDevice.keySet());
for (long timePartitionId : partitionLatestFlushedTimeForEachDevice.keySet()) {
syncCompactOnePartition(
timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
@@ -540,7 +541,7 @@ public class StorageGroupProcessor {
try {
CompactionMergeTaskPoolManager.getInstance()
.submitTask(
- tsFileManagement.new CompactionRecoverTask(this::closeCompactionMergeCallBack));
+ tsFileManagement.new CompactionRecoverTask(this::closeCompactionRecoverCallBack));
} catch (RejectedExecutionException e) {
this.closeCompactionMergeCallBack(false, 0);
logger.error(
@@ -707,7 +708,7 @@ public class StorageGroupProcessor {
RestorableTsFileIOWriter writer;
try {
// this tsfile is not zero level, no need to perform redo wal
- if (LevelCompactionTsFileManagement.getMergeLevel(tsFileResource.getTsFile()) > 0) {
+ if (TsFileResource.getMergeLevel(tsFileResource.getTsFile().getName()) > 0) {
writer =
recoverPerformer.recover(false, this::getWalDirectByteBuffer, this::releaseWalBuffer);
if (writer.hasCrashed()) {
@@ -1986,7 +1987,10 @@ public class StorageGroupProcessor {
private void syncCompactOnePartition(long timePartition, boolean fullMerge) {
logger.info(
- "{}-{} submit a compaction merge task", logicalStorageGroupName, virtualStorageGroupId);
+ "{}-{} partition:{}, submit a compaction merge task",
+ logicalStorageGroupName,
+ virtualStorageGroupId,
+ timePartition);
try {
// fork and filter current tsfile, then commit then to compaction merge
tsFileManagement.forkCurrentFileList(timePartition);
@@ -2000,11 +2004,29 @@ public class StorageGroupProcessor {
}
}
+ /** close recover compaction merge callback, to start continuous compaction */
+ private void closeCompactionRecoverCallBack(boolean isMerge, long timePartitionId) {
+ logger.info(
+ "{}-{} recover finished, submit continuous compaction task",
+ logicalStorageGroupName,
+ virtualStorageGroupId);
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
+ CompactionMergeTaskPoolManager.getInstance()
+ .submitTask(new CompactionAllPartitionTask(logicalStorageGroupName));
+ }
+ }
+
/** close compaction merge callback, to release some locks */
private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
syncCompactOnePartition(
timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+ } else {
+ logger.info(
+ "{}-{} partition:{}, do not have to submit a new compaction merge task",
+ logicalStorageGroupName,
+ virtualStorageGroupId,
+ timePartitionId);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index c3080d1..5c67e04 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -785,6 +785,11 @@ public class TsFileResource {
return tsFileName;
}
+ public static int getMergeLevel(String fileName) {
+ TsFileName tsFileName = getTsFileName(fileName);
+ return tsFileName.mergeCnt;
+ }
+
public static TsFileResource modifyTsFileNameUnseqMergCnt(TsFileResource tsFileResource) {
File tsFile = tsFileResource.getTsFile();
String path = tsFile.getParent();
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
index 2ce45a0..fe5aeb2 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
@@ -120,7 +120,7 @@ public class FileLoaderTest {
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ rand
+ IoTDBConstant.FILE_NAME_SEPARATOR
- + "0.tsfile";
+ + "0-0.tsfile";
File syncFile = new File(fileName);
File dataFile =
@@ -244,7 +244,7 @@ public class FileLoaderTest {
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ rand
+ IoTDBConstant.FILE_NAME_SEPARATOR
- + "0.tsfile";
+ + "0-0.tsfile";
File syncFile = new File(fileName);
File dataFile =
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
index 58126ef..7686021 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
@@ -102,7 +102,7 @@ public class TsFileRewriteToolTest {
boolean success = f.mkdir();
Assert.assertTrue(success);
}
- path = folder + File.separator + System.currentTimeMillis() + "-" + 0 + "-0.tsfile";
+ path = folder + File.separator + System.currentTimeMillis() + "-" + 0 + "-0-0.tsfile";
}
@After