Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/06/09 11:24:36 UTC

[incubator-iotdb] branch enhance_merge_management created (now efa77dd)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch enhance_merge_management
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at efa77dd  enhance merge task management

This branch includes the following new commits:

     new efa77dd  enhance merge task management

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: enhance merge task management

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch enhance_merge_management
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit efa77dd52f0f073dfc47c8af8d4709e828d1a12c
Author: jt2594838 <jt...@163.com>
AuthorDate: Tue Jun 9 19:24:23 2020 +0800

    enhance merge task management
---
 docs/UserGuide/Operation Manual/SQL Reference.md   |   8 +
 .../org/apache/iotdb/db/qp/strategy/SqlBase.g4     |   5 +
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |   6 +
 .../iotdb/db/engine/merge/manage/MergeFuture.java  | 153 +++++++++++++++
 .../iotdb/db/engine/merge/manage/MergeManager.java | 193 +++++++++++++++++-
 .../db/engine/merge/manage/MergeManagerMBean.java  |  26 +++
 .../db/engine/merge/manage/MergeThreadPool.java    |  48 +++++
 .../iotdb/db/engine/merge/task/MergeFileTask.java  | 107 +++++++---
 .../db/engine/merge/task/MergeMultiChunkTask.java  | 216 ++++++++++++++-------
 .../iotdb/db/engine/merge/task/MergeTask.java      |  77 +++++++-
 .../db/engine/merge/task/RecoverMergeTask.java     |  13 +-
 .../main/java/org/apache/iotdb/db/qp/Planner.java  |   2 +
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |   4 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  49 +++++
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   3 +-
 .../db/qp/logical/sys/ShowMergeStatusOperator.java |  30 +++
 .../db/qp/physical/sys/ShowMergeStatusPlan.java    |  27 +++
 .../apache/iotdb/db/qp/physical/sys/ShowPlan.java  |   2 +-
 .../iotdb/db/qp/strategy/LogicalGenerator.java     |   8 +
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |   3 +
 .../org/apache/iotdb/db/service/RPCService.java    |   4 +-
 .../org/apache/iotdb/db/service/ServiceType.java   |   2 +-
 .../org/apache/iotdb/db/service/StaticResps.java   |  12 ++
 .../org/apache/iotdb/db/service/TSServiceImpl.java |   2 +
 .../iotdb/db/engine/merge/MergeManagerTest.java    | 168 ++++++++++++++++
 .../iotdb/db/integration/IoTDBMergeTest.java       |  47 ++++-
 26 files changed, 1092 insertions(+), 123 deletions(-)

diff --git a/docs/UserGuide/Operation Manual/SQL Reference.md b/docs/UserGuide/Operation Manual/SQL Reference.md
index 3e15816..d23b36b 100644
--- a/docs/UserGuide/Operation Manual/SQL Reference.md	
+++ b/docs/UserGuide/Operation Manual/SQL Reference.md	
@@ -208,6 +208,14 @@ Eg: IoTDB > SHOW STORAGE GROUP
 Note: This statement can be used in IoTDB Client and JDBC.
 ```
 
+* Show Merge Status Statement
+
+```
+SHOW MERGE STATUS
+Eg: IoTDB > SHOW MERGE STATUS
+Note: This statement can be used in IoTDB Client and JDBC.
+```
+
 * Count Timeseries Statement
 
 ```
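
The statement is also reachable over JDBC. Below is a minimal sketch of querying it from Java, assuming the iotdb-jdbc driver is on the classpath and a local server runs with the default port and root/root credentials; the expected column set comes from the IoTDBConstant additions further down in this commit:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;

public class ShowMergeStatusExample {

  public static void main(String[] args) throws Exception {
    // Register the IoTDB JDBC driver.
    Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
    // Assumed default endpoint and credentials of a local IoTDB instance.
    try (Connection connection =
            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
        Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery("SHOW MERGE STATUS")) {
      ResultSetMetaData metaData = resultSet.getMetaData();
      // Expected columns (from IoTDBConstant in this commit):
      // storage group, task name, created time, progress, cancelled, done
      while (resultSet.next()) {
        StringBuilder row = new StringBuilder();
        for (int i = 1; i <= metaData.getColumnCount(); i++) {
          row.append(metaData.getColumnName(i)).append('=')
              .append(resultSet.getString(i)).append("  ");
        }
        System.out.println(row);
      }
    }
  }
}
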
diff --git a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4 b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
index 7d5b9f2..c464446 100644
--- a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
+++ b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
@@ -73,6 +73,7 @@ statement
     | SHOW STORAGE GROUP #showStorageGroup
     | SHOW CHILD PATHS prefixPath? #showChildPaths
     | SHOW DEVICES prefixPath? #showDevices
+    | SHOW MERGE STATUS #showMergeStatus
     | COUNT TIMESERIES prefixPath? (GROUP BY LEVEL OPERATOR_EQ INT)? #countTimeseries
     | COUNT NODES prefixPath LEVEL OPERATOR_EQ INT #countNodes
     | LOAD CONFIGURATION (MINUS GLOBAL)? #loadConfigurationStatement
@@ -876,6 +877,10 @@ TRUE
 FALSE
     : F A L S E
     ;
+
+STATUS
+    : S T A T U S
+    ;
 //============================
 // End of the keywords list
 //============================
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 58e3154..836afbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -73,6 +73,12 @@ public class IoTDBConstant {
   public static final String COLUMN_STORAGE_GROUP = "storage group";
   public static final String COLUMN_TTL = "ttl";
 
+  public static final String COLUMN_TASK_NAME = "task name";
+  public static final String COLUMN_CREATED_TIME = "created time";
+  public static final String COLUMN_PROGRESS = "progress";
+  public static final String COLUMN_CANCELLED = "cancelled";
+  public static final String COLUMN_DONE = "done";
+
   public static final String PATH_WILDCARD = "*";
 
   // data folder name
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeFuture.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeFuture.java
new file mode 100644
index 0000000..8d42003
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeFuture.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.manage;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+import org.apache.iotdb.db.engine.merge.task.MergeMultiChunkTask.MergeChunkHeapTask;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
+
+public abstract class MergeFuture extends FutureTask<Void> implements Comparable<MergeFuture> {
+
+  private final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"
+      + ".SSS'Z'");
+  private Date createdTime;
+
+  public MergeFuture(Callable callable) {
+    super(callable);
+    createdTime = new Date(System.currentTimeMillis());
+  }
+
+  public String getCreatedTime() {
+    return dateFormat.format(createdTime);
+  }
+
+  public abstract String getTaskName();
+
+  public abstract String getProgress();
+
+  @Override
+  public int compareTo(MergeFuture future) {
+    return this.createdTime.compareTo(future.createdTime);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    MergeFuture future = (MergeFuture) o;
+    return Objects.equals(createdTime, future.createdTime);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(createdTime);
+  }
+
+  public static class MainMergeFuture extends MergeFuture {
+
+    private MergeTask bindingTask;
+
+    public MainMergeFuture(MergeTask task) {
+      super(task);
+      bindingTask = task;
+    }
+
+    @Override
+    public String getTaskName() {
+      return bindingTask.getTaskName();
+    }
+
+    @Override
+    public String getProgress() {
+      return bindingTask.getProgress();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      if (!super.equals(o)) {
+        return false;
+      }
+      MainMergeFuture that = (MainMergeFuture) o;
+      return Objects.equals(bindingTask, that.bindingTask);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), bindingTask);
+    }
+  }
+
+  public static class SubMergeFuture extends MergeFuture {
+
+    private MergeChunkHeapTask bindingTask;
+
+    public SubMergeFuture(MergeChunkHeapTask task) {
+      super(task);
+      bindingTask = task;
+    }
+
+    @Override
+    public String getTaskName() {
+      return bindingTask.getTaskName();
+    }
+
+    @Override
+    public String getProgress() {
+      return bindingTask.getProgress();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      if (!super.equals(o)) {
+        return false;
+      }
+      SubMergeFuture that = (SubMergeFuture) o;
+      return Objects.equals(bindingTask, that.bindingTask);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), bindingTask);
+    }
+  }
+}
+
+
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
index 06de716..1c87a40 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
@@ -19,7 +19,16 @@
 
 package org.apache.iotdb.db.engine.merge.manage;
 
-import java.util.concurrent.Callable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -27,11 +36,14 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.merge.task.MergeMultiChunkTask.MergeChunkHeapTask;
 import org.apache.iotdb.db.engine.merge.task.MergeTask;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.JMXService;
 import org.apache.iotdb.db.service.ServiceType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,15 +52,23 @@ import org.slf4j.LoggerFactory;
  * MergeManager provides a ThreadPool to queue and run all merge tasks to restrain the total
  * resources occupied by merge and manages a Timer to periodically issue a global merge.
  */
-public class MergeManager implements IService {
+public class MergeManager implements IService, MergeManagerMBean {
 
   private static final Logger logger = LoggerFactory.getLogger(MergeManager.class);
   private static final MergeManager INSTANCE = new MergeManager();
+  private final String mbeanName = String
+      .format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
+          getID().getJmxName());
 
   private AtomicInteger threadCnt = new AtomicInteger();
   private ThreadPoolExecutor mergeTaskPool;
   private ThreadPoolExecutor mergeChunkSubTaskPool;
   private ScheduledExecutorService timedMergeThreadPool;
+  private ScheduledExecutorService taskCleanerThreadPool;
+
+  // TODO: add to JMX
+  private Map<String, Set<MergeFuture>> storageGroupMainTasks = new ConcurrentHashMap<>();
+  private Map<String, Set<MergeFuture>> storageGroupSubTasks = new ConcurrentHashMap<>();
 
   private MergeManager() {
   }
@@ -58,15 +78,20 @@ public class MergeManager implements IService {
   }
 
   public void submitMainTask(MergeTask mergeTask) {
-    mergeTaskPool.submit(mergeTask);
+    MergeFuture future = (MergeFuture) mergeTaskPool.submit(mergeTask);
+    storageGroupMainTasks.computeIfAbsent(mergeTask.getStorageGroupName(),
+        k -> new ConcurrentSkipListSet<>()).add(future);
   }
 
-  public Future submitChunkSubTask(Callable callable) {
-    return mergeChunkSubTaskPool.submit(callable);
+  public Future<Void> submitChunkSubTask(MergeChunkHeapTask task) {
+    MergeFuture future = (MergeFuture) mergeChunkSubTaskPool.submit(task);
+    storageGroupSubTasks.computeIfAbsent(task.getStorageGroupName(), k -> new ConcurrentSkipListSet<>()).add(future);
+    return future;
   }
 
   @Override
   public void start() {
+    JMXService.registerMBean(this, mbeanName);
     if (mergeTaskPool == null) {
       int threadNum = IoTDBDescriptor.getInstance().getConfig().getMergeThreadNum();
       if (threadNum <= 0) {
@@ -78,11 +103,9 @@ public class MergeManager implements IService {
         chunkSubThreadNum = 1;
       }
 
-      mergeTaskPool =
-          (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum,
+      mergeTaskPool = new MergeThreadPool(threadNum,
               r -> new Thread(r, "MergeThread-" + threadCnt.getAndIncrement()));
-      mergeChunkSubTaskPool =
-          (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum * chunkSubThreadNum,
+      mergeChunkSubTaskPool = new MergeThreadPool(threadNum * chunkSubThreadNum,
               r -> new Thread(r, "MergeChunkSubThread-" + threadCnt.getAndIncrement()));
       long mergeInterval = IoTDBDescriptor.getInstance().getConfig().getMergeIntervalSec();
       if (mergeInterval > 0) {
@@ -91,6 +114,10 @@ public class MergeManager implements IService {
         timedMergeThreadPool.scheduleAtFixedRate(this::mergeAll, mergeInterval,
             mergeInterval, TimeUnit.SECONDS);
       }
+
+      taskCleanerThreadPool = Executors.newSingleThreadScheduledExecutor( r -> new Thread(r,
+          "MergeTaskCleaner"));
+      taskCleanerThreadPool.scheduleAtFixedRate(this::cleanFinishedTask, 30, 30, TimeUnit.MINUTES);
       logger.info("MergeManager started");
     }
   }
@@ -102,6 +129,8 @@ public class MergeManager implements IService {
         timedMergeThreadPool.shutdownNow();
         timedMergeThreadPool = null;
       }
+      taskCleanerThreadPool.shutdownNow();
+      taskCleanerThreadPool = null;
       mergeTaskPool.shutdownNow();
       mergeChunkSubTaskPool.shutdownNow();
       logger.info("Waiting for task pool to shut down");
@@ -116,6 +145,7 @@ public class MergeManager implements IService {
       mergeTaskPool = null;
       logger.info("MergeManager stopped");
     }
+    JMXService.deregisterMBean(mbeanName);
   }
 
   @Override
@@ -125,6 +155,9 @@ public class MergeManager implements IService {
         awaitTermination(timedMergeThreadPool, millseconds);
         timedMergeThreadPool = null;
       }
+      awaitTermination(taskCleanerThreadPool, millseconds);
+      taskCleanerThreadPool = null;
+
       awaitTermination(mergeTaskPool, millseconds);
       awaitTermination(mergeChunkSubTaskPool, millseconds);
       logger.info("Waiting for task pool to shut down");
@@ -164,4 +197,146 @@ public class MergeManager implements IService {
       logger.error("Cannot perform a global merge because", e);
     }
   }
+
+  /**
+   * Abort all merges of a storage group. The caller must acquire the write lock of the
+   * corresponding storage group.
+   * @param storageGroup
+   */
+  @Override
+  public void abortMerge(String storageGroup) {
+    // abort sub-tasks first
+    Set<MergeFuture> subTasks = storageGroupSubTasks
+        .getOrDefault(storageGroup, Collections.emptySet());
+    Iterator<MergeFuture> subIterator = subTasks.iterator();
+    while (subIterator.hasNext()) {
+      Future<Void> next = subIterator.next();
+      if (!next.isDone() && !next.isCancelled()) {
+        next.cancel(true);
+      }
+      subIterator.remove();
+    }
+    // abort main tasks
+    Set<MergeFuture> mainTasks = storageGroupMainTasks
+        .getOrDefault(storageGroup, Collections.emptySet());
+    Iterator<MergeFuture> mainIterator = mainTasks.iterator();
+    while (mainIterator.hasNext()) {
+      Future<Void> next = mainIterator.next();
+      if (!next.isDone() && !next.isCancelled()) {
+        next.cancel(true);
+      }
+      mainIterator.remove();
+    }
+  }
+
+  private void cleanFinishedTask() {
+    for (Set<MergeFuture> subTasks : storageGroupSubTasks.values()) {
+      subTasks.removeIf(next -> next.isDone() || next.isCancelled());
+    }
+    for (Set<MergeFuture> mainTasks : storageGroupMainTasks.values()) {
+      mainTasks.removeIf(next -> next.isDone() || next.isCancelled());
+    }
+  }
+
+  /**
+   *
+   * @return 2 maps, the first map contains status of main merge tasks and the second map
+   * contains status of merge chunk heap tasks, both map use storage groups as keys and list of
+   * merge status as values.
+   */
+  public Map<String, List<TaskStatus>>[] collectTaskStatus() {
+    Map<String, List<TaskStatus>>[] result = new Map[2];
+    result[0] = new HashMap<>();
+    result[1] = new HashMap<>();
+    for (Entry<String, Set<MergeFuture>> stringSetEntry : storageGroupMainTasks.entrySet()) {
+      String storageGroup = stringSetEntry.getKey();
+      Set<MergeFuture> tasks = stringSetEntry.getValue();
+      for (MergeFuture task : tasks) {
+        result[0].computeIfAbsent(storageGroup, s -> new ArrayList<>()).add(new TaskStatus(task));
+      }
+    }
+
+    for (Entry<String, Set<MergeFuture>> stringSetEntry : storageGroupSubTasks.entrySet()) {
+      String storageGroup = stringSetEntry.getKey();
+      Set<MergeFuture> tasks = stringSetEntry.getValue();
+      for (MergeFuture task : tasks) {
+        result[1].computeIfAbsent(storageGroup, s -> new ArrayList<>()).add(new TaskStatus(task));
+      }
+    }
+    return result;
+  }
+
+  public String genMergeTaskReport() {
+    Map<String, List<TaskStatus>>[] statusMaps = collectTaskStatus();
+    StringBuilder builder = new StringBuilder("Main tasks:").append(System.lineSeparator());
+    for (Entry<String, List<TaskStatus>> stringListEntry : statusMaps[0].entrySet()) {
+      String storageGroup = stringListEntry.getKey();
+      List<TaskStatus> statusList = stringListEntry.getValue();
+      builder.append("\t").append("Storage group: ").append(storageGroup).append(System.lineSeparator());
+      for (TaskStatus status : statusList) {
+        builder.append("\t\t").append(status.toString()).append(System.lineSeparator());
+      }
+    }
+
+    builder.append("Sub tasks:").append(System.lineSeparator());
+    for (Entry<String, List<TaskStatus>> stringListEntry : statusMaps[1].entrySet()) {
+      String storageGroup = stringListEntry.getKey();
+      List<TaskStatus> statusList = stringListEntry.getValue();
+      builder.append("\t").append("Storage group: ").append(storageGroup).append(System.lineSeparator());
+      for (TaskStatus status : statusList) {
+        builder.append("\t\t").append(status.toString()).append(System.lineSeparator());
+      }
+    }
+    return builder.toString();
+  }
+
+  @Override
+  public void printMergeStatus() {
+    if (logger.isInfoEnabled()) {
+      logger.info("Running tasks:\n {}", genMergeTaskReport());
+    }
+  }
+
+  public static class TaskStatus {
+    private String taskName;
+    private String createdTime;
+    private String progress;
+    private boolean isDone;
+    private boolean isCancelled;
+
+    public TaskStatus(MergeFuture future) {
+      this.taskName = future.getTaskName();
+      this.createdTime = future.getCreatedTime();
+      this.progress = future.getProgress();
+      this.isCancelled = future.isCancelled();
+      this.isDone = future.isDone();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%s, "
+              + "%s, %s, done:%s, cancelled:%s", taskName,
+          createdTime, progress, isDone, isCancelled);
+    }
+
+    public String getTaskName() {
+      return taskName;
+    }
+
+    public String getCreatedTime() {
+      return createdTime;
+    }
+
+    public String getProgress() {
+      return progress;
+    }
+
+    public boolean isDone() {
+      return isDone;
+    }
+
+    public boolean isCancelled() {
+      return isCancelled;
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManagerMBean.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManagerMBean.java
new file mode 100644
index 0000000..645f51f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManagerMBean.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.manage;
+
+public interface MergeManagerMBean {
+  void printMergeStatus();
+
+  void abortMerge(String storageGroup);
+}
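
MergeManager registers an implementation of this interface with JMXService on start() (see the diff above), so both operations can be invoked from any JMX client. Below is a minimal sketch using the standard javax.management API; the JMX port and the ObjectName are illustrative assumptions only, since the real name is assembled at runtime from IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE and ServiceType#getJmxName():

import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.apache.iotdb.db.engine.merge.manage.MergeManagerMBean;

public class MergeJmxClient {

  public static void main(String[] args) throws Exception {
    // JMX endpoint of the IoTDB server; the port is whatever JMX_PORT is configured to
    // be in iotdb-env.sh (31999 is only an assumed value here).
    JMXServiceURL url =
        new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:31999/jmxrmi");
    try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
      MBeanServerConnection connection = connector.getMBeanServerConnection();
      // Illustrative ObjectName only; the real one is built in MergeManager as shown above.
      ObjectName name = new ObjectName("org.apache.iotdb.service:type=Merge Manager");
      MergeManagerMBean proxy = JMX.newMBeanProxy(connection, name, MergeManagerMBean.class);
      proxy.printMergeStatus();          // dump the task report to the server log
      // proxy.abortMerge("root.sg1");   // cancel all merges of one storage group
    }
  }
}
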
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeThreadPool.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeThreadPool.java
new file mode 100644
index 0000000..0ac292f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeThreadPool.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.manage;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.db.engine.merge.manage.MergeFuture.MainMergeFuture;
+import org.apache.iotdb.db.engine.merge.manage.MergeFuture.SubMergeFuture;
+import org.apache.iotdb.db.engine.merge.task.MergeMultiChunkTask.MergeChunkHeapTask;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
+
+public class MergeThreadPool extends ThreadPoolExecutor {
+
+  public MergeThreadPool(int corePoolSize, ThreadFactory threadFactory) {
+    super(corePoolSize, corePoolSize, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
+        threadFactory);
+  }
+
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+    if (callable instanceof MergeTask) {
+      return (RunnableFuture<T>) new MainMergeFuture((MergeTask) callable);
+    } else {
+      return (RunnableFuture<T>) new SubMergeFuture((MergeChunkHeapTask) callable);
+    }
+  }
+}
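
A self-contained illustration (not IoTDB code) of the newTaskFor pattern this pool relies on: overriding newTaskFor makes submit() hand back the custom future, which is why MergeManager can cast the result of submit() to MergeFuture and later read its name, creation time and progress.

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class NewTaskForDemo {

  // A FutureTask subclass that remembers when it was created, mirroring MergeFuture.
  static class TimedFuture<T> extends FutureTask<T> {
    final long createdAt = System.currentTimeMillis();

    TimedFuture(Callable<T> callable) {
      super(callable);
    }
  }

  // A pool that wraps every submitted Callable in a TimedFuture, mirroring MergeThreadPool.
  static class TimedPool extends ThreadPoolExecutor {
    TimedPool(int threads) {
      super(threads, threads, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
      return new TimedFuture<>(callable);
    }
  }

  public static void main(String[] args) throws Exception {
    TimedPool pool = new TimedPool(1);
    // submit() now returns our TimedFuture, so the cast is safe; this is exactly why
    // MergeManager.submitMainTask can cast the result of submit() to MergeFuture.
    TimedFuture<String> future = (TimedFuture<String>) pool.submit(() -> "done");
    System.out.println(future.get() + ", created at " + future.createdAt);
    pool.shutdown();
  }
}
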
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 15cfa6e..4a2efbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -65,6 +65,9 @@ class MergeFileTask {
 
   private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
 
+  private int currentMergeIndex;
+  private String currMergeFile;
+
   MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger,
       MergeResource resource, List<TsFileResource> unmergedSeqFiles) {
     this.taskName = taskName;
@@ -81,8 +84,11 @@ class MergeFileTask {
       logger.info("{} starts to merge {} files", taskName, unmergedFiles.size());
     }
     long startTime = System.currentTimeMillis();
-    int cnt = 0;
-    for (TsFileResource seqFile : unmergedFiles) {
+    for (int i = 0; i < unmergedFiles.size(); i++) {
+      TsFileResource seqFile = unmergedFiles.get(i);
+      currentMergeIndex = i;
+      currMergeFile = seqFile.getPath();
+
       int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0);
       int unmergedChunkNum = context.getUnmergedChunkCnt().getOrDefault(seqFile, 0);
       if (mergedChunkNum >= unmergedChunkNum) {
@@ -102,10 +108,13 @@ class MergeFileTask {
         }
         moveMergedToOld(seqFile);
       }
-      cnt++;
-      if (logger.isInfoEnabled()) {
-        logger.debug("{} has merged {}/{} files", taskName, cnt, unmergedFiles.size());
+
+      if (Thread.interrupted()) {
+        Thread.currentThread().interrupt();
+        return;
       }
+
+      logProgress();
     }
     if (logger.isInfoEnabled()) {
       logger.info("{} has merged all files after {}ms", taskName,
@@ -114,6 +123,18 @@ class MergeFileTask {
     mergeLogger.logMergeEnd();
   }
 
+  private void logProgress() {
+    if (logger.isInfoEnabled()) {
+      logger.debug("{} has merged {}, processed {}/{} files", taskName, currMergeFile,
+          currentMergeIndex + 1, unmergedFiles.size());
+    }
+  }
+
+  public String getProgress() {
+    return String.format("Merging %s, processed %d/%d files", currMergeFile,
+        currentMergeIndex + 1, unmergedFiles.size());
+  }
+
   private void moveMergedToOld(TsFileResource seqFile) throws IOException {
     int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0);
     if (mergedChunkNum == 0) {
@@ -127,17 +148,8 @@ class MergeFileTask {
       FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getPath());
 
       resource.removeFileReader(seqFile);
-      TsFileIOWriter oldFileWriter;
-      try {
-        oldFileWriter = new ForceAppendTsFileWriter(seqFile.getFile());
-        mergeLogger.logFileMergeStart(seqFile.getFile(),
-            ((ForceAppendTsFileWriter) oldFileWriter).getTruncatePosition());
-        logger.debug("{} moving merged chunks of {} to the old file", taskName, seqFile);
-        ((ForceAppendTsFileWriter) oldFileWriter).doTruncate();
-      } catch (TsFileNotCompleteException e) {
-        // this file may already be truncated if this merge is a system reboot merge
-        oldFileWriter = new RestorableTsFileIOWriter(seqFile.getFile());
-      }
+      TsFileIOWriter oldFileWriter = getOldFileWriter(seqFile);
+
       // filter the chunks that have been merged
       oldFileWriter.filterChunks(context.getUnmergedChunkStartTimes().get(seqFile));
 
@@ -156,6 +168,13 @@ class MergeFileTask {
           String deviceId = entry.getKey();
           List<ChunkMetadata> chunkMetadataList = entry.getValue();
           writeMergedChunkGroup(chunkMetadataList, deviceId, newFileReader, oldFileWriter);
+
+          if (Thread.interrupted()) {
+            Thread.currentThread().interrupt();
+            oldFileWriter.close();
+            restoreOldFile(seqFile);
+            return;
+          }
         }
       }
       oldFileWriter.endFile();
@@ -175,19 +194,50 @@ class MergeFileTask {
               .getFile(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
       seqFile.setFile(nextMergeVersionFile);
     } catch (Exception e) {
-      RestorableTsFileIOWriter oldFileRecoverWriter = new RestorableTsFileIOWriter(
-          seqFile.getFile());
-      if (oldFileRecoverWriter.hasCrashed() && oldFileRecoverWriter.canWrite()) {
-        oldFileRecoverWriter.endFile();
-      } else {
-        oldFileRecoverWriter.close();
-      }
+      restoreOldFile(seqFile);
       throw e;
     } finally {
       seqFile.writeUnlock();
     }
   }
 
+  /**
+   * Restore an old seq file which is being written new chunks when exceptions occur or the task
+   * is aborted.
+   * @param seqFile
+   * @throws IOException
+   */
+  private void restoreOldFile(TsFileResource seqFile) throws IOException {
+    RestorableTsFileIOWriter oldFileRecoverWriter = new RestorableTsFileIOWriter(
+        seqFile.getFile());
+    if (oldFileRecoverWriter.hasCrashed() && oldFileRecoverWriter.canWrite()) {
+      oldFileRecoverWriter.endFile();
+    } else {
+      oldFileRecoverWriter.close();
+    }
+  }
+
+  /**
+   * Open an appending writer for an old seq file so we can add new chunks to it.
+   * @param seqFile
+   * @return
+   * @throws IOException
+   */
+  private TsFileIOWriter getOldFileWriter(TsFileResource seqFile) throws IOException {
+    TsFileIOWriter oldFileWriter;
+    try {
+      oldFileWriter = new ForceAppendTsFileWriter(seqFile.getFile());
+      mergeLogger.logFileMergeStart(seqFile.getFile(),
+          ((ForceAppendTsFileWriter) oldFileWriter).getTruncatePosition());
+      logger.debug("{} moving merged chunks of {} to the old file", taskName, seqFile);
+      ((ForceAppendTsFileWriter) oldFileWriter).doTruncate();
+    } catch (TsFileNotCompleteException e) {
+      // this file may already be truncated if this merge is a system reboot merge
+      oldFileWriter = new RestorableTsFileIOWriter(seqFile.getFile());
+    }
+    return oldFileWriter;
+  }
+
   private void updateHistoricalVersions(TsFileResource seqFile) {
     // as the new file contains data of other files, track their versions in the new file
     // so that we will be able to compare data across different IoTDBs that share the same file
@@ -245,6 +295,12 @@ class MergeFileTask {
         fileWriter.startChunkGroup(path.getDevice());
         long maxVersion = writeUnmergedChunks(chunkStartTimes, chunkMetadataList,
             resource.getFileReader(seqFile), fileWriter);
+
+        if (Thread.interrupted()) {
+          Thread.currentThread().interrupt();
+          return;
+        }
+
         fileWriter.writeVersion(maxVersion + 1);
         fileWriter.endChunkGroup();
       }
@@ -303,6 +359,11 @@ class MergeFileTask {
           break;
         }
       }
+
+      if (Thread.interrupted()) {
+        Thread.currentThread().interrupt();
+        return maxVersion;
+      }
     }
     return maxVersion;
   }
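
The interrupt checks added throughout this task are what make MergeManager.abortMerge effective: cancel(true) interrupts the worker thread, and the merge loop polls the flag between files and chunk groups, restoring it before returning early. Below is a minimal, IoTDB-independent sketch of the same cooperative-cancellation pattern:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CooperativeCancelDemo {

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    Future<?> future = pool.submit(() -> {
      for (long i = 0; i < Long.MAX_VALUE; i++) {
        // Same pattern as the merge tasks: check the flag between units of work,
        // restore it (Thread.interrupted() clears it) and bail out early.
        if (Thread.interrupted()) {
          Thread.currentThread().interrupt();
          System.out.println("aborted after " + i + " steps");
          return;
        }
        doOneStep();
      }
    });
    TimeUnit.MILLISECONDS.sleep(10);
    future.cancel(true); // delivers the interrupt that the loop above observes
    pool.shutdown();
  }

  private static void doOneStep() {
    // stand-in for merging one chunk or file
    Math.sqrt(System.nanoTime());
  }
}
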
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index 67caf2c..464ce44 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -57,7 +58,7 @@ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class MergeMultiChunkTask {
+public class MergeMultiChunkTask {
 
   private static final Logger logger = LoggerFactory.getLogger(MergeMultiChunkTask.class);
   private static int minChunkPointNum = IoTDBDescriptor.getInstance().getConfig()
@@ -81,9 +82,11 @@ class MergeMultiChunkTask {
   private int concurrentMergeSeriesNum;
   private List<Path> currMergingPaths = new ArrayList<>();
 
-  MergeMultiChunkTask(MergeContext context, String taskName, MergeLogger mergeLogger,
+  private String storageGroupName;
+
+  public MergeMultiChunkTask(MergeContext context, String taskName, MergeLogger mergeLogger,
       MergeResource mergeResource, boolean fullMerge, List<Path> unmergedSeries,
-      int concurrentMergeSeriesNum) {
+      int concurrentMergeSeriesNum, String storageGroupName) {
     this.mergeContext = context;
     this.taskName = taskName;
     this.mergeLogger = mergeLogger;
@@ -91,6 +94,7 @@ class MergeMultiChunkTask {
     this.fullMerge = fullMerge;
     this.unmergedSeries = unmergedSeries;
     this.concurrentMergeSeriesNum = concurrentMergeSeriesNum;
+    this.storageGroupName = storageGroupName;
   }
 
   void mergeSeries() throws IOException {
@@ -110,6 +114,11 @@ class MergeMultiChunkTask {
         currMergingPaths = pathSelector.next();
         mergePaths();
         resource.clearChunkWriterCache();
+        if (Thread.interrupted()) {
+          logger.info("MergeMultiChunkTask {} aborted", taskName);
+          Thread.currentThread().interrupt();
+          return;
+        }
         mergedSeriesCnt += currMergingPaths.size();
         logMergeProgress();
       }
@@ -131,6 +140,10 @@ class MergeMultiChunkTask {
     }
   }
 
+  public String getProgress() {
+    return String.format("Processed %d/%d series", mergedSeriesCnt, unmergedSeries.size());
+  }
+
   private void mergePaths() throws IOException {
     mergeLogger.logTSStart(currMergingPaths);
     IPointReader[] unseqReaders;
@@ -144,6 +157,11 @@ class MergeMultiChunkTask {
 
     for (int i = 0; i < resource.getSeqFiles().size(); i++) {
       pathsMergeOneFile(i, unseqReaders);
+
+      if (Thread.interrupted()) {
+        Thread.currentThread().interrupt();
+        return;
+      }
     }
     mergeLogger.logTSEnd();
   }
@@ -153,7 +171,7 @@ class MergeMultiChunkTask {
     TsFileResource currTsFile = resource.getSeqFiles().get(seqFileIdx);
     String deviceId = currMergingPaths.get(0).getDevice();
     long currDeviceMinTime = currTsFile.getStartTime(deviceId);
-    //COMMENTS: is this correct? how about if there are other devices (in the currMergingPaths) that have unseq data?
+    // all paths in one call are from the same device
     if (currDeviceMinTime == Long.MAX_VALUE) {
       return;
     }
@@ -178,6 +196,11 @@ class MergeMultiChunkTask {
       modifications[i] = resource.getModifications(currTsFile, currMergingPaths.get(i));
       seqChunkMeta[i] = resource.queryChunkMetadata(currMergingPaths.get(i), currTsFile);
       modifyChunkMetaData(seqChunkMeta[i], modifications[i]);
+
+      if (Thread.interrupted()) {
+        Thread.currentThread().interrupt();
+        return;
+      }
     }
 
     List<Integer> unskippedPathIndices = filterNoDataPaths(seqChunkMeta, seqFileIdx);
@@ -245,22 +268,28 @@ class MergeMultiChunkTask {
     mergedChunkNum.set(0);
     unmergedChunkNum.set(0);
 
-    List<Future> futures = new ArrayList<>();
+    List<Future<Void>> futures = new ArrayList<>();
     for (int i = 0; i < mergeChunkSubTaskNum; i++) {
-      int finalI = i;
-      futures.add(MergeManager.getINSTANCE().submitChunkSubTask(() -> {
-        mergeChunkHeap(chunkIdxHeaps[finalI], metaListEntries, ptWrittens,
-            reader,
-            mergeFileWriter, unseqReaders,
-            currFile,
-            isLastFile);
-        return null;
-      }));
+      futures.add(MergeManager.getINSTANCE()
+          .submitChunkSubTask(new MergeChunkHeapTask(chunkIdxHeaps[i],
+              metaListEntries, ptWrittens,
+              reader,
+              mergeFileWriter, unseqReaders,
+              currFile,
+              isLastFile, i)));
+
+      if (Thread.interrupted()) {
+        Thread.currentThread().interrupt();
+        return false;
+      }
     }
     for (int i = 0; i < mergeChunkSubTaskNum; i++) {
       try {
         futures.get(i).get();
-      } catch (InterruptedException | ExecutionException e) {
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return false;
+      } catch (ExecutionException e) {
         throw new IOException(e);
       }
     }
@@ -274,56 +303,7 @@ class MergeMultiChunkTask {
     return mergedChunkNum.get() > 0;
   }
 
-  private void mergeChunkHeap(PriorityQueue<Integer> chunkIdxHeap, MetaListEntry[] metaListEntries,
-      int[] ptWrittens,
-      TsFileSequenceReader reader,
-      RestorableTsFileIOWriter mergeFileWriter, IPointReader[] unseqReaders,
-      TsFileResource currFile,
-      boolean isLastFile) throws IOException {
-    while (!chunkIdxHeap.isEmpty()) {
-      int pathIdx = chunkIdxHeap.poll();
-      Path path = currMergingPaths.get(pathIdx);
-      MeasurementSchema measurementSchema = resource.getSchema(path);
-      IChunkWriter chunkWriter = resource.getChunkWriter(measurementSchema);
-      if (metaListEntries[pathIdx] != null) {
-        MetaListEntry metaListEntry = metaListEntries[pathIdx];
-        ChunkMetadata currMeta = metaListEntry.current();
-        boolean isLastChunk = !metaListEntry.hasNext();
-        boolean chunkOverflowed = MergeUtils
-            .isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta);
-        boolean chunkTooSmall = MergeUtils
-            .isChunkTooSmall(ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum);
-
-        Chunk chunk;
-        synchronized (reader) {
-          chunk = reader.readMemChunk(currMeta);
-        }
-        ptWrittens[pathIdx] = mergeChunkV2(currMeta, chunkOverflowed, chunkTooSmall, chunk,
-            ptWrittens[pathIdx], pathIdx, mergeFileWriter, unseqReaders[pathIdx], chunkWriter,
-            currFile);
-
-        if (!isLastChunk) {
-          metaListEntry.next();
-          chunkIdxHeap.add(pathIdx);
-          continue;
-        }
-      }
-      // this only happens when the seqFiles do not contain this series, otherwise the remaining
-      // data will be merged with the last chunk in the seqFiles
-      if (isLastFile && currTimeValuePairs[pathIdx] != null) {
-        ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx],
-            Long.MAX_VALUE,
-            pathIdx);
-        mergedChunkNum.incrementAndGet();
-      }
-      // the last merged chunk may still be smaller than the threshold, flush it anyway
-      if (ptWrittens[pathIdx] > 0) {
-        synchronized (mergeFileWriter) {
-          chunkWriter.writeToFileWriter(mergeFileWriter);
-        }
-      }
-    }
-  }
+
 
   /**
    * merge a sequence chunk SK
@@ -338,6 +318,7 @@ class MergeMultiChunkTask {
    * 3. other cases: need to unCompress the chunk and write 3.1 SK isn't overflowed 3.2 SK is
    * overflowed
    */
+  @SuppressWarnings("java:S2445") // avoid writing the same writer concurrently
   private int mergeChunkV2(ChunkMetadata currMeta, boolean chunkOverflowed,
       boolean chunkTooSmall, Chunk chunk, int lastUnclosedChunkPoint, int pathIdx,
       TsFileIOWriter mergeFileWriter, IPointReader unseqReader,
@@ -379,7 +360,7 @@ class MergeMultiChunkTask {
     }
 
     // update points written statistics
-    mergeContext.incTotalPointWritten(unclosedChunkPoint - lastUnclosedChunkPoint);
+    mergeContext.incTotalPointWritten((long) unclosedChunkPoint - lastUnclosedChunkPoint);
     if (minChunkPointNum > 0 && unclosedChunkPoint >= minChunkPointNum
         || unclosedChunkPoint > 0 && minChunkPointNum < 0) {
       // the new chunk's size is large enough and it should be flushed
@@ -445,4 +426,107 @@ class MergeMultiChunkTask {
     }
     return cnt;
   }
+
+  public class MergeChunkHeapTask implements Callable<Void> {
+
+    private PriorityQueue<Integer> chunkIdxHeap;
+    private MetaListEntry[] metaListEntries;
+    private int[] ptWrittens;
+    private TsFileSequenceReader reader;
+    private RestorableTsFileIOWriter mergeFileWriter;
+    private IPointReader[] unseqReaders;
+    private TsFileResource currFile;
+    private boolean isLastFile;
+    private int taskNum;
+
+    private int totalSeriesNum;
+
+    public MergeChunkHeapTask(PriorityQueue<Integer> chunkIdxHeap,
+        MetaListEntry[] metaListEntries, int[] ptWrittens,
+        TsFileSequenceReader reader,
+        RestorableTsFileIOWriter mergeFileWriter,
+        IPointReader[] unseqReaders, TsFileResource currFile, boolean isLastFile, int taskNum) {
+      this.chunkIdxHeap = chunkIdxHeap;
+      this.metaListEntries = metaListEntries;
+      this.ptWrittens = ptWrittens;
+      this.reader = reader;
+      this.mergeFileWriter = mergeFileWriter;
+      this.unseqReaders = unseqReaders;
+      this.currFile = currFile;
+      this.isLastFile = isLastFile;
+      this.taskNum = taskNum;
+      this.totalSeriesNum = chunkIdxHeap.size();
+    }
+
+    @Override
+    public Void call() throws Exception {
+      mergeChunkHeap();
+      return null;
+    }
+
+    @SuppressWarnings("java:S2445") // avoid reading the same reader concurrently
+    private void mergeChunkHeap() throws IOException {
+      while (!chunkIdxHeap.isEmpty()) {
+        int pathIdx = chunkIdxHeap.poll();
+        Path path = currMergingPaths.get(pathIdx);
+        MeasurementSchema measurementSchema = resource.getSchema(path);
+        IChunkWriter chunkWriter = resource.getChunkWriter(measurementSchema);
+        if (Thread.interrupted()) {
+          Thread.currentThread().interrupt();
+          return;
+        }
+
+        if (metaListEntries[pathIdx] != null) {
+          MetaListEntry metaListEntry = metaListEntries[pathIdx];
+          ChunkMetadata currMeta = metaListEntry.current();
+          boolean isLastChunk = !metaListEntry.hasNext();
+          boolean chunkOverflowed = MergeUtils
+              .isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta);
+          boolean chunkTooSmall = MergeUtils
+              .isChunkTooSmall(ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum);
+
+          Chunk chunk;
+          synchronized (reader) {
+            chunk = reader.readMemChunk(currMeta);
+          }
+          ptWrittens[pathIdx] = mergeChunkV2(currMeta, chunkOverflowed, chunkTooSmall, chunk,
+              ptWrittens[pathIdx], pathIdx, mergeFileWriter, unseqReaders[pathIdx], chunkWriter,
+              currFile);
+
+          if (!isLastChunk) {
+            metaListEntry.next();
+            chunkIdxHeap.add(pathIdx);
+            continue;
+          }
+        }
+        // this only happens when the seqFiles do not contain this series, otherwise the remaining
+        // data will be merged with the last chunk in the seqFiles
+        if (isLastFile && currTimeValuePairs[pathIdx] != null) {
+          ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx],
+              Long.MAX_VALUE,
+              pathIdx);
+          mergedChunkNum.incrementAndGet();
+        }
+        // the last merged chunk may still be smaller than the threshold, flush it anyway
+        if (ptWrittens[pathIdx] > 0) {
+          synchronized (mergeFileWriter) {
+            chunkWriter.writeToFileWriter(mergeFileWriter);
+          }
+        }
+      }
+    }
+
+    public String getStorageGroupName() {
+      return storageGroupName;
+    }
+
+    public String getTaskName() {
+      return taskName + "_" + taskNum;
+    }
+
+    public String getProgress() {
+      return String.format("Processed %d/%d series", totalSeriesNum - chunkIdxHeap.size(),
+          totalSeriesNum);
+    }
+  }
 }
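
MergeChunkHeapTask is now a named Callable<Void> so the manager can track sub-tasks per storage group. Below is a minimal sketch (plain java.util.concurrent, no IoTDB classes) of the fan-out-and-join pattern mergeChunkHeaps() uses, including the reworked InterruptedException/ExecutionException handling:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubTaskFanOutDemo {

  public static void main(String[] args) throws IOException {
    ExecutorService subTaskPool = Executors.newFixedThreadPool(4);
    List<Future<Void>> futures = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
      final int taskNum = i;
      futures.add(subTaskPool.submit(() -> {
        // stand-in for MergeChunkHeapTask#call(): drain one heap of chunk indices
        System.out.println("sub task " + taskNum + " finished");
        return null;
      }));
    }
    for (Future<Void> future : futures) {
      try {
        future.get();
      } catch (InterruptedException e) {
        // mirror the new handling: restore the flag and stop waiting
        Thread.currentThread().interrupt();
        break;
      } catch (ExecutionException e) {
        // mirror the new handling: surface sub-task failures as IOException
        throw new IOException(e);
      }
    }
    subTaskPool.shutdown();
  }
}
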
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
index 6d1a6fc..68476aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
@@ -67,6 +67,11 @@ public class MergeTask implements Callable<Void> {
   String taskName;
   boolean fullMerge;
 
+  States states = States.START;
+
+  MergeMultiChunkTask chunkTask;
+  MergeFileTask fileTask;
+
   MergeTask(List<TsFileResource> seqFiles,
       List<TsFileResource> unseqFiles, String storageGroupSysDir, MergeCallback callback,
       String taskName, boolean fullMerge, String storageGroupName) {
@@ -96,15 +101,19 @@ public class MergeTask implements Callable<Void> {
       doMerge();
     } catch (Exception e) {
       logger.error("Runtime exception in merge {}", taskName, e);
-      cleanUp(false);
-      // call the callback to make sure the StorageGroup exit merging status, but passing 2
-      // empty file lists to avoid files being deleted.
-      callback.call(Collections.emptyList(), Collections.emptyList(), new File(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME));
-      throw e;
+      abort();
     }
     return null;
   }
 
+  private void abort() throws IOException {
+    states = States.ABORTED;
+    cleanUp(false);
+    // call the callback to make sure the StorageGroup exit merging status, but passing 2
+    // empty file lists to avoid files being deleted.
+    callback.call(Collections.emptyList(), Collections.emptyList(), new File(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME));
+  }
+
   private void doMerge() throws IOException, MetadataException {
     if (logger.isInfoEnabled()) {
       logger.info("{} starts to merge {} seqFiles, {} unseqFiles", taskName,
@@ -132,14 +141,30 @@ public class MergeTask implements Callable<Void> {
 
     mergeLogger.logMergeStart();
 
-    MergeMultiChunkTask mergeChunkTask = new MergeMultiChunkTask(mergeContext, taskName, mergeLogger, resource,
-        fullMerge, unmergedSeries, concurrentMergeSeriesNum);
-    mergeChunkTask.mergeSeries();
+    chunkTask = new MergeMultiChunkTask(mergeContext, taskName, mergeLogger, resource,
+        fullMerge, unmergedSeries, concurrentMergeSeriesNum, storageGroupName);
+    states = States.MERGE_CHUNKS;
+    chunkTask.mergeSeries();
+    if (Thread.interrupted()) {
+      logger.info("Merge task {} aborted", taskName);
+      abort();
+      return;
+    }
+
 
-    MergeFileTask mergeFileTask = new MergeFileTask(taskName, mergeContext, mergeLogger, resource,
+    fileTask = new MergeFileTask(taskName, mergeContext, mergeLogger, resource,
         resource.getSeqFiles());
-    mergeFileTask.mergeFiles();
+    states = States.MERGE_FILES;
+    chunkTask = null;
+    fileTask.mergeFiles();
+    if (Thread.interrupted()) {
+      logger.info("Merge task {} aborted", taskName);
+      abort();
+      return;
+    }
 
+    states = States.CLEAN_UP;
+    fileTask = null;
     cleanUp(true);
     if (logger.isInfoEnabled()) {
       double elapsedTime = (double) (System.currentTimeMillis() - startTime) / 1000.0;
@@ -183,4 +208,36 @@ public class MergeTask implements Callable<Void> {
       logFile.delete();
     }
   }
+
+  public String getStorageGroupName() {
+    return storageGroupName;
+  }
+
+  enum States {
+    START,
+    MERGE_CHUNKS,
+    MERGE_FILES,
+    CLEAN_UP,
+    ABORTED
+  }
+
+  public String getProgress() {
+    switch (states) {
+      case ABORTED:
+        return "Aborted";
+      case CLEAN_UP:
+        return "Cleaning up";
+      case MERGE_FILES:
+        return "Merging files: " + fileTask.getProgress();
+      case MERGE_CHUNKS:
+        return "Merging series: " + chunkTask.getProgress();
+      case START:
+      default:
+        return "Just started";
+    }
+  }
+
+  public String getTaskName() {
+    return taskName;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
index c0beb3d..9857d40 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
@@ -104,7 +104,7 @@ public class RecoverMergeTask extends MergeTask {
       }
 
       MergeMultiChunkTask mergeChunkTask = new MergeMultiChunkTask(mergeContext, taskName, mergeLogger, resource,
-          fullMerge, analyzer.getUnmergedPaths(), concurrentMergeSeriesNum);
+          fullMerge, analyzer.getUnmergedPaths(), concurrentMergeSeriesNum, storageGroupName);
       analyzer.setUnmergedPaths(null);
       mergeChunkTask.mergeSeries();
 
@@ -158,9 +158,8 @@ public class RecoverMergeTask extends MergeTask {
       long maxChunkNum = chunkNums[1];
       long fileMetaSize = MergeUtils.getFileMetaSize(seqFile, resource.getFileReader(seqFile));
       long newSingleSeriesSeqReadCost =  fileMetaSize * maxChunkNum / totalChunkNum;
-      singleSeriesSeqReadCost = newSingleSeriesSeqReadCost > singleSeriesSeqReadCost ?
-          newSingleSeriesSeqReadCost : singleSeriesSeqReadCost;
-      maxSeqReadCost = fileMetaSize > maxSeqReadCost ? fileMetaSize : maxSeqReadCost;
+      singleSeriesSeqReadCost = Math.max(newSingleSeriesSeqReadCost, singleSeriesSeqReadCost);
+      maxSeqReadCost = Math.max(fileMetaSize, maxSeqReadCost);
       seqWriteCost += fileMetaSize;
     }
 
@@ -169,10 +168,8 @@ public class RecoverMergeTask extends MergeTask {
     int ub = MaxSeriesMergeFileSelector.MAX_SERIES_NUM;
     int mid = (lb + ub) / 2;
     while (mid != lb) {
-      long unseqCost = singleSeriesUnseqCost * mid < maxUnseqCost ? singleSeriesUnseqCost * mid :
-          maxUnseqCost;
-      long seqReadCos = singleSeriesSeqReadCost * mid < maxSeqReadCost ?
-          singleSeriesSeqReadCost * mid : maxSeqReadCost;
+      long unseqCost = Math.min(singleSeriesUnseqCost * mid, maxUnseqCost);
+      long seqReadCos = Math.min(singleSeriesSeqReadCost * mid, maxSeqReadCost);
       long totalCost = unseqCost + seqReadCos + seqWriteCost;
       if (totalCost <= memBudget) {
         lb = mid;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
index 68031e9..3b31bbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
@@ -98,6 +98,8 @@ public class Planner {
       case FLUSH:
       case MERGE:
       case CLEAR_CACHE:
+      case NULL:
+      case SHOW_MERGE_STATUS:
         return operator;
       case QUERY:
       case UPDATE:
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index 468f286..1acf1a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -150,6 +150,8 @@ public class SQLConstant {
   public static final int TOK_LOAD_CONFIGURATION_GLOBAL = 85;
   public static final int TOK_LOAD_CONFIGURATION_LOCAL = 86;
 
+  public static final int TOK_SHOW_MERGE_STATUS = 87;
+
   public static final Map<Integer, String> tokenSymbol = new HashMap<>();
   public static final Map<Integer, String> tokenNames = new HashMap<>();
   public static final Map<Integer, Integer> reverseWords = new HashMap<>();
@@ -217,6 +219,8 @@ public class SQLConstant {
     tokenNames.put(TOK_LOAD_FILES, "TOK_LOAD_FILES");
     tokenNames.put(TOK_REMOVE_FILE, "TOK_REMOVE_FILE");
     tokenNames.put(TOK_MOVE_FILE, "TOK_MOVE_FILE");
+
+    tokenNames.put(TOK_SHOW_MERGE_STATUS, "TOK_SHOW_MERGE_STATUS");
   }
 
   static {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index cce6140..f77ef78 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -18,15 +18,20 @@
  */
 package org.apache.iotdb.db.qp.executor;
 
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CREATED_TIME;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DONE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PARAMETER;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PROGRESS;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_NAME;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_ALIAS;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION;
@@ -48,6 +53,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -67,6 +73,8 @@ import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
+import org.apache.iotdb.db.engine.merge.manage.MergeManager;
+import org.apache.iotdb.db.engine.merge.manage.MergeManager.TaskStatus;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
@@ -349,6 +357,8 @@ public class PlanExecutor implements IPlanExecutor {
         return processCountNodeTimeSeries((CountPlan) showPlan);
       case COUNT_NODES:
         return processCountNodes((CountPlan) showPlan);
+      case MERGE_STATUS:
+        return processShowMergeStatus();
       default:
         throw new QueryProcessException(String.format("Unrecognized show plan %s", showPlan));
     }
@@ -1493,4 +1503,43 @@ public class PlanExecutor implements IPlanExecutor {
   protected void loadConfiguration(LoadConfigurationPlan plan) throws QueryProcessException {
     IoTDBDescriptor.getInstance().loadHotModifiedProps();
   }
+
+  private QueryDataSet processShowMergeStatus() {
+    List<Path> headerList = new ArrayList<>();
+    List<TSDataType> typeList = new ArrayList<>();
+    headerList.add(new Path(COLUMN_STORAGE_GROUP));
+    headerList.add(new Path(COLUMN_TASK_NAME));
+    headerList.add(new Path(COLUMN_CREATED_TIME));
+    headerList.add(new Path(COLUMN_PROGRESS));
+    headerList.add(new Path(COLUMN_CANCELLED));
+    headerList.add(new Path(COLUMN_DONE));
+
+    typeList.add(TSDataType.TEXT);
+    typeList.add(TSDataType.TEXT);
+    typeList.add(TSDataType.TEXT);
+    typeList.add(TSDataType.TEXT);
+    typeList.add(TSDataType.BOOLEAN);
+    typeList.add(TSDataType.BOOLEAN);
+    ListDataSet dataSet = new ListDataSet(headerList, typeList);
+    Map<String, List<TaskStatus>>[] taskStatus = MergeManager.getINSTANCE().collectTaskStatus();
+    for (Map<String, List<TaskStatus>> statusMap : taskStatus) {
+      for (Entry<String, List<TaskStatus>> stringListEntry : statusMap.entrySet()) {
+        for (TaskStatus status : stringListEntry.getValue()) {
+          dataSet.putRecord(toRowRecord(status, stringListEntry.getKey()));
+        }
+      }
+    }
+    return dataSet;
+  }
+
+  public RowRecord toRowRecord(TaskStatus status, String storageGroup) {
+    RowRecord record = new RowRecord(0);
+    record.addField(new Binary(storageGroup), TSDataType.TEXT);
+    record.addField(new Binary(status.getTaskName()), TSDataType.TEXT);
+    record.addField(new Binary(status.getCreatedTime()), TSDataType.TEXT);
+    record.addField(new Binary(status.getProgress()), TSDataType.TEXT);
+    record.addField(status.isCancelled(), TSDataType.BOOLEAN);
+    record.addField(status.isDone(), TSDataType.BOOLEAN);
+    return record;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 0a849df..746326c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -75,6 +75,7 @@ public abstract class Operator {
     LIST_USER_PRIVILEGE, LIST_ROLE_PRIVILEGE, LIST_USER_ROLES, LIST_ROLE_USERS,
     GRANT_WATERMARK_EMBEDDING, REVOKE_WATERMARK_EMBEDDING,
     TTL, DELETE_STORAGE_GROUP, LOAD_CONFIGURATION, SHOW, LOAD_FILES, REMOVE_FILE, MOVE_FILE, LAST, GROUP_BY_FILL,
-    ALTER_TIMESERIES, FLUSH, MERGE, FULL_MERGE, CLEAR_CACHE
+    ALTER_TIMESERIES, FLUSH, MERGE, FULL_MERGE, CLEAR_CACHE,
+    SHOW_MERGE_STATUS
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowMergeStatusOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowMergeStatusOperator.java
new file mode 100644
index 0000000..d10cc2e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowMergeStatusOperator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.qp.logical.RootOperator;
+
+public class ShowMergeStatusOperator extends RootOperator {
+
+  public ShowMergeStatusOperator(int tokenIntType) {
+    super(tokenIntType);
+    setOperatorType(OperatorType.SHOW_MERGE_STATUS);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowMergeStatusPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowMergeStatusPlan.java
new file mode 100644
index 0000000..51c2d52
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowMergeStatusPlan.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.sys;
+
+public class ShowMergeStatusPlan extends ShowPlan {
+
+  public ShowMergeStatusPlan() {
+    super(ShowContentType.MERGE_STATUS);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
index ced1b80..bde812a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
@@ -50,7 +50,7 @@ public class ShowPlan extends PhysicalPlan {
 
   public enum ShowContentType {
     DYNAMIC_PARAMETER, FLUSH_TASK_INFO, TTL, VERSION, TIMESERIES, STORAGE_GROUP, CHILD_PATH, DEVICES,
-    COUNT_TIMESERIES, COUNT_NODE_TIMESERIES, COUNT_NODES
+    COUNT_TIMESERIES, COUNT_NODE_TIMESERIES, COUNT_NODES, MERGE_STATUS
   }
 
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
index 43e4c4a..8048f86 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
@@ -65,6 +65,7 @@ import org.apache.iotdb.db.qp.logical.sys.SetStorageGroupOperator;
 import org.apache.iotdb.db.qp.logical.sys.SetTTLOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowChildPathsOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowDevicesOperator;
+import org.apache.iotdb.db.qp.logical.sys.ShowMergeStatusOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowTTLOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowTimeSeriesOperator;
@@ -141,6 +142,7 @@ import org.apache.iotdb.db.qp.strategy.SqlBaseParser.SetTTLStatementContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowAllTTLStatementContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowChildPathsContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowDevicesContext;
+import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowMergeStatusContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowStorageGroupContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowTTLStatementContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowTimeseriesContext;
@@ -1610,4 +1612,10 @@ public class LogicalGenerator extends SqlBaseBaseListener {
               String.format("encoding %s does not support %s", tsEncoding, tsDataType));
     }
   }
+
+  @Override
+  public void enterShowMergeStatus(ShowMergeStatusContext ctx) {
+    super.enterShowMergeStatus(ctx);
+    initializedOperator = new ShowMergeStatusOperator(SQLConstant.TOK_SHOW_MERGE_STATUS);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 644b99b..abd2d23 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -82,6 +82,7 @@ import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowMergeStatusPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType;
 import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
@@ -271,6 +272,8 @@ public class PhysicalGenerator {
             OperatorType.MOVE_FILE);
       case CLEAR_CACHE:
         return new ClearCachePlan();
+      case SHOW_MERGE_STATUS:
+        return new ShowMergeStatusPlan();
       default:
         throw new LogicalOperatorException(operator.getType().toString(), "");
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index f3edd9f..2ae9928 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -94,8 +94,8 @@ public class RPCService implements RPCServiceMBean, IService {
 
   @Override
   public void start() throws StartupException {
-      JMXService.registerMBean(getInstance(), mbeanName);
-      startService();
+    JMXService.registerMBean(getInstance(), mbeanName);
+    startService();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index e1498bc..ad1569b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -36,7 +36,7 @@ public enum ServiceType {
   FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""),
   SYNC_SERVICE("SYNC ServerService", ""),
   UPGRADE_SERVICE("UPGRADE DataService", ""),
-  MERGE_SERVICE("Merge Manager", ""),
+  MERGE_SERVICE("Merge Manager", "Merge Manager"),
   PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE", "PERFORMANCE_STATISTIC_SERVICE"),
   MANAGE_DYNAMIC_PARAMETERS_SERVICE("Manage Dynamic Parameters", "Manage Dynamic Parameters"),
   TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""),
diff --git a/server/src/main/java/org/apache/iotdb/db/service/StaticResps.java b/server/src/main/java/org/apache/iotdb/db/service/StaticResps.java
index 26fe5f8..6b0a0c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/StaticResps.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/StaticResps.java
@@ -19,15 +19,20 @@
 
 package org.apache.iotdb.db.service;
 
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CREATED_TIME;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DONE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PARAMETER;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PROGRESS;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_NAME;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
@@ -123,6 +128,13 @@ class StaticResps {
       Arrays.asList(TSDataType.TEXT.toString(), TSDataType.TEXT.toString()), false
   );
 
+  static final TSExecuteStatementResp MERGE_STATUS_RESP = getNoTimeExecuteResp(
+      Arrays.asList(COLUMN_STORAGE_GROUP, COLUMN_TASK_NAME, COLUMN_CREATED_TIME, COLUMN_PROGRESS,
+          COLUMN_CANCELLED, COLUMN_DONE),
+      Arrays.asList(TSDataType.TEXT.toString(), TSDataType.TEXT.toString(),
+          TSDataType.TEXT.toString(),
+          TSDataType.TEXT.toString(), TSDataType.BOOLEAN.toString(), TSDataType.BOOLEAN.toString()));
+
   private static TSExecuteStatementResp getNoTimeExecuteResp(List<String> columns,
       List<String> dataTypes) {
     return getExecuteResp(columns, dataTypes, true);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 7ae24a9..fe79492 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -633,6 +633,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         return StaticResps.COUNT_NODES;
       case COUNT_TIMESERIES:
         return StaticResps.COUNT_TIMESERIES;
+      case MERGE_STATUS:
+        return StaticResps.MERGE_STATUS_RESP;
       default:
         logger.error("Unsupported show content type: {}", showPlan.getShowContentType());
         throw new QueryProcessException(
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java
new file mode 100644
index 0000000..abe7660
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.PriorityQueue;
+import org.apache.iotdb.db.engine.merge.manage.MergeManager;
+import org.apache.iotdb.db.engine.merge.task.MergeMultiChunkTask;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
+import org.junit.Test;
+
+public class MergeManagerTest extends MergeTest {
+
+  @Test
+  public void testGenMergeReport() {
+    FakedMergeMultiChunkTask chunkTask = new FakedMergeMultiChunkTask();
+    for (int i = 0; i < 5; i++) {
+      MergeManager.getINSTANCE().submitMainTask(new FakedMainMergeTask(i));
+      MergeManager.getINSTANCE().submitChunkSubTask(chunkTask.createSubTask(i));
+    }
+
+    String report = MergeManager.getINSTANCE().genMergeTaskReport();
+    checkReport(report);
+  }
+
+  @Test
+  public void testAbortMerge() {
+    FakedMergeMultiChunkTask chunkTask = new FakedMergeMultiChunkTask();
+    for (int i = 0; i < 5; i++) {
+      MergeManager.getINSTANCE().submitMainTask(new FakedMainMergeTask(i));
+      MergeManager.getINSTANCE().submitChunkSubTask(chunkTask.createSubTask(i));
+    }
+
+    MergeManager.getINSTANCE().abortMerge("non-exist");
+    String report = MergeManager.getINSTANCE().genMergeTaskReport();
+
+    checkReport(report);
+
+    MergeManager.getINSTANCE().abortMerge("test");
+    report = MergeManager.getINSTANCE().genMergeTaskReport();
+    assertEquals(String.format("Main tasks:%n"
+        + "Sub tasks:%n"), report);
+  }
+
+  private void checkReport(String report) {
+    String[] split = report.split(System.lineSeparator());
+    assertEquals("Main tasks:", split[0]);
+    assertEquals("\tStorage group: test", split[1]);
+    for (int i = 0; i < 5; i++) {
+      assertTrue(split[2 + i].contains("task" + i));
+      assertTrue(split[2 + i].contains("0,"));
+      assertTrue(split[2 + i].contains("done:false"));
+      assertTrue(split[2 + i].contains("cancelled:false"));
+    }
+    assertEquals("Sub tasks:", split[7]);
+    assertEquals("\tStorage group: test", split[8]);
+    for (int i = 0; i < 5; i++) {
+      assertTrue(split[9 + i].contains("task" + i));
+      assertTrue(split[9 + i].contains("0,"));
+      assertTrue(split[9 + i].contains("done:false"));
+      assertTrue(split[9 + i].contains("cancelled:false"));
+    }
+  }
+
+  static class FakedMainMergeTask extends MergeTask {
+
+    private int serialNum;
+    private String progress = "0";
+
+    public FakedMainMergeTask(int serialNum) {
+      super(null, null, null, null, false,
+          0,
+          null);
+      this.serialNum = serialNum;
+    }
+
+    @Override
+    public Void call() {
+      while (!Thread.currentThread().isInterrupted()) {
+        // wait until interrupt
+      }
+      progress = "1";
+      return null;
+    }
+
+    @Override
+    public String getStorageGroupName() {
+      return "test";
+    }
+
+    @Override
+    public String getProgress() {
+      return progress;
+    }
+
+    @Override
+    public String getTaskName() {
+      return "task" + serialNum;
+    }
+  }
+
+  static class FakedMergeMultiChunkTask extends MergeMultiChunkTask {
+
+    public FakedMergeMultiChunkTask() {
+      super(null, null, null, null, false, null,
+          0, null);
+    }
+
+    public MergeChunkHeapTask createSubTask(int serialNum) {
+      return new FakedSubMergeTask(serialNum);
+    }
+
+    class FakedSubMergeTask extends MergeChunkHeapTask {
+
+      private int serialNum;
+      private String progress = "0";
+
+      public FakedSubMergeTask(int serialNum) {
+        super(new PriorityQueue<>(), null, null, null, null, null, null, false, serialNum);
+        this.serialNum = serialNum;
+      }
+
+      @Override
+      public Void call() {
+        while (!Thread.currentThread().isInterrupted()) {
+          // wait until interrupt
+        }
+        progress = "1";
+        return null;
+      }
+
+      @Override
+      public String getStorageGroupName() {
+        return "test";
+      }
+
+      @Override
+      public String getProgress() {
+        return progress;
+      }
+
+      @Override
+      public String getTaskName() {
+        return "task" + serialNum;
+      }
+    }
+  }
+
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
index 35d402f..e2313b4 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
@@ -28,7 +28,6 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
 import org.junit.After;
@@ -169,7 +168,7 @@ public class IoTDBMergeTest {
   }
 
   @Test
-  public void testCrossPartition() throws SQLException, StorageEngineException {
+  public void testCrossPartition() throws SQLException {
     logger.info("testCrossPartition...");
     try (Connection connection = DriverManager
         .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
@@ -229,4 +228,48 @@ public class IoTDBMergeTest {
       assertEquals(10000, cnt);
     }
   }
+
+  @Test
+  public void testShowMergeStatus() throws SQLException {
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.mergeTest");
+      for (int i = 1; i <= 3; i++) {
+        try {
+          statement.execute("CREATE TIMESERIES root.mergeTest.s" + i + " WITH DATATYPE=INT64,"
+              + "ENCODING=PLAIN");
+        } catch (SQLException e) {
+          // ignore
+        }
+      }
+
+      for (int j = 1; j <= 10; j++) {
+        statement.execute(String.format("INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,"
+            + "%d,%d)", j, j+1, j+2, j+3));
+      }
+      statement.execute("FLUSH");
+      for (int j = 1; j <= 10; j++) {
+        statement.execute(String.format("INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,"
+            + "%d,%d)", j, j+10, j+20, j+30));
+      }
+      statement.execute("FLUSH");
+      statement.execute("MERGE");
+
+      int cnt;
+      try (ResultSet resultSet = statement.executeQuery("SHOW MERGE STATUS")) {
+        cnt = 0;
+        int colNum = resultSet.getMetaData().getColumnCount();
+        while (resultSet.next()) {
+          StringBuilder stringBuilder = new StringBuilder();
+          for (int i = 0; i < colNum; i++) {
+            stringBuilder.append(resultSet.getString(i + 1)).append(",");
+          }
+          System.out.println(stringBuilder.toString());
+          cnt++;
+        }
+      }
+      assertEquals(1, cnt);
+    }
+  }
 }
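
For reference, here is a minimal client-side sketch (not part of the patch above) of how the new SHOW MERGE STATUS statement could be consumed over JDBC, mirroring the integration test in IoTDBMergeTest. It assumes a locally running server at 127.0.0.1:6667 with the default root/root credentials; the class name ShowMergeStatusExample is purely illustrative.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;

public class ShowMergeStatusExample {

  public static void main(String[] args) throws Exception {
    // Load the IoTDB JDBC driver and connect to a local server (assumed address).
    Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
    try (Connection connection = DriverManager
        .getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
        Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery("SHOW MERGE STATUS")) {
      // Columns follow MERGE_STATUS_RESP: storage group, task name, created time,
      // progress, cancelled, done.
      ResultSetMetaData metaData = resultSet.getMetaData();
      while (resultSet.next()) {
        StringBuilder row = new StringBuilder();
        for (int i = 1; i <= metaData.getColumnCount(); i++) {
          row.append(metaData.getColumnName(i)).append('=')
              .append(resultSet.getString(i)).append(' ');
        }
        System.out.println(row);
      }
    }
  }
}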