You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/07/11 13:39:53 UTC

[incubator-iotdb] branch dev_merge updated: add merge recovery in system reboot

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/dev_merge by this push:
     new 23b6ad8  add merge recovery in system reboot
23b6ad8 is described below

commit 23b6ad899e7cfa3489b4bb476987a9b5b8dbe81c
Author: 江天 <jt...@163.com>
AuthorDate: Thu Jul 11 21:37:35 2019 +0800

    add merge recovery in system reboot
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  10 ++
 .../iotdb/db/engine/merge/MergeFileSelector.java   |   5 +-
 .../apache/iotdb/db/engine/merge/MergeTask.java    |  50 +++++++-
 .../iotdb/db/engine/merge/RecoverMergeTask.java    |  13 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 134 +++++++++++++++------
 .../db/engine/storagegroup/TsFileResource.java     |  11 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |   1 +
 7 files changed, 181 insertions(+), 43 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index badd42e..680c1e3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -184,6 +184,8 @@ public class IoTDBConfig {
 
   private int mergeThreadNum = 2;
 
+  private boolean mergeAfterReboot = true;
+
   public IoTDBConfig() {
     // empty constructor
   }
@@ -507,4 +509,12 @@ public class IoTDBConfig {
   public void setMergeThreadNum(int mergeThreadNum) {
     this.mergeThreadNum = mergeThreadNum;
   }
+
+  public boolean isMergeAfterReboot() {
+    return mergeAfterReboot;
+  }
+
+  public void setMergeAfterReboot(boolean mergeAfterReboot) {
+    this.mergeAfterReboot = mergeAfterReboot;
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeFileSelector.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeFileSelector.java
index 95d6131..c94eea9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeFileSelector.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeFileSelector.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.MergeException;
 import org.apache.iotdb.db.exception.MetadataErrorException;
@@ -62,8 +63,8 @@ public class MergeFileSelector {
   public MergeFileSelector(
       List<TsFileResource> seqFiles,
       List<TsFileResource> unseqFiles, long memoryBudget) {
-    this.seqFiles = seqFiles;
-    this.unseqFiles = unseqFiles;
+    this.seqFiles = seqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList());
+    this.unseqFiles = unseqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList());
     this.memoryBudget = memoryBudget;
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
index 1c5366e..39308a9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
@@ -26,6 +26,7 @@ import static org.apache.iotdb.db.utils.QueryUtils.modifyChunkMetaData;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -101,17 +102,29 @@ public class MergeTask implements Callable<Void> {
 
   protected MergeCallback callback;
 
+  protected String taskName;
+
   public MergeTask(List<TsFileResource> seqFiles,
-      List<TsFileResource> unseqFiles, String storageGroupDir, MergeCallback callback) throws IOException {
+      List<TsFileResource> unseqFiles, String storageGroupDir, MergeCallback callback,
+      String taskName) throws IOException {
     this.seqFiles = seqFiles;
     this.unseqFiles = unseqFiles;
     this.storageGroupDir = storageGroupDir;
     this.callback = callback;
+    this.taskName = taskName;
   }
 
   @Override
   public Void call() throws Exception {
-    doMerge();
+    try  {
+      doMerge();
+    } catch (Exception e) {
+      logger.error("Runtime exception in merge {}", taskName, e);
+      seqFiles = Collections.emptyList();
+      unseqFiles = Collections.emptyList();
+      cleanUp(true);
+      throw e;
+    }
     return null;
   }
 
@@ -131,17 +144,34 @@ public class MergeTask implements Callable<Void> {
   protected void mergeFiles(List<TsFileResource> unmergedFiles) throws IOException {
     // decide whether to write the unmerged chunks to the merge files or to move the merged data
     // back to the origin seqFile's
+    if (logger.isInfoEnabled()) {
+      logger.info("{} starts to merge {} files", taskName, unmergedFiles.size());
+    }
+    int cnt = 0;
     for (TsFileResource seqFile : unmergedFiles) {
       int mergedChunkNum = mergedChunkCnt.getOrDefault(seqFile, 0);
       int unmergedChunkNum = unmergedChunkCnt.getOrDefault(seqFile, 0);
       if (mergedChunkNum >= unmergedChunkNum) {
         // move the unmerged data to the new file
+        if (logger.isDebugEnabled()) {
+          logger.debug("{} moving unmerged data of {} to the merged file", taskName,
+              seqFile.getFile().getName());
+        }
         moveUnmergedToNew(seqFile);
       } else {
         // move the merged data to the old file
+        if (logger.isDebugEnabled()) {
+          logger.debug("{} moving merged data of {} to the old file", taskName,
+              seqFile.getFile().getName());
+        }
         moveMergedToOld(seqFile);
       }
+      cnt ++;
+      if (logger.isInfoEnabled()) {
+        logger.debug("{} has merged {}/{} files", taskName, cnt, unmergedFiles.size());
+      }
     }
+    logger.info("{} has merged all files", taskName);
     mergeLogger.logMergeEnd();
   }
 
@@ -152,20 +182,35 @@ public class MergeTask implements Callable<Void> {
   }
 
   protected void mergeSeries(List<Path> unmergedSeries) throws IOException {
+    if (logger.isInfoEnabled()) {
+      logger.info("{} starts to merge {} series", taskName, unmergedSeries.size());
+    }
     for (TsFileResource seqFile : seqFiles) {
       unmergedChunkStartTimes.put(seqFile, new HashMap<>());
     }
     // merge each series and write data into each seqFile's temp merge file
+    int mergedCnt = 0;
+    double progress = 0.0;
     for (Path path : unmergedSeries) {
       mergeLogger.logTSStart(path);
       mergeOnePath(path);
       mergeLogger.logTSEnd(path);
+      mergedCnt ++;
+      if (logger.isDebugEnabled()) {
+        double newProgress = 100 * mergedCnt / (double) (unmergedSeries.size());
+        if (newProgress - progress >= 0.01) {
+          progress = newProgress;
+          logger.debug("{} has merged {}% series", taskName, progress);
+        }
+      }
     }
+    logger.info("{} all series are merged", taskName);
     mergeLogger.logAllTsEnd();
   }
 
 
   protected void cleanUp(boolean executeCallback) throws IOException {
+    logger.info("{} is cleaning up", taskName);
     for (TsFileSequenceReader sequenceReader : fileReaderCache.values()) {
       sequenceReader.close();
     }
@@ -289,6 +334,7 @@ public class MergeTask implements Callable<Void> {
 
     seqFile.getMergeQueryLock().writeLock().lock();
     try {
+      seqFile.getFile().delete();
       FileUtils.moveFile(fileWriter.getFile(), seqFile.getFile());
     } finally {
       seqFile.getMergeQueryLock().writeLock().unlock();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
index 740e854..a4b9c09 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
@@ -36,9 +36,13 @@ import org.apache.iotdb.db.exception.MetadataErrorException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RecoverMergeTask extends MergeTask {
 
+  private static final Logger logger = LoggerFactory.getLogger(RecoverMergeTask.class);
+
   private String currLine;
   private List<TsFileResource> mergeSeqFiles = new ArrayList<>();
   private List<TsFileResource> mergeUnseqFiles = new ArrayList<>();
@@ -53,19 +57,21 @@ public class RecoverMergeTask extends MergeTask {
   public RecoverMergeTask(
       List<TsFileResource> allSeqFiles,
       List<TsFileResource> allUnseqFiles,
-      String storageGroupDir, MergeCallback callback) throws IOException {
-    super(allSeqFiles, allUnseqFiles, storageGroupDir, callback);
+      String storageGroupDir, MergeCallback callback, String taskName) throws IOException {
+    super(allSeqFiles, allUnseqFiles, storageGroupDir, callback, taskName);
   }
 
   public void recoverMerge(boolean continueMerge) throws IOException, MetadataErrorException {
     File logFile = new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME);
     if (!logFile.exists()) {
+      logger.info("{} no merge.log, merge recovery ends", taskName);
       return;
     }
     mergeSeqFiles = new ArrayList<>();
     mergeUnseqFiles = new ArrayList<>();
 
     Status status = determineStatus(logFile);
+    logger.info("{} merge recovery status determined: {}", taskName, status);
     switch (status) {
       case NONE:
         logFile.delete();
@@ -98,12 +104,14 @@ public class RecoverMergeTask extends MergeTask {
         cleanUp(continueMerge);
         break;
     }
+    logger.info("{} merge recovery ends", taskName);
   }
 
 
   // scan metadata to compute how many chunks are merged/unmerged so at last we can decide to
   // move the merged chunks or the unmerged chunks
   private void recoverChunkCounts() throws IOException {
+    logger.debug("{} recovering chunk counts", taskName);
     for (TsFileResource tsFileResource : seqFiles) {
       RestorableTsFileIOWriter mergeFileWriter = getMergeFileWriter(tsFileResource);
       mergeFileWriter.makeMetadataVisible();
@@ -150,6 +158,7 @@ public class RecoverMergeTask extends MergeTask {
   }
 
   private void truncateFiles() throws IOException {
+    logger.debug("{} truncating {} files", taskName, fileLastPositions.size());
     for (Entry<File, Long> entry : fileLastPositions.entrySet()) {
       File file = entry.getKey();
       Long lastPosition = entry.getValue();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index e3876c3..f805feb 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
+import static org.apache.iotdb.db.engine.merge.MergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
 import static org.apache.iotdb.tsfile.common.constant.SystemConstant.TSFILE_SUFFIX;
 
 import java.io.File;
@@ -39,6 +41,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.merge.MergeFileSelector;
 import org.apache.iotdb.db.engine.merge.MergeTask;
 import org.apache.iotdb.db.engine.merge.MergeManager;
+import org.apache.iotdb.db.engine.merge.RecoverMergeTask;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
@@ -48,6 +51,7 @@ import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.db.exception.MergeException;
+import org.apache.iotdb.db.exception.MetadataErrorException;
 import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
@@ -184,13 +188,23 @@ public class StorageGroupProcessor {
   private void recover() throws ProcessorException {
     logger.info("recover Storage Group  {}", storageGroupName);
 
-    // collect TsFiles from sequential data directory
-    List<File> tsFiles = getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders());
-    recoverSeqFiles(tsFiles);
+    // collect TsFiles from sequential and unsequential data directory
+    List<TsFileResource> seqTsFiles = getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders());
+    List<TsFileResource> unseqTsFiles =
+        getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
 
-    // collect TsFiles from unsequential data directory
-    tsFiles = getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
-    recoverUnseqFiles(tsFiles);
+    String taskName = storageGroupName + System.currentTimeMillis();
+    try {
+      RecoverMergeTask recoverMergeTask = new RecoverMergeTask(seqTsFiles, unseqTsFiles,
+          storageGroupSysDir.getPath(), this::mergeEndAction, taskName);
+      logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
+      recoverMergeTask.recoverMerge(IoTDBDescriptor.getInstance().getConfig().isMergeAfterReboot());
+    } catch (IOException | MetadataErrorException e) {
+      throw new ProcessorException(e);
+    }
+
+    recoverSeqFiles(seqTsFiles);
+    recoverUnseqFiles(unseqTsFiles);
 
     for (TsFileResource resource : sequenceFileList) {
       latestTimeForEachDevice.putAll(resource.getEndTimeMap());
@@ -198,23 +212,47 @@ public class StorageGroupProcessor {
     }
   }
 
-  private List<File> getAllFiles(List<String> folders) {
+  private List<TsFileResource> getAllFiles(List<String> folders) {
     List<File> tsFiles = new ArrayList<>();
     for (String baseDir : folders) {
       File fileFolder = new File(baseDir, storageGroupName);
       if (!fileFolder.exists()) {
         continue;
       }
+      // some TsFileResource may be persisting when the system crashed, try recovering such
+      // resources
+      continueFailedRenames(fileFolder, TEMP_SUFFIX);
+
+      // some TsFiles were going to be replaced by the merged files when the system crashed and
+      // the process was interrupted before the merged files could be named
+      continueFailedRenames(fileFolder, MERGE_SUFFIX);
+
       Collections
           .addAll(tsFiles, fileFolder.listFiles(file -> file.getName().endsWith(TSFILE_SUFFIX)));
     }
-    return tsFiles;
+    tsFiles.sort(this::compareFileName);
+    List<TsFileResource> ret = new ArrayList<>();
+    tsFiles.forEach(f -> ret.add(new TsFileResource(f)));
+    return ret;
   }
 
-  private void recoverSeqFiles(List<File> tsFiles) throws ProcessorException {
-    tsFiles.sort(this::compareFileName);
-    for (File tsFile : tsFiles) {
-      TsFileResource tsFileResource = new TsFileResource(tsFile);
+  private void continueFailedRenames(File fileFolder, String suffix) {
+    File[] files = fileFolder.listFiles(file -> file.getName().endsWith(suffix));
+    if (files != null) {
+      for (File tempResource : files) {
+        File originResource = new File(tempResource.getPath().replace(suffix, ""));
+        if (originResource.exists()) {
+          tempResource.delete();
+        } else {
+          tempResource.renameTo(originResource);
+        }
+      }
+    }
+  }
+
+  private void recoverSeqFiles(List<TsFileResource> tsFiles) throws ProcessorException {
+
+    for (TsFileResource tsFileResource : tsFiles) {
       sequenceFileList.add(tsFileResource);
       TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-"
           , fileSchema, versionController, tsFileResource, false);
@@ -222,10 +260,8 @@ public class StorageGroupProcessor {
     }
   }
 
-  private void recoverUnseqFiles(List<File> tsFiles) throws ProcessorException {
-    tsFiles.sort(this::compareFileName);
-    for (File tsFile : tsFiles) {
-      TsFileResource tsFileResource = new TsFileResource(tsFile);
+  private void recoverUnseqFiles(List<TsFileResource> tsFiles) throws ProcessorException {
+    for (TsFileResource tsFileResource : tsFiles) {
       unSequenceFileList.add(tsFileResource);
       TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
           fileSchema,
@@ -660,6 +696,7 @@ public class StorageGroupProcessor {
       }
       if (unSequenceFileList.isEmpty() || sequenceFileList.isEmpty()) {
         logger.info("{} no files to be merged", storageGroupName);
+        return;
       }
 
       long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
@@ -672,12 +709,15 @@ public class StorageGroupProcessor {
               budget);
           return;
         }
+        String taskName = storageGroupName + "-" + System.currentTimeMillis();
         MergeTask mergeTask = new MergeTask(mergeFiles[0], mergeFiles[1],
-            storageGroupSysDir.getPath(), this::mergeEndAction);
+            storageGroupSysDir.getPath(), this::mergeEndAction, taskName);
+        mergingModification = new ModificationFile(storageGroupSysDir + File.separator + "merge"
+            + ".mods");
         MergeManager.getINSTANCE().submit(mergeTask);
         if (logger.isInfoEnabled()) {
-          logger.info("{} submits a merge task, merging {} seqFiles, {} unseqFiles",
-              storageGroupName, mergeFiles[0].size(), mergeFiles[1].size());
+          logger.info("{} submits a merge task {}, merging {} seqFiles, {} unseqFiles",
+              storageGroupName, taskName, mergeFiles[0].size(), mergeFiles[1].size());
         }
         isMerging = true;
         mergeStartTime = System.currentTimeMillis();
@@ -693,6 +733,15 @@ public class StorageGroupProcessor {
   }
 
   private void mergeEndAction(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles) {
+    logger.info("{} a merge task is ending...", storageGroupName);
+
+    if (seqFiles.isEmpty()) {
+      // merge runtime exception arose, just end this merge
+      isMerging = false;
+      logger.info("{} a merge task abnormally ends", storageGroupName);
+      return;
+    }
+
     mergeLock.writeLock().lock();
     try {
       unSequenceFileList.removeAll(unseqFiles);
@@ -700,32 +749,47 @@ public class StorageGroupProcessor {
       mergeLock.writeLock().unlock();
     }
 
+    for (int i = 0; i < unseqFiles.size(); i++) {
+      TsFileResource unseqFile = unseqFiles.get(i);
+      unseqFile.getMergeQueryLock().writeLock().lock();
+      try {
+        unseqFile.remove();
+      } finally {
+        unseqFile.getMergeQueryLock().writeLock().unlock();
+      }
+    }
+
     for (int i = 0; i < seqFiles.size(); i++) {
       TsFileResource seqFile = seqFiles.get(i);
       seqFile.getMergeQueryLock().writeLock().lock();
       mergeLock.writeLock().lock();
       try {
-        // remove old modifications and write modifications generated during merge
-        seqFile.removeModFile();
-        for (Modification modification : mergingModification.getModifications()) {
-          seqFile.getModFile().write(modification);
-        }
-      } catch (IOException e) {
-        logger.error("{} cannot clean the ModificationFile of {} after merge", storageGroupName,
-            seqFile.getFile(), e);
-      }
-      if (i == seqFiles.size() - 1) {
+        logger.debug("{} is updating the {} merged file's modification file", storageGroupName, i);
         try {
-          mergingModification.remove();
+          // remove old modifications and write modifications generated during merge
+          seqFile.removeModFile();
+          for (Modification modification : mergingModification.getModifications()) {
+            seqFile.getModFile().write(modification);
+          }
         } catch (IOException e) {
-          logger.error("{} cannot remove merging modification ", storageGroupName, e);
+          logger.error("{} cannot clean the ModificationFile of {} after merge", storageGroupName,
+              seqFile.getFile(), e);
         }
-        mergingModification = null;
-        isMerging = false;
+        if (i == seqFiles.size() - 1) {
+          try {
+            mergingModification.remove();
+          } catch (IOException e) {
+            logger.error("{} cannot remove merging modification ", storageGroupName, e);
+          }
+          mergingModification = null;
+          isMerging = false;
+        }
+      } finally {
+        seqFile.getMergeQueryLock().writeLock().unlock();
+        mergeLock.writeLock().unlock();
       }
-      seqFile.getMergeQueryLock().writeLock().unlock();
-      mergeLock.writeLock().unlock();
     }
+    logger.info("{} a merge task ends", storageGroupName);
   }
 
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index b00e422..f2484c1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -123,8 +123,10 @@ public class TsFileResource {
         ReadWriteIOUtils.write(entry.getValue(), outputStream);
       }
     }
-    FileUtils.moveFile(new File(file + RESOURCE_SUFFIX + TEMP_SUFFIX),
-        new File(file + RESOURCE_SUFFIX));
+    File src = new File(file + RESOURCE_SUFFIX + TEMP_SUFFIX);
+    File dest = new File(file + RESOURCE_SUFFIX);
+    dest.delete();
+    FileUtils.moveFile(src, dest);
   }
 
   public void deSerialize() throws IOException {
@@ -240,4 +242,9 @@ public class TsFileResource {
     getModFile().remove();
     modFile = null;
   }
+
+  public void remove() {
+    file.delete();
+    new File(file.getPath() + RESOURCE_SUFFIX).delete();
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index f77c353..1b6befa 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -400,6 +400,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         return true;
       case "merge":
          StorageEngine.getInstance().mergeAll();
+         return true;
       default:
         return false;
     }