You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/04/27 07:49:38 UTC

[iotdb] branch bugfix/iotdb-3018 created (now f33dee2b1f)

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a change to branch bugfix/iotdb-3018
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at f33dee2b1f [IOTDB-3018] Fix compaction bugs on handling deleted target file and service halt

This branch includes the following new commits:

     new f33dee2b1f [IOTDB-3018] Fix compaction bugs on handling deleted target file and service halt

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-3018] Fix compaction bugs on handling deleted target file and service halt

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch bugfix/iotdb-3018
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f33dee2b1f6e6e76171e3832c1914c33231c0b0e
Author: ericpai <er...@hotmail.com>
AuthorDate: Wed Apr 27 15:49:24 2022 +0800

    [IOTDB-3018] Fix compaction bugs on handling deleted target file and service halt
---
 .../db/engine/compaction/CompactionTaskManager.java      | 16 ++++++++--------
 .../compaction/cross/CrossSpaceCompactionTask.java       |  2 +-
 .../compaction/inner/InnerSpaceCompactionTask.java       | 14 +++++++++-----
 .../engine/compaction/task/AbstractCompactionTask.java   |  2 --
 .../iotdb/db/engine/storagegroup/TsFileManager.java      |  6 ++++++
 5 files changed, 24 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 2c159d8331..b94f61d06b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -124,7 +124,7 @@ public class CompactionTaskManager implements IService {
   }
 
   @Override
-  public synchronized void stop() {
+  public void stop() {
     if (taskExecutionPool != null) {
       taskExecutionPool.shutdownNow();
       compactionTaskSubmissionThreadPool.shutdownNow();
@@ -136,18 +136,18 @@ public class CompactionTaskManager implements IService {
   }
 
   @Override
-  public synchronized void waitAndStop(long milliseconds) {
+  public void waitAndStop(long milliseconds) {
     if (taskExecutionPool != null) {
       awaitTermination(taskExecutionPool, milliseconds);
       awaitTermination(compactionTaskSubmissionThreadPool, milliseconds);
-      logger.info("Waiting for task taskExecutionPool to shut down");
+      logger.info("Waiting for task taskExecutionPool to shut down in {} ms", milliseconds);
       waitTermination();
       storageGroupTasks.clear();
     }
   }
 
   @TestOnly
-  public synchronized void waitAllCompactionFinish() {
+  public void waitAllCompactionFinish() {
     long sleepingStartTime = 0;
     if (taskExecutionPool != null) {
       while (taskExecutionPool.getActiveCount() > 0 || taskExecutionPool.getQueue().size() > 0) {
@@ -172,7 +172,7 @@ public class CompactionTaskManager implements IService {
     }
   }
 
-  private synchronized void waitTermination() {
+  private void waitTermination() {
     long startTime = System.currentTimeMillis();
     while (!taskExecutionPool.isTerminated()) {
       int timeMillis = 0;
@@ -192,7 +192,7 @@ public class CompactionTaskManager implements IService {
     logger.info("CompactionManager stopped");
   }
 
-  private synchronized void awaitTermination(ExecutorService service, long milliseconds) {
+  private void awaitTermination(ExecutorService service, long milliseconds) {
     try {
       service.shutdown();
       service.awaitTermination(milliseconds, TimeUnit.MILLISECONDS);
@@ -305,7 +305,7 @@ public class CompactionTaskManager implements IService {
    */
   public synchronized void submitTask(AbstractCompactionTask compactionTask)
       throws RejectedExecutionException {
-    if (taskExecutionPool != null && !taskExecutionPool.isTerminated()) {
+    if (taskExecutionPool != null && !taskExecutionPool.isShutdown()) {
       Future<Void> future = taskExecutionPool.submit(compactionTask);
       storageGroupTasks
           .computeIfAbsent(compactionTask.getFullStorageGroupName(), x -> new HashMap<>())
@@ -320,7 +320,7 @@ public class CompactionTaskManager implements IService {
   }
 
   public synchronized Future<Void> submitSubTask(Callable<Void> subCompactionTask) {
-    if (subCompactionTaskExecutionPool != null && !subCompactionTaskExecutionPool.isTerminated()) {
+    if (subCompactionTaskExecutionPool != null && !subCompactionTaskExecutionPool.isShutdown()) {
       Future<Void> future = subCompactionTaskExecutionPool.submit(subCompactionTask);
       return future;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
index 8f45039fb4..273abb8099 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
@@ -149,7 +149,7 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
       }
     } catch (Throwable throwable) {
       // catch throwable instead of exception to handle OOM errors
-      LOGGER.error("Meet errors in cross space compaction, {}", throwable.getMessage());
+      LOGGER.error("Meet errors in cross space compaction.", throwable);
       CompactionExceptionHandler.handleException(
           fullStorageGroupName,
           logFile,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
index a7b5acaaa3..3b4c9381a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -122,7 +123,9 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
 
       // carry out the compaction
       performer.setSourceFiles(selectedTsFileResourceList);
-      performer.setTargetFiles(Collections.singletonList(targetTsFileResource));
+      // As elements in targetFiles may be removed in ReadPointCompactionPerformer, we should use a
+      // mutable list instead of Collections.singletonList()
+      performer.setTargetFiles(new ArrayList<>(Collections.singletonList(targetTsFileResource)));
       performer.perform();
 
       CompactionUtils.moveTargetFile(
@@ -166,8 +169,9 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
         isHoldingWriteLock[i] = true;
       }
 
-      if (targetTsFileResource.getTsFile().length()
-          < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) {
+      if (targetTsFileResource.getTsFile().exists()
+          && targetTsFileResource.getTsFile().length()
+              < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) {
         // the file size is smaller than magic string and version number
         throw new RuntimeException(
             String.format(
@@ -195,9 +199,9 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
       }
     } catch (Throwable throwable) {
       LOGGER.error(
-          "{} [Compaction] Throwable is caught during execution of SizeTieredCompaction, {}",
+          "{} [Compaction] Throwable is caught during execution of SizeTieredCompaction.",
           fullStorageGroupName,
-          throwable.getMessage());
+          throwable);
       LOGGER.warn("{} [Compaction] Start to handle exception", fullStorageGroupName);
       if (throwable instanceof InterruptedException) {
         Thread.currentThread().interrupt();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
index 3fbf180eb2..c25bd91ae1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
@@ -72,7 +72,6 @@ public abstract class AbstractCompactionTask implements Callable<Void> {
       doCompaction();
     } catch (InterruptedException e) {
       LOGGER.warn("Current task is interrupted");
-      Thread.interrupted();
     } catch (Exception e) {
       LOGGER.error(e.getMessage(), e);
     } finally {
@@ -81,7 +80,6 @@ public abstract class AbstractCompactionTask implements Callable<Void> {
       timeCost = System.currentTimeMillis() - startTime;
       finished = true;
     }
-
     return null;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index e363b6765f..7bea37b36a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -222,12 +222,18 @@ public class TsFileManager {
       if (isTargetSequence) {
         // seq inner space compaction or cross space compaction
         for (TsFileResource resource : targetFileResources) {
+          if (!resource.getTsFile().exists()) {
+            continue;
+          }
           TsFileResourceManager.getInstance().registerSealedTsFileResource(resource);
           sequenceFiles.get(timePartition).keepOrderInsert(resource);
         }
       } else {
         // unseq inner space compaction
         for (TsFileResource resource : targetFileResources) {
+          if (!resource.getTsFile().exists()) {
+            continue;
+          }
           TsFileResourceManager.getInstance().registerSealedTsFileResource(resource);
           unsequenceFiles.get(timePartition).keepOrderInsert(resource);
         }