You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/07/15 02:20:39 UTC

[incubator-iotdb] branch dev_merge updated: do not start TImedMergeThread until StorageEngine is recovered

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/dev_merge by this push:
     new 83fce5c  do not start TImedMergeThread until StorageEngine is recovered
83fce5c is described below

commit 83fce5caf76db1d6f19b6c7d3af8314662507eef
Author: 江天 <jt...@163.com>
AuthorDate: Mon Jul 15 10:18:18 2019 +0800

    do not start TImedMergeThread until StorageEngine is recovered
---
 .../main/java/org/apache/iotdb/db/engine/merge/MergeManager.java   | 7 +++----
 .../java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java    | 7 ++-----
 .../apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java | 3 +--
 .../test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java   | 1 -
 4 files changed, 6 insertions(+), 12 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java
index e488b72..e188ee0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java
@@ -42,7 +42,6 @@ public class MergeManager implements IService {
   private ScheduledExecutorService timedMergeThreadPool;
 
   private MergeManager() {
-    start();
   }
 
   public static MergeManager getINSTANCE() {
@@ -67,11 +66,11 @@ public class MergeManager implements IService {
       if (mergeInterval > 0) {
         timedMergeThreadPool = Executors.newSingleThreadScheduledExecutor( r -> new Thread(r,
             "TimedMergeThread"));
-        timedMergeThreadPool.scheduleAtFixedRate(this::flushAll, 0,
+        timedMergeThreadPool.scheduleAtFixedRate(this::flushAll, mergeInterval,
             mergeInterval, TimeUnit.SECONDS);
       }
+      logger.info("MergeManager started");
     }
-    logger.info("MergeManager started");
   }
 
   @Override
@@ -87,8 +86,8 @@ public class MergeManager implements IService {
         // wait
       }
       mergeTaskPool = null;
+      logger.info("MergeManager stopped");
     }
-    logger.info("MergeManager stopped");
   }
 
   @Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
index e95bf5f..1a3a452 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
@@ -53,11 +53,8 @@ public class RecoverMergeTask extends MergeTask {
   private List<Path> mergedPaths = new ArrayList<>();
   private List<TsFileResource> unmergedFiles;
 
-  public RecoverMergeTask(
-      List<TsFileResource> allSeqFiles,
-      List<TsFileResource> allUnseqFiles,
-      String storageGroupDir, MergeCallback callback, String taskName) throws IOException {
-    super(allSeqFiles, allUnseqFiles, storageGroupDir, callback, taskName);
+  public RecoverMergeTask(String storageGroupDir, MergeCallback callback, String taskName) throws IOException {
+    super(null, null, storageGroupDir, callback, taskName);
   }
 
   public void recoverMerge(boolean continueMerge) throws IOException {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 7ff8b3a..fdc5b23 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -200,8 +200,7 @@ public class StorageGroupProcessor {
       if (mergingMods.exists()) {
         mergingModification = new ModificationFile(storageGroupSysDir + File.separator + MERGING_MODIFICAITON_FILE_NAME);
       }
-      RecoverMergeTask recoverMergeTask = new RecoverMergeTask(seqTsFiles, unseqTsFiles,
-          storageGroupSysDir.getPath(), this::mergeEndAction, taskName);
+      RecoverMergeTask recoverMergeTask = new RecoverMergeTask(storageGroupSysDir.getPath(), this::mergeEndAction, taskName);
       logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
       recoverMergeTask.recoverMerge(IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
       if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
index f96bb17..f7367e0 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
@@ -32,7 +32,6 @@ import org.junit.Test;
 
 public class MergeLogTest extends MergeTaskTest {
 
-
   @Test
   public void testMergeLog() throws Exception {
     MergeTask mergeTask =