You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/04/30 09:56:10 UTC

[iotdb] branch bugfix/cp-iotdb-3018 created (now 22833be807)

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a change to branch bugfix/cp-iotdb-3018
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 22833be807 [To rel/0.13][IOTDB-3018] Fix compation bugs on handling deleted target file

This branch includes the following new commits:

     new 22833be807 [To rel/0.13][IOTDB-3018] Fix compation bugs on handling deleted target file

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [To rel/0.13][IOTDB-3018] Fix compation bugs on handling deleted target file

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch bugfix/cp-iotdb-3018
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 22833be80705ef761103bd5909ce8e4836358b7e
Author: Eric Pai <er...@hotmail.com>
AuthorDate: Sat Apr 30 17:55:40 2022 +0800

    [To rel/0.13][IOTDB-3018] Fix compation bugs on handling deleted target file
---
 .../engine/compaction/CompactionTaskManager.java   | 31 ++++----
 .../task/RewriteCrossSpaceCompactionTask.java      |  2 +-
 .../inner/sizetiered/SizeTieredCompactionTask.java | 31 ++++----
 .../inner/utils/InnerSpaceCompactionUtils.java     |  7 ++
 .../compaction/task/AbstractCompactionTask.java    | 17 ++--
 .../compaction/task/CompactionTaskSummary.java     | 32 ++++++++
 .../compaction/CompactionTaskManagerTest.java      |  2 +-
 .../inner/InnerCompactionEmptyTsFileTest.java      | 90 ++++++++++++++++++++++
 8 files changed, 173 insertions(+), 39 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 96021ce239..280ea16c27 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskStatus;
 import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
+import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.db.utils.TestOnly;
@@ -68,7 +69,8 @@ public class CompactionTaskManager implements IService {
       new FixedPriorityBlockingQueue<>(1024, new CompactionTaskComparator());
   // <logicalStorageGroupName,futureSet>, it is used to terminate all compaction tasks under the
   // logicalStorageGroup
-  private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
+  private Map<String, Set<Future<CompactionTaskSummary>>> storageGroupTasks =
+      new ConcurrentHashMap<>();
   private List<AbstractCompactionTask> runningCompactionTaskList = new ArrayList<>();
 
   // The thread pool that periodically fetches and executes the compaction task from
@@ -139,7 +141,7 @@ public class CompactionTaskManager implements IService {
     if (taskExecutionPool != null) {
       awaitTermination(taskExecutionPool, milliseconds);
       awaitTermination(compactionTaskSubmissionThreadPool, milliseconds);
-      logger.info("Waiting for task taskExecutionPool to shut down");
+      logger.info("Waiting for task taskExecutionPool to shut down in {} ms", milliseconds);
       waitTermination();
       storageGroupTasks.clear();
     }
@@ -168,6 +170,7 @@ public class CompactionTaskManager implements IService {
         }
       }
       storageGroupTasks.clear();
+      candidateCompactionTaskQueue.clear();
       logger.info("All compaction task finish");
     }
   }
@@ -297,25 +300,25 @@ public class CompactionTaskManager implements IService {
   /**
    * This method will directly submit a task to thread pool if there is available thread.
    *
-   * @throws RejectedExecutionException
+   * @return the future of the task.
    */
-  public synchronized void submitTask(Callable<Void> compactionMergeTask)
-      throws RejectedExecutionException {
-    if (taskExecutionPool != null && !taskExecutionPool.isTerminated()) {
-      taskExecutionPool.submit(compactionMergeTask);
-      return;
+  public synchronized Future<CompactionTaskSummary> submitTask(
+      Callable<CompactionTaskSummary> compactionMergeTask) throws RejectedExecutionException {
+    if (taskExecutionPool != null && !taskExecutionPool.isShutdown()) {
+      Future<CompactionTaskSummary> future = taskExecutionPool.submit(compactionMergeTask);
+      return future;
     }
     logger.warn(
         "A CompactionTask failed to be submitted to CompactionTaskManager because {}",
         taskExecutionPool == null
             ? "taskExecutionPool is null"
             : "taskExecutionPool is terminated");
+    return null;
   }
 
   public synchronized Future<Void> submitSubTask(Callable<Void> subCompactionTask) {
-    if (subCompactionTaskExecutionPool != null && !subCompactionTaskExecutionPool.isTerminated()) {
-      Future<Void> future = subCompactionTaskExecutionPool.submit(subCompactionTask);
-      return future;
+    if (subCompactionTaskExecutionPool != null && !subCompactionTaskExecutionPool.isShutdown()) {
+      return subCompactionTaskExecutionPool.submit(subCompactionTask);
     }
     return null;
   }
@@ -325,12 +328,12 @@ public class CompactionTaskManager implements IService {
    * corresponding storage group.
    */
   public void abortCompaction(String fullStorageGroupName) {
-    Set<Future<Void>> subTasks =
+    Set<Future<CompactionTaskSummary>> subTasks =
         storageGroupTasks.getOrDefault(fullStorageGroupName, Collections.emptySet());
     candidateCompactionTaskQueue.clear();
-    Iterator<Future<Void>> subIterator = subTasks.iterator();
+    Iterator<Future<CompactionTaskSummary>> subIterator = subTasks.iterator();
     while (subIterator.hasNext()) {
-      Future<Void> next = subIterator.next();
+      Future<CompactionTaskSummary> next = subIterator.next();
       if (!next.isDone() && !next.isCancelled()) {
         next.cancel(true);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
index 13f233f64c..a95b8d44d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
@@ -87,7 +87,7 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
       executeCompaction();
     } catch (Throwable throwable) {
       // catch throwable instead of exception to handle OOM errors
-      logger.error("Meet errors in cross space compaction, {}", throwable.getMessage());
+      logger.error("Meet errors in cross space compaction", throwable);
       CompactionExceptionHandler.handleException(
           fullStorageGroupName,
           logFile,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
index 8779ead76b..8c01e06635 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
@@ -31,12 +31,14 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -92,6 +94,8 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
     TsFileResource targetTsFileResource =
         TsFileNameGenerator.getInnerCompactionTargetFileResource(
             selectedTsFileResourceList, sequence);
+    List<TsFileResource> targetTsFileList =
+        new ArrayList<>(Collections.singletonList(targetTsFileResource));
     LOGGER.info(
         "{} [Compaction] starting compaction task with {} files",
         fullStorageGroupName,
@@ -107,8 +111,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
       sizeTieredCompactionLogger = new CompactionLogger(logFile);
       sizeTieredCompactionLogger.logFiles(
           selectedTsFileResourceList, CompactionLogger.STR_SOURCE_FILES);
-      sizeTieredCompactionLogger.logFiles(
-          Collections.singletonList(targetTsFileResource), CompactionLogger.STR_TARGET_FILES);
+      sizeTieredCompactionLogger.logFiles(targetTsFileList, CompactionLogger.STR_TARGET_FILES);
       LOGGER.info("{} [SizeTiredCompactionTask] Close the logger", fullStorageGroupName);
       sizeTieredCompactionLogger.close();
       LOGGER.info(
@@ -119,9 +122,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
         InnerSpaceCompactionUtils.compact(targetTsFileResource, selectedTsFileResourceList);
       } else {
         CompactionUtils.compact(
-            Collections.emptyList(),
-            selectedTsFileResourceList,
-            Collections.singletonList(targetTsFileResource));
+            Collections.emptyList(), selectedTsFileResourceList, targetTsFileList);
       }
 
       InnerSpaceCompactionUtils.moveTargetFile(targetTsFileResource, fullStorageGroupName);
@@ -140,14 +141,14 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
         tsFileManager.replace(
             selectedTsFileResourceList,
             Collections.emptyList(),
-            Collections.singletonList(targetTsFileResource),
+            targetTsFileList,
             timePartition,
             true);
       } else {
         tsFileManager.replace(
             Collections.emptyList(),
             selectedTsFileResourceList,
-            Collections.singletonList(targetTsFileResource),
+            targetTsFileList,
             timePartition,
             false);
       }
@@ -164,10 +165,11 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
         isHoldingWriteLock[i] = true;
       }
 
-      if (targetTsFileResource.getTsFile().length()
-          < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) {
+      if (targetTsFileResource.getTsFile().exists()
+          && targetTsFileResource.getTsFile().length()
+              < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) {
         // the file size is smaller than magic string and version number
-        throw new RuntimeException(
+        throw new TsFileNotCompleteException(
             String.format(
                 "target file %s is smaller than magic string and version number size",
                 targetTsFileResource));
@@ -193,10 +195,6 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
         FileUtils.delete(logFile);
       }
     } catch (Throwable throwable) {
-      LOGGER.error(
-          "{} [Compaction] Throwable is caught during execution of SizeTieredCompaction, {}",
-          fullStorageGroupName,
-          throwable);
       LOGGER.warn("{} [Compaction] Start to handle exception", fullStorageGroupName);
       if (sizeTieredCompactionLogger != null) {
         sizeTieredCompactionLogger.close();
@@ -205,7 +203,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
         CompactionExceptionHandler.handleException(
             fullStorageGroupName,
             logFile,
-            Collections.singletonList(targetTsFileResource),
+            targetTsFileList,
             selectedTsFileResourceList,
             Collections.emptyList(),
             tsFileManager,
@@ -216,7 +214,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
         CompactionExceptionHandler.handleException(
             fullStorageGroupName,
             logFile,
-            Collections.singletonList(targetTsFileResource),
+            targetTsFileList,
             Collections.emptyList(),
             selectedTsFileResourceList,
             tsFileManager,
@@ -224,6 +222,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
             true,
             isSequence());
       }
+      throw throwable;
     } finally {
       releaseFileLocksAndResetMergingStatus();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
index e5becdf2d2..cf402099c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
@@ -270,6 +270,13 @@ public class InnerSpaceCompactionUtils {
    */
   public static void moveTargetFile(TsFileResource targetResource, String fullStorageGroupName)
       throws IOException {
+    if (!targetResource.getTsFile().exists()) {
+      logger.info(
+          "{} [Compaction] Tmp target tsfile {} may be deleted after compaction.",
+          fullStorageGroupName,
+          targetResource.getTsFilePath());
+      return;
+    }
     if (!targetResource.getTsFilePath().endsWith(IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX)) {
       logger.warn(
           "{} [Compaction] Tmp target tsfile {} should be end with {}",
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
index 359e7027b0..089e888145 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
@@ -31,11 +31,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * AbstractCompactionTask is the base class for all compaction task, it carries out the execution of
- * compaction. AbstractCompactionTask uses a template method, it execute the abstract function
- * <i>doCompaction</i> implemented by subclass, and decrease the currentTaskNum in
- * CompactionScheduler when the <i>doCompaction</i> finish.
+ * * compaction. AbstractCompactionTask uses a template method, it executes the abstract function *
+ * {@link AbstractCompactionTask#doCompaction()} implemented by subclass, and decrease the *
+ * currentTaskNum in CompactionScheduler when the {@link AbstractCompactionTask#doCompaction()} is *
+ * finished. The future returns the {@link CompactionTaskSummary} of this task execution.
  */
-public abstract class AbstractCompactionTask implements Callable<Void> {
+public abstract class AbstractCompactionTask implements Callable<CompactionTaskSummary> {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
   protected String fullStorageGroupName;
@@ -60,20 +61,22 @@ public abstract class AbstractCompactionTask implements Callable<Void> {
   protected abstract void doCompaction() throws Exception;
 
   @Override
-  public Void call() throws Exception {
+  public CompactionTaskSummary call() throws Exception {
     long startTime = System.currentTimeMillis();
     currentTaskNum.incrementAndGet();
+    boolean isSuccess = false;
     try {
       doCompaction();
+      isSuccess = true;
     } catch (Exception e) {
-      LOGGER.error(e.getMessage(), e);
+      LOGGER.error("Running compaction task failed", e);
     } finally {
       CompactionTaskManager.getInstance().removeRunningTaskFromList(this);
       timeCost = System.currentTimeMillis() - startTime;
       this.currentTaskNum.decrementAndGet();
     }
 
-    return null;
+    return new CompactionTaskSummary(isSuccess);
   }
 
   public String getFullStorageGroupName() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java
new file mode 100644
index 0000000000..a7380969ff
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.task;
+
+/** The summary of one {@link AbstractCompactionTask} execution */
+public class CompactionTaskSummary {
+  private final boolean success;
+
+  public CompactionTaskSummary(boolean success) {
+    this.success = success;
+  }
+
+  public boolean isSuccess() {
+    return success;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
index 511cdb144f..fa6c4386c2 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
@@ -178,7 +178,7 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
         new SizeTieredCompactionTask(
             "root.compactionTest", "0", 0, tsFileManager, seqResources, true, new AtomicInteger(0));
     CompactionTaskManager manager = CompactionTaskManager.getInstance();
-    manager.addTaskToWaitingQueue(task1);
+    Assert.assertTrue(manager.addTaskToWaitingQueue(task1));
     manager.submitTaskFromTaskQueue();
     while (manager.getTotalTaskCount() > 0) {
       Thread.sleep(10);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
new file mode 100644
index 0000000000..fbc0a5620a
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.inner;
+
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
+import org.apache.iotdb.db.engine.compaction.inner.sizetiered.SizeTieredCompactionTask;
+import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
+import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class InnerCompactionEmptyTsFileTest extends InnerCompactionTest {
+
+  File tempSGDir;
+
+  @Before
+  public void setUp() throws Exception {
+    tempSGDir = new File(TestConstant.getTestTsFileDir("root.compactionTest", 0, 0));
+    if (!tempSGDir.exists()) {
+      Assert.assertTrue(tempSGDir.mkdirs());
+    }
+    seqFileNum = 0;
+    unseqFileNum = 4;
+    super.setUp();
+    tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG"));
+    tempSGDir.mkdirs();
+    tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0", tempSGDir.getAbsolutePath());
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    super.tearDown();
+    FileUtils.deleteDirectory(tempSGDir);
+  }
+
+  @Override
+  void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset)
+      throws IOException, WriteProcessException {
+    // create some empty tsfiles
+    super.prepareFile(tsFileResource, 0, 0, 0);
+  }
+
+  @Test
+  public void testCompactWithPartialEmptyUnseqFiles() throws Exception {
+    tsFileManager.addAll(seqResources, true);
+    tsFileManager.addAll(unseqResources, false);
+
+    // Here we compact file 0-2
+    AbstractInnerSpaceCompactionTask task =
+        new SizeTieredCompactionTask(
+            "root.compactionTest",
+            "0",
+            0,
+            tsFileManager,
+            unseqResources.subList(0, 3),
+            false,
+            new AtomicInteger(0));
+    Future<CompactionTaskSummary> future = CompactionTaskManager.getInstance().submitTask(task);
+    Assert.assertTrue(future.get().isSuccess());
+  }
+}