You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/04/30 09:56:11 UTC

[iotdb] 01/01: [To rel/0.13][IOTDB-3018] Fix compaction bugs on handling deleted target file

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch bugfix/cp-iotdb-3018
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 22833be80705ef761103bd5909ce8e4836358b7e
Author: Eric Pai <er...@hotmail.com>
AuthorDate: Sat Apr 30 17:55:40 2022 +0800

    [To rel/0.13][IOTDB-3018] Fix compaction bugs on handling deleted target file
---
 .../engine/compaction/CompactionTaskManager.java   | 31 ++++----
 .../task/RewriteCrossSpaceCompactionTask.java      |  2 +-
 .../inner/sizetiered/SizeTieredCompactionTask.java | 31 ++++----
 .../inner/utils/InnerSpaceCompactionUtils.java     |  7 ++
 .../compaction/task/AbstractCompactionTask.java    | 17 ++--
 .../compaction/task/CompactionTaskSummary.java     | 32 ++++++++
 .../compaction/CompactionTaskManagerTest.java      |  2 +-
 .../inner/InnerCompactionEmptyTsFileTest.java      | 90 ++++++++++++++++++++++
 8 files changed, 173 insertions(+), 39 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 96021ce239..280ea16c27 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskStatus;
 import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
+import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.db.utils.TestOnly;
@@ -68,7 +69,8 @@ public class CompactionTaskManager implements IService {
       new FixedPriorityBlockingQueue<>(1024, new CompactionTaskComparator());
   // <logicalStorageGroupName,futureSet>, it is used to terminate all compaction tasks under the
   // logicalStorageGroup
-  private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
+  private Map<String, Set<Future<CompactionTaskSummary>>> storageGroupTasks =
+      new ConcurrentHashMap<>();
   private List<AbstractCompactionTask> runningCompactionTaskList = new ArrayList<>();
 
   // The thread pool that periodically fetches and executes the compaction task from
@@ -139,7 +141,7 @@ public class CompactionTaskManager implements IService {
     if (taskExecutionPool != null) {
       awaitTermination(taskExecutionPool, milliseconds);
       awaitTermination(compactionTaskSubmissionThreadPool, milliseconds);
-      logger.info("Waiting for task taskExecutionPool to shut down");
+      logger.info("Waiting for task taskExecutionPool to shut down in {} ms", milliseconds);
       waitTermination();
       storageGroupTasks.clear();
     }
@@ -168,6 +170,7 @@ public class CompactionTaskManager implements IService {
         }
       }
       storageGroupTasks.clear();
+      candidateCompactionTaskQueue.clear();
       logger.info("All compaction task finish");
     }
   }
@@ -297,25 +300,25 @@ public class CompactionTaskManager implements IService {
   /**
    * This method will directly submit a task to thread pool if there is available thread.
    *
-   * @throws RejectedExecutionException
+   * @return the future of the task.
    */
-  public synchronized void submitTask(Callable<Void> compactionMergeTask)
-      throws RejectedExecutionException {
-    if (taskExecutionPool != null && !taskExecutionPool.isTerminated()) {
-      taskExecutionPool.submit(compactionMergeTask);
-      return;
+  public synchronized Future<CompactionTaskSummary> submitTask(
+      Callable<CompactionTaskSummary> compactionMergeTask) throws RejectedExecutionException {
+    if (taskExecutionPool != null && !taskExecutionPool.isShutdown()) {
+      Future<CompactionTaskSummary> future = taskExecutionPool.submit(compactionMergeTask);
+      return future;
     }
     logger.warn(
         "A CompactionTask failed to be submitted to CompactionTaskManager because {}",
         taskExecutionPool == null
             ? "taskExecutionPool is null"
             : "taskExecutionPool is terminated");
+    return null;
   }
 
   public synchronized Future<Void> submitSubTask(Callable<Void> subCompactionTask) {
-    if (subCompactionTaskExecutionPool != null && !subCompactionTaskExecutionPool.isTerminated()) {
-      Future<Void> future = subCompactionTaskExecutionPool.submit(subCompactionTask);
-      return future;
+    if (subCompactionTaskExecutionPool != null && !subCompactionTaskExecutionPool.isShutdown()) {
+      return subCompactionTaskExecutionPool.submit(subCompactionTask);
     }
     return null;
   }
@@ -325,12 +328,12 @@ public class CompactionTaskManager implements IService {
    * corresponding storage group.
    */
   public void abortCompaction(String fullStorageGroupName) {
-    Set<Future<Void>> subTasks =
+    Set<Future<CompactionTaskSummary>> subTasks =
         storageGroupTasks.getOrDefault(fullStorageGroupName, Collections.emptySet());
     candidateCompactionTaskQueue.clear();
-    Iterator<Future<Void>> subIterator = subTasks.iterator();
+    Iterator<Future<CompactionTaskSummary>> subIterator = subTasks.iterator();
     while (subIterator.hasNext()) {
-      Future<Void> next = subIterator.next();
+      Future<CompactionTaskSummary> next = subIterator.next();
       if (!next.isDone() && !next.isCancelled()) {
         next.cancel(true);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
index 13f233f64c..a95b8d44d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
@@ -87,7 +87,7 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
       executeCompaction();
     } catch (Throwable throwable) {
       // catch throwable instead of exception to handle OOM errors
-      logger.error("Meet errors in cross space compaction, {}", throwable.getMessage());
+      logger.error("Meet errors in cross space compaction", throwable);
       CompactionExceptionHandler.handleException(
           fullStorageGroupName,
           logFile,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
index 8779ead76b..8c01e06635 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
@@ -31,12 +31,14 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -92,6 +94,8 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
     TsFileResource targetTsFileResource =
         TsFileNameGenerator.getInnerCompactionTargetFileResource(
             selectedTsFileResourceList, sequence);
+    List<TsFileResource> targetTsFileList =
+        new ArrayList<>(Collections.singletonList(targetTsFileResource));
     LOGGER.info(
         "{} [Compaction] starting compaction task with {} files",
         fullStorageGroupName,
@@ -107,8 +111,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
       sizeTieredCompactionLogger = new CompactionLogger(logFile);
       sizeTieredCompactionLogger.logFiles(
           selectedTsFileResourceList, CompactionLogger.STR_SOURCE_FILES);
-      sizeTieredCompactionLogger.logFiles(
-          Collections.singletonList(targetTsFileResource), CompactionLogger.STR_TARGET_FILES);
+      sizeTieredCompactionLogger.logFiles(targetTsFileList, CompactionLogger.STR_TARGET_FILES);
       LOGGER.info("{} [SizeTiredCompactionTask] Close the logger", fullStorageGroupName);
       sizeTieredCompactionLogger.close();
       LOGGER.info(
@@ -119,9 +122,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
         InnerSpaceCompactionUtils.compact(targetTsFileResource, selectedTsFileResourceList);
       } else {
         CompactionUtils.compact(
-            Collections.emptyList(),
-            selectedTsFileResourceList,
-            Collections.singletonList(targetTsFileResource));
+            Collections.emptyList(), selectedTsFileResourceList, targetTsFileList);
       }
 
       InnerSpaceCompactionUtils.moveTargetFile(targetTsFileResource, fullStorageGroupName);
@@ -140,14 +141,14 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
         tsFileManager.replace(
             selectedTsFileResourceList,
             Collections.emptyList(),
-            Collections.singletonList(targetTsFileResource),
+            targetTsFileList,
             timePartition,
             true);
       } else {
         tsFileManager.replace(
             Collections.emptyList(),
             selectedTsFileResourceList,
-            Collections.singletonList(targetTsFileResource),
+            targetTsFileList,
             timePartition,
             false);
       }
@@ -164,10 +165,11 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
         isHoldingWriteLock[i] = true;
       }
 
-      if (targetTsFileResource.getTsFile().length()
-          < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) {
+      if (targetTsFileResource.getTsFile().exists()
+          && targetTsFileResource.getTsFile().length()
+              < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) {
         // the file size is smaller than magic string and version number
-        throw new RuntimeException(
+        throw new TsFileNotCompleteException(
             String.format(
                 "target file %s is smaller than magic string and version number size",
                 targetTsFileResource));
@@ -193,10 +195,6 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
         FileUtils.delete(logFile);
       }
     } catch (Throwable throwable) {
-      LOGGER.error(
-          "{} [Compaction] Throwable is caught during execution of SizeTieredCompaction, {}",
-          fullStorageGroupName,
-          throwable);
       LOGGER.warn("{} [Compaction] Start to handle exception", fullStorageGroupName);
       if (sizeTieredCompactionLogger != null) {
         sizeTieredCompactionLogger.close();
@@ -205,7 +203,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
         CompactionExceptionHandler.handleException(
             fullStorageGroupName,
             logFile,
-            Collections.singletonList(targetTsFileResource),
+            targetTsFileList,
             selectedTsFileResourceList,
             Collections.emptyList(),
             tsFileManager,
@@ -216,7 +214,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
         CompactionExceptionHandler.handleException(
             fullStorageGroupName,
             logFile,
-            Collections.singletonList(targetTsFileResource),
+            targetTsFileList,
             Collections.emptyList(),
             selectedTsFileResourceList,
             tsFileManager,
@@ -224,6 +222,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
             true,
             isSequence());
       }
+      throw throwable;
     } finally {
       releaseFileLocksAndResetMergingStatus();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
index e5becdf2d2..cf402099c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
@@ -270,6 +270,13 @@ public class InnerSpaceCompactionUtils {
    */
   public static void moveTargetFile(TsFileResource targetResource, String fullStorageGroupName)
       throws IOException {
+    if (!targetResource.getTsFile().exists()) {
+      logger.info(
+          "{} [Compaction] Tmp target tsfile {} may be deleted after compaction.",
+          fullStorageGroupName,
+          targetResource.getTsFilePath());
+      return;
+    }
     if (!targetResource.getTsFilePath().endsWith(IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX)) {
       logger.warn(
           "{} [Compaction] Tmp target tsfile {} should be end with {}",
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
index 359e7027b0..089e888145 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
@@ -31,11 +31,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * AbstractCompactionTask is the base class for all compaction task, it carries out the execution of
- * compaction. AbstractCompactionTask uses a template method, it execute the abstract function
- * <i>doCompaction</i> implemented by subclass, and decrease the currentTaskNum in
- * CompactionScheduler when the <i>doCompaction</i> finish.
+ * compaction. AbstractCompactionTask uses a template method, it executes the abstract function
+ * {@link AbstractCompactionTask#doCompaction()} implemented by subclass, and decreases the
+ * currentTaskNum in CompactionScheduler when the {@link AbstractCompactionTask#doCompaction()} is
+ * finished. The future returns the {@link CompactionTaskSummary} of this task execution.
  */
-public abstract class AbstractCompactionTask implements Callable<Void> {
+public abstract class AbstractCompactionTask implements Callable<CompactionTaskSummary> {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
   protected String fullStorageGroupName;
@@ -60,20 +61,22 @@ public abstract class AbstractCompactionTask implements Callable<Void> {
   protected abstract void doCompaction() throws Exception;
 
   @Override
-  public Void call() throws Exception {
+  public CompactionTaskSummary call() throws Exception {
     long startTime = System.currentTimeMillis();
     currentTaskNum.incrementAndGet();
+    boolean isSuccess = false;
     try {
       doCompaction();
+      isSuccess = true;
     } catch (Exception e) {
-      LOGGER.error(e.getMessage(), e);
+      LOGGER.error("Running compaction task failed", e);
     } finally {
       CompactionTaskManager.getInstance().removeRunningTaskFromList(this);
       timeCost = System.currentTimeMillis() - startTime;
       this.currentTaskNum.decrementAndGet();
     }
 
-    return null;
+    return new CompactionTaskSummary(isSuccess);
   }
 
   public String getFullStorageGroupName() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java
new file mode 100644
index 0000000000..a7380969ff
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.task;
+
+/** The summary of one {@link AbstractCompactionTask} execution */
+public class CompactionTaskSummary {
+  private final boolean success;
+
+  public CompactionTaskSummary(boolean success) {
+    this.success = success;
+  }
+
+  public boolean isSuccess() {
+    return success;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
index 511cdb144f..fa6c4386c2 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
@@ -178,7 +178,7 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
         new SizeTieredCompactionTask(
             "root.compactionTest", "0", 0, tsFileManager, seqResources, true, new AtomicInteger(0));
     CompactionTaskManager manager = CompactionTaskManager.getInstance();
-    manager.addTaskToWaitingQueue(task1);
+    Assert.assertTrue(manager.addTaskToWaitingQueue(task1));
     manager.submitTaskFromTaskQueue();
     while (manager.getTotalTaskCount() > 0) {
       Thread.sleep(10);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
new file mode 100644
index 0000000000..fbc0a5620a
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.inner;
+
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
+import org.apache.iotdb.db.engine.compaction.inner.sizetiered.SizeTieredCompactionTask;
+import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
+import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class InnerCompactionEmptyTsFileTest extends InnerCompactionTest {
+
+  File tempSGDir;
+
+  @Before
+  public void setUp() throws Exception {
+    tempSGDir = new File(TestConstant.getTestTsFileDir("root.compactionTest", 0, 0));
+    if (!tempSGDir.exists()) {
+      Assert.assertTrue(tempSGDir.mkdirs());
+    }
+    seqFileNum = 0;
+    unseqFileNum = 4;
+    super.setUp();
+    tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG"));
+    tempSGDir.mkdirs();
+    tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0", tempSGDir.getAbsolutePath());
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    super.tearDown();
+    FileUtils.deleteDirectory(tempSGDir);
+  }
+
+  @Override
+  void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset)
+      throws IOException, WriteProcessException {
+    // create some empty tsfiles
+    super.prepareFile(tsFileResource, 0, 0, 0);
+  }
+
+  @Test
+  public void testCompactWithPartialEmptyUnseqFiles() throws Exception {
+    tsFileManager.addAll(seqResources, true);
+    tsFileManager.addAll(unseqResources, false);
+
+    // Here we compact file 0-2
+    AbstractInnerSpaceCompactionTask task =
+        new SizeTieredCompactionTask(
+            "root.compactionTest",
+            "0",
+            0,
+            tsFileManager,
+            unseqResources.subList(0, 3),
+            false,
+            new AtomicInteger(0));
+    Future<CompactionTaskSummary> future = CompactionTaskManager.getInstance().submitTask(task);
+    Assert.assertTrue(future.get().isSuccess());
+  }
+}