You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/04/30 09:56:11 UTC
[iotdb] 01/01: [To rel/0.13][IOTDB-3018] Fix compation bugs on handling deleted target file
This is an automated email from the ASF dual-hosted git repository.
ericpai pushed a commit to branch bugfix/cp-iotdb-3018
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 22833be80705ef761103bd5909ce8e4836358b7e
Author: Eric Pai <er...@hotmail.com>
AuthorDate: Sat Apr 30 17:55:40 2022 +0800
[To rel/0.13][IOTDB-3018] Fix compation bugs on handling deleted target file
---
.../engine/compaction/CompactionTaskManager.java | 31 ++++----
.../task/RewriteCrossSpaceCompactionTask.java | 2 +-
.../inner/sizetiered/SizeTieredCompactionTask.java | 31 ++++----
.../inner/utils/InnerSpaceCompactionUtils.java | 7 ++
.../compaction/task/AbstractCompactionTask.java | 17 ++--
.../compaction/task/CompactionTaskSummary.java | 32 ++++++++
.../compaction/CompactionTaskManagerTest.java | 2 +-
.../inner/InnerCompactionEmptyTsFileTest.java | 90 ++++++++++++++++++++++
8 files changed, 173 insertions(+), 39 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 96021ce239..280ea16c27 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskStatus;
import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
+import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.utils.TestOnly;
@@ -68,7 +69,8 @@ public class CompactionTaskManager implements IService {
new FixedPriorityBlockingQueue<>(1024, new CompactionTaskComparator());
// <logicalStorageGroupName,futureSet>, it is used to terminate all compaction tasks under the
// logicalStorageGroup
- private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
+ private Map<String, Set<Future<CompactionTaskSummary>>> storageGroupTasks =
+ new ConcurrentHashMap<>();
private List<AbstractCompactionTask> runningCompactionTaskList = new ArrayList<>();
// The thread pool that periodically fetches and executes the compaction task from
@@ -139,7 +141,7 @@ public class CompactionTaskManager implements IService {
if (taskExecutionPool != null) {
awaitTermination(taskExecutionPool, milliseconds);
awaitTermination(compactionTaskSubmissionThreadPool, milliseconds);
- logger.info("Waiting for task taskExecutionPool to shut down");
+ logger.info("Waiting for task taskExecutionPool to shut down in {} ms", milliseconds);
waitTermination();
storageGroupTasks.clear();
}
@@ -168,6 +170,7 @@ public class CompactionTaskManager implements IService {
}
}
storageGroupTasks.clear();
+ candidateCompactionTaskQueue.clear();
logger.info("All compaction task finish");
}
}
@@ -297,25 +300,25 @@ public class CompactionTaskManager implements IService {
/**
* This method will directly submit a task to thread pool if there is available thread.
*
- * @throws RejectedExecutionException
+ * @return the future of the task.
*/
- public synchronized void submitTask(Callable<Void> compactionMergeTask)
- throws RejectedExecutionException {
- if (taskExecutionPool != null && !taskExecutionPool.isTerminated()) {
- taskExecutionPool.submit(compactionMergeTask);
- return;
+ public synchronized Future<CompactionTaskSummary> submitTask(
+ Callable<CompactionTaskSummary> compactionMergeTask) throws RejectedExecutionException {
+ if (taskExecutionPool != null && !taskExecutionPool.isShutdown()) {
+ Future<CompactionTaskSummary> future = taskExecutionPool.submit(compactionMergeTask);
+ return future;
}
logger.warn(
"A CompactionTask failed to be submitted to CompactionTaskManager because {}",
taskExecutionPool == null
? "taskExecutionPool is null"
: "taskExecutionPool is terminated");
+ return null;
}
public synchronized Future<Void> submitSubTask(Callable<Void> subCompactionTask) {
- if (subCompactionTaskExecutionPool != null && !subCompactionTaskExecutionPool.isTerminated()) {
- Future<Void> future = subCompactionTaskExecutionPool.submit(subCompactionTask);
- return future;
+ if (subCompactionTaskExecutionPool != null && !subCompactionTaskExecutionPool.isShutdown()) {
+ return subCompactionTaskExecutionPool.submit(subCompactionTask);
}
return null;
}
@@ -325,12 +328,12 @@ public class CompactionTaskManager implements IService {
* corresponding storage group.
*/
public void abortCompaction(String fullStorageGroupName) {
- Set<Future<Void>> subTasks =
+ Set<Future<CompactionTaskSummary>> subTasks =
storageGroupTasks.getOrDefault(fullStorageGroupName, Collections.emptySet());
candidateCompactionTaskQueue.clear();
- Iterator<Future<Void>> subIterator = subTasks.iterator();
+ Iterator<Future<CompactionTaskSummary>> subIterator = subTasks.iterator();
while (subIterator.hasNext()) {
- Future<Void> next = subIterator.next();
+ Future<CompactionTaskSummary> next = subIterator.next();
if (!next.isDone() && !next.isCancelled()) {
next.cancel(true);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
index 13f233f64c..a95b8d44d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
@@ -87,7 +87,7 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
executeCompaction();
} catch (Throwable throwable) {
// catch throwable instead of exception to handle OOM errors
- logger.error("Meet errors in cross space compaction, {}", throwable.getMessage());
+ logger.error("Meet errors in cross space compaction", throwable);
CompactionExceptionHandler.handleException(
fullStorageGroupName,
logFile,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
index 8779ead76b..8c01e06635 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
@@ -31,12 +31,14 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -92,6 +94,8 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
TsFileResource targetTsFileResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(
selectedTsFileResourceList, sequence);
+ List<TsFileResource> targetTsFileList =
+ new ArrayList<>(Collections.singletonList(targetTsFileResource));
LOGGER.info(
"{} [Compaction] starting compaction task with {} files",
fullStorageGroupName,
@@ -107,8 +111,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
sizeTieredCompactionLogger = new CompactionLogger(logFile);
sizeTieredCompactionLogger.logFiles(
selectedTsFileResourceList, CompactionLogger.STR_SOURCE_FILES);
- sizeTieredCompactionLogger.logFiles(
- Collections.singletonList(targetTsFileResource), CompactionLogger.STR_TARGET_FILES);
+ sizeTieredCompactionLogger.logFiles(targetTsFileList, CompactionLogger.STR_TARGET_FILES);
LOGGER.info("{} [SizeTiredCompactionTask] Close the logger", fullStorageGroupName);
sizeTieredCompactionLogger.close();
LOGGER.info(
@@ -119,9 +122,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
InnerSpaceCompactionUtils.compact(targetTsFileResource, selectedTsFileResourceList);
} else {
CompactionUtils.compact(
- Collections.emptyList(),
- selectedTsFileResourceList,
- Collections.singletonList(targetTsFileResource));
+ Collections.emptyList(), selectedTsFileResourceList, targetTsFileList);
}
InnerSpaceCompactionUtils.moveTargetFile(targetTsFileResource, fullStorageGroupName);
@@ -140,14 +141,14 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
tsFileManager.replace(
selectedTsFileResourceList,
Collections.emptyList(),
- Collections.singletonList(targetTsFileResource),
+ targetTsFileList,
timePartition,
true);
} else {
tsFileManager.replace(
Collections.emptyList(),
selectedTsFileResourceList,
- Collections.singletonList(targetTsFileResource),
+ targetTsFileList,
timePartition,
false);
}
@@ -164,10 +165,11 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
isHoldingWriteLock[i] = true;
}
- if (targetTsFileResource.getTsFile().length()
- < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) {
+ if (targetTsFileResource.getTsFile().exists()
+ && targetTsFileResource.getTsFile().length()
+ < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) {
// the file size is smaller than magic string and version number
- throw new RuntimeException(
+ throw new TsFileNotCompleteException(
String.format(
"target file %s is smaller than magic string and version number size",
targetTsFileResource));
@@ -193,10 +195,6 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
FileUtils.delete(logFile);
}
} catch (Throwable throwable) {
- LOGGER.error(
- "{} [Compaction] Throwable is caught during execution of SizeTieredCompaction, {}",
- fullStorageGroupName,
- throwable);
LOGGER.warn("{} [Compaction] Start to handle exception", fullStorageGroupName);
if (sizeTieredCompactionLogger != null) {
sizeTieredCompactionLogger.close();
@@ -205,7 +203,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
CompactionExceptionHandler.handleException(
fullStorageGroupName,
logFile,
- Collections.singletonList(targetTsFileResource),
+ targetTsFileList,
selectedTsFileResourceList,
Collections.emptyList(),
tsFileManager,
@@ -216,7 +214,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
CompactionExceptionHandler.handleException(
fullStorageGroupName,
logFile,
- Collections.singletonList(targetTsFileResource),
+ targetTsFileList,
Collections.emptyList(),
selectedTsFileResourceList,
tsFileManager,
@@ -224,6 +222,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
true,
isSequence());
}
+ throw throwable;
} finally {
releaseFileLocksAndResetMergingStatus();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
index e5becdf2d2..cf402099c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
@@ -270,6 +270,13 @@ public class InnerSpaceCompactionUtils {
*/
public static void moveTargetFile(TsFileResource targetResource, String fullStorageGroupName)
throws IOException {
+ if (!targetResource.getTsFile().exists()) {
+ logger.info(
+ "{} [Compaction] Tmp target tsfile {} may be deleted after compaction.",
+ fullStorageGroupName,
+ targetResource.getTsFilePath());
+ return;
+ }
if (!targetResource.getTsFilePath().endsWith(IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX)) {
logger.warn(
"{} [Compaction] Tmp target tsfile {} should be end with {}",
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
index 359e7027b0..089e888145 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
@@ -31,11 +31,12 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* AbstractCompactionTask is the base class for all compaction task, it carries out the execution of
- * compaction. AbstractCompactionTask uses a template method, it execute the abstract function
- * <i>doCompaction</i> implemented by subclass, and decrease the currentTaskNum in
- * CompactionScheduler when the <i>doCompaction</i> finish.
+ * * compaction. AbstractCompactionTask uses a template method, it executes the abstract function *
+ * {@link AbstractCompactionTask#doCompaction()} implemented by subclass, and decrease the *
+ * currentTaskNum in CompactionScheduler when the {@link AbstractCompactionTask#doCompaction()} is *
+ * finished. The future returns the {@link CompactionTaskSummary} of this task execution.
*/
-public abstract class AbstractCompactionTask implements Callable<Void> {
+public abstract class AbstractCompactionTask implements Callable<CompactionTaskSummary> {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
protected String fullStorageGroupName;
@@ -60,20 +61,22 @@ public abstract class AbstractCompactionTask implements Callable<Void> {
protected abstract void doCompaction() throws Exception;
@Override
- public Void call() throws Exception {
+ public CompactionTaskSummary call() throws Exception {
long startTime = System.currentTimeMillis();
currentTaskNum.incrementAndGet();
+ boolean isSuccess = false;
try {
doCompaction();
+ isSuccess = true;
} catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
+ LOGGER.error("Running compaction task failed", e);
} finally {
CompactionTaskManager.getInstance().removeRunningTaskFromList(this);
timeCost = System.currentTimeMillis() - startTime;
this.currentTaskNum.decrementAndGet();
}
- return null;
+ return new CompactionTaskSummary(isSuccess);
}
public String getFullStorageGroupName() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java
new file mode 100644
index 0000000000..a7380969ff
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.task;
+
+/** The summary of one {@link AbstractCompactionTask} execution */
+public class CompactionTaskSummary {
+ private final boolean success;
+
+ public CompactionTaskSummary(boolean success) {
+ this.success = success;
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
index 511cdb144f..fa6c4386c2 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
@@ -178,7 +178,7 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
new SizeTieredCompactionTask(
"root.compactionTest", "0", 0, tsFileManager, seqResources, true, new AtomicInteger(0));
CompactionTaskManager manager = CompactionTaskManager.getInstance();
- manager.addTaskToWaitingQueue(task1);
+ Assert.assertTrue(manager.addTaskToWaitingQueue(task1));
manager.submitTaskFromTaskQueue();
while (manager.getTotalTaskCount() > 0) {
Thread.sleep(10);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
new file mode 100644
index 0000000000..fbc0a5620a
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.inner;
+
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
+import org.apache.iotdb.db.engine.compaction.inner.sizetiered.SizeTieredCompactionTask;
+import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
+import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class InnerCompactionEmptyTsFileTest extends InnerCompactionTest {
+
+ File tempSGDir;
+
+ @Before
+ public void setUp() throws Exception {
+ tempSGDir = new File(TestConstant.getTestTsFileDir("root.compactionTest", 0, 0));
+ if (!tempSGDir.exists()) {
+ Assert.assertTrue(tempSGDir.mkdirs());
+ }
+ seqFileNum = 0;
+ unseqFileNum = 4;
+ super.setUp();
+ tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG"));
+ tempSGDir.mkdirs();
+ tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0", tempSGDir.getAbsolutePath());
+ }
+
+ @After
+ public void tearDown() throws IOException, StorageEngineException {
+ super.tearDown();
+ FileUtils.deleteDirectory(tempSGDir);
+ }
+
+ @Override
+ void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset)
+ throws IOException, WriteProcessException {
+ // create some empty tsfiles
+ super.prepareFile(tsFileResource, 0, 0, 0);
+ }
+
+ @Test
+ public void testCompactWithPartialEmptyUnseqFiles() throws Exception {
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // Here we compact file 0-2
+ AbstractInnerSpaceCompactionTask task =
+ new SizeTieredCompactionTask(
+ "root.compactionTest",
+ "0",
+ 0,
+ tsFileManager,
+ unseqResources.subList(0, 3),
+ false,
+ new AtomicInteger(0));
+ Future<CompactionTaskSummary> future = CompactionTaskManager.getInstance().submitTask(task);
+ Assert.assertTrue(future.get().isSuccess());
+ }
+}