You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/07/24 03:35:33 UTC

[incubator-iotdb] 01/01: reconstruct flush pool class

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch manage_flush_pool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit fa888914fdbc3d01f297fa7572bfd6461931a5d8
Author: lta <li...@163.com>
AuthorDate: Wed Jul 24 11:34:32 2019 +0800

    reconstruct flush pool class
---
 .../{storagegroup => flush}/FlushManager.java      |  32 ++++++-
 .../{memtable => flush}/MemTableFlushTask.java     |   7 +-
 .../{memtable => flush}/NotifyFlushMemTable.java   |   5 +-
 .../pool/AbstractPoolManager.java}                 |  65 ++++++-------
 .../engine/flush/pool/FlushSubTaskPoolManager.java |  79 ++++++++++++++++
 .../db/engine/flush/pool/FlushTaskPoolManager.java |  77 +++++++++++++++
 .../iotdb/db/engine/memtable/AbstractMemTable.java |   3 +-
 .../iotdb/db/engine/pool/FlushPoolManager.java     | 104 ---------------------
 .../db/engine/storagegroup/TsFileProcessor.java    |  19 ++--
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   2 +
 .../org/apache/iotdb/db/service/ServiceType.java   |   3 +-
 .../iotdb/db/tools/MemEst/MemEstToolCmd.java       |   6 +-
 .../writelog/recover/TsFileRecoverPerformer.java   |   2 +-
 .../db/engine/memtable/MemTableFlushTaskTest.java  |   1 +
 14 files changed, 239 insertions(+), 166 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
similarity index 70%
rename from server/src/main/java/org/apache/iotdb/db/engine/storagegroup/FlushManager.java
rename to server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index 6141ab5..f996c18 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -16,20 +16,42 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.engine.storagegroup;
+package org.apache.iotdb.db.engine.flush;
 
 import java.util.concurrent.ConcurrentLinkedDeque;
-import org.apache.iotdb.db.engine.pool.FlushPoolManager;
+import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
+import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FlushManager {
+public class FlushManager implements IService {
 
   private static final Logger logger = LoggerFactory.getLogger(FlushManager.class);
 
   private ConcurrentLinkedDeque<TsFileProcessor> tsFileProcessorQueue = new ConcurrentLinkedDeque<>();
 
-  private FlushPoolManager flushPool = FlushPoolManager.getInstance();
+  private FlushTaskPoolManager flushPool = FlushTaskPoolManager.getInstance();
+
+  @Override
+  public void start() throws StartupException {
+    FlushSubTaskPoolManager.getInstance().start();
+    FlushTaskPoolManager.getInstance().start();
+  }
+
+  @Override
+  public void stop() {
+    FlushSubTaskPoolManager.getInstance().stop();
+    FlushTaskPoolManager.getInstance().stop();
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.FLUSH_SERVICE;
+  }
 
   class FlushThread implements Runnable {
 
@@ -46,7 +68,7 @@ public class FlushManager {
    * Add BufferWriteProcessor to asyncTryToFlush manager
    */
   @SuppressWarnings("squid:S2445")
-  void registerTsFileProcessor(TsFileProcessor tsFileProcessor) {
+  public void registerTsFileProcessor(TsFileProcessor tsFileProcessor) {
     synchronized (tsFileProcessor) {
       if (!tsFileProcessor.isManagedByFlushManager() && tsFileProcessor.getFlushingMemTableSize() > 0) {
         logger.info("storage group {} begin to submit a flush thread, flushing memtable size: {}",
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
rename to server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index e4cddf5..b6cdb97 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -12,13 +12,16 @@
  * or implied.  See the License for the specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.engine.memtable;
+package org.apache.iotdb.db.engine.flush;
 
 import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import org.apache.iotdb.db.engine.pool.FlushSubTaskPoolManager;
+import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
+import org.apache.iotdb.db.engine.memtable.ChunkBufferPool;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
 import org.apache.iotdb.db.exception.FlushRunTimeException;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/NotifyFlushMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
similarity index 85%
rename from server/src/main/java/org/apache/iotdb/db/engine/memtable/NotifyFlushMemTable.java
rename to server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
index 1862509..295dadf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/NotifyFlushMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
@@ -16,8 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.engine.memtable;
+package org.apache.iotdb.db.engine.flush;
 
+import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 /**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/pool/FlushSubTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
similarity index 53%
rename from server/src/main/java/org/apache/iotdb/db/engine/pool/FlushSubTaskPoolManager.java
rename to server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
index 243f052..cd11ec1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/pool/FlushSubTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
@@ -16,50 +16,40 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.engine.pool;
+
+package org.apache.iotdb.db.engine.flush.pool;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.db.concurrent.ThreadName;
-import org.apache.iotdb.db.exception.ProcessorException;
-
-public class FlushSubTaskPoolManager {
-
-  private static final int EXIT_WAIT_TIME = 60 * 1000;
-
-  private ExecutorService pool;
+import org.apache.iotdb.db.exception.StartupException;
+import org.slf4j.Logger;
 
-  private FlushSubTaskPoolManager() {
-    this.pool = IoTDBThreadPoolFactory
-        .newCachedThreadPool(ThreadName.FLUSH_SUB_TASK_SERVICE.getName());
-  }
+public abstract class AbstractPoolManager {
+  
+  private static final int WAIT_TIMEOUT = 2000;
 
-  public static FlushSubTaskPoolManager getInstance() {
-    return FlushSubTaskPoolManager.InstanceHolder.instance;
-  }
+  protected ExecutorService pool;
 
   /**
    * Block new flush submits and exit when all RUNNING THREADS AND TASKS IN THE QUEUE end.
-   *
-   * @param block if set to true, this method will wait for timeOut milliseconds.
-   * @param timeout block time out in milliseconds.
-   * @throws ProcessorException if timeOut is reached or being interrupted while waiting to exit.
    */
-  public void close(boolean block, long timeout) throws ProcessorException {
-    pool.shutdown();
-    if (block) {
+  public void close() {
+    Logger logger = getLogger();
+    pool.shutdownNow();
+    long totalWaitTime = WAIT_TIMEOUT;
+    logger.info("Waiting for {} thread pool to shut down.", getName());
+    while (!pool.isTerminated()) {
       try {
-        if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
-          throw new ProcessorException("Flush thread pool doesn't exit after "
-              + EXIT_WAIT_TIME + " ms");
+        if (!pool.awaitTermination(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
+          logger.info("{} thread pool doesn't exit after {}ms.", getName(),
+              + totalWaitTime);
         }
+        totalWaitTime += WAIT_TIMEOUT;
       } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new ProcessorException("Interrupted while waiting flush thread pool to exit. ", e);
+        logger.error("Interrupted while waiting {} thread pool to exit. ", getName(), e);
       }
     }
   }
@@ -76,15 +66,6 @@ public class FlushSubTaskPoolManager {
     return ((ThreadPoolExecutor) pool).getActiveCount();
   }
 
-  private static class InstanceHolder {
-
-    private InstanceHolder() {
-      //allowed to do nothing
-    }
-
-    private static FlushSubTaskPoolManager instance = new FlushSubTaskPoolManager();
-  }
-
   public int getWaitingTasksNumber() {
     return ((ThreadPoolExecutor) pool).getQueue().size();
   }
@@ -92,4 +73,12 @@ public class FlushSubTaskPoolManager {
   public int getCorePoolSize() {
     return ((ThreadPoolExecutor) pool).getCorePoolSize();
   }
+
+  public abstract Logger getLogger();
+
+  public abstract void start();
+
+  public abstract void stop();
+
+  public abstract String getName();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
new file mode 100644
index 0000000..448fb49
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.flush.pool;
+
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.service.IService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlushSubTaskPoolManager extends AbstractPoolManager {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(FlushSubTaskPoolManager.class);
+
+  private FlushSubTaskPoolManager() {
+    this.pool = IoTDBThreadPoolFactory
+        .newCachedThreadPool(ThreadName.FLUSH_SUB_TASK_SERVICE.getName());
+  }
+
+  public static FlushSubTaskPoolManager getInstance() {
+    return FlushSubTaskPoolManager.InstanceHolder.instance;
+  }
+
+  @Override
+  public Logger getLogger() {
+    return LOGGER;
+  }
+
+  @Override
+  public String getName() {
+    return "flush sub task";
+  }
+
+  @Override
+  public void start() {
+    if (pool == null) {
+      this.pool = IoTDBThreadPoolFactory
+          .newCachedThreadPool(ThreadName.FLUSH_SUB_TASK_SERVICE.getName());
+    }
+    LOGGER.info("Flush encoding sub task manager started.");
+  }
+
+  @Override
+  public void stop() {
+    if (pool != null) {
+      close();
+      pool = null;
+    }
+    LOGGER.info("Flush encoding sub task manager stopped");
+  }
+
+  private static class InstanceHolder {
+
+    private InstanceHolder() {
+      //allowed to do nothing
+    }
+
+    private static FlushSubTaskPoolManager instance = new FlushSubTaskPoolManager();
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java
new file mode 100644
index 0000000..aec372c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.flush.pool;
+
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlushTaskPoolManager extends AbstractPoolManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FlushTaskPoolManager.class);
+
+  private FlushTaskPoolManager() {
+    int threadCnt = IoTDBDescriptor.getInstance().getConfig().getConcurrentFlushThread();
+    pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.FLUSH_SERVICE.getName());
+  }
+
+  public static FlushTaskPoolManager getInstance() {
+    return InstanceHolder.instance;
+  }
+
+  @Override
+  public Logger getLogger() {
+    return LOGGER;
+  }
+
+  @Override
+  public String getName() {
+    return "flush task";
+  }
+
+  @Override
+  public void start() {
+    if (pool == null) {
+      int threadCnt = IoTDBDescriptor.getInstance().getConfig().getConcurrentFlushThread();
+      pool = IoTDBThreadPoolFactory
+          .newFixedThreadPool(threadCnt, ThreadName.FLUSH_SERVICE.getName());
+    }
+    LOGGER.info("Flush task manager started.");
+  }
+
+  @Override
+  public void stop() {
+    if (pool != null) {
+      close();
+      pool = null;
+    }
+    LOGGER.info("Flush task manager stopped");
+  }
+
+  private static class InstanceHolder {
+
+    private InstanceHolder() {
+      //allowed to do nothing
+    }
+
+    private static FlushTaskPoolManager instance = new FlushTaskPoolManager();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index d25e913..a4fd2a9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -27,9 +27,8 @@ import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.utils.MemUtils;
-import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.db.rescon.TVListAllocator;
+import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public abstract class AbstractMemTable implements IMemTable {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/pool/FlushPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/pool/FlushPoolManager.java
deleted file mode 100644
index fcc560b..0000000
--- a/server/src/main/java/org/apache/iotdb/db/engine/pool/FlushPoolManager.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.pool;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.db.concurrent.ThreadName;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.ProcessorException;
-
-public class FlushPoolManager {
-
-  private static final int EXIT_WAIT_TIME = 60 * 1000;
-
-  private ExecutorService pool;
-  private int threadCnt;
-
-  private FlushPoolManager() {
-    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    this.threadCnt = config.getConcurrentFlushThread();
-    this.pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.FLUSH_SERVICE.getName());
-  }
-
-  public static FlushPoolManager getInstance() {
-    return InstanceHolder.instance;
-  }
-
-  /**
-   * Block new flush submits and exit when all RUNNING THREADS AND TASKS IN THE QUEUE end.
-   *
-   * @param block
-   *            if set to true, this method will wait for timeOut milliseconds.
-   * @param timeout
-   *            block time out in milliseconds.
-   * @throws ProcessorException
-   *             if timeOut is reached or being interrupted while waiting to exit.
-   */
-  public void close(boolean block, long timeout) throws ProcessorException {
-    pool.shutdown();
-    if (block) {
-      try {
-        if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
-          throw new ProcessorException("Flush thread pool doesn't exit after "
-              + EXIT_WAIT_TIME + " ms");
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new ProcessorException("Interrupted while waiting flush thread pool to exit. ", e);
-      }
-    }
-  }
-
-  public synchronized Future<?> submit(Runnable task) {
-    return pool.submit(task);
-  }
-
-  public synchronized <T>Future<T> submit(Callable<T> task){
-    return pool.submit(task);
-  }
-
-  public int getActiveCnt() {
-    return ((ThreadPoolExecutor) pool).getActiveCount();
-  }
-
-  public int getThreadCnt() {
-    return threadCnt;
-  }
-
-  private static class InstanceHolder {
-    private InstanceHolder(){
-      //allowed to do nothing
-    }
-    private static FlushPoolManager instance = new FlushPoolManager();
-  }
-
-  public int getWaitingTasksNumber() {
-    return ((ThreadPoolExecutor) pool).getQueue().size();
-  }
-
-  public int getCorePoolSize() {
-    return ((ThreadPoolExecutor) pool).getCorePoolSize();
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index d27db69..34dcbac 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -31,11 +31,11 @@ import java.util.function.Supplier;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.adapter.CompressionRatio;
-import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
+import org.apache.iotdb.db.engine.flush.NotifyFlushMemTable;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
-import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
-import org.apache.iotdb.db.engine.memtable.NotifyFlushMemTable;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
@@ -364,7 +364,7 @@ public class TsFileProcessor {
    * Take the first MemTable from the flushingMemTables and flush it. Called by a flush thread of
    * the flush manager pool
    */
-  void flushOneMemTable() {
+  public void flushOneMemTable() {
     IMemTable memTableToFlush;
     memTableToFlush = flushingMemTables.getFirst();
 
@@ -417,7 +417,8 @@ public class TsFileProcessor {
         }
         endFile();
       } catch (IOException | TsFileProcessorException e) {
-        logger.error("meet error when flush FileMetadata to {}, change system mode to read-only", tsFileResource.getFile().getAbsolutePath());
+        logger.error("meet error when flush FileMetadata to {}, change system mode to read-only",
+            tsFileResource.getFile().getAbsolutePath());
         IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
         try {
           writer.reset();
@@ -458,7 +459,7 @@ public class TsFileProcessor {
   }
 
 
-  boolean isManagedByFlushManager() {
+  public boolean isManagedByFlushManager() {
     return managedByFlushManager;
   }
 
@@ -480,11 +481,11 @@ public class TsFileProcessor {
     }
   }
 
-  void setManagedByFlushManager(boolean managedByFlushManager) {
+  public void setManagedByFlushManager(boolean managedByFlushManager) {
     this.managedByFlushManager = managedByFlushManager;
   }
 
-  int getFlushingMemTableSize() {
+  public int getFlushingMemTableSize() {
     return flushingMemTables.size();
   }
 
@@ -496,7 +497,7 @@ public class TsFileProcessor {
     return writer;
   }
 
-  String getStorageGroupName() {
+  public String getStorageGroupName() {
     return storageGroupName;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 2a9d19a..0c70887 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
 import org.apache.iotdb.db.cost.statistic.Measurement;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.monitor.StatMonitor;
@@ -90,6 +91,7 @@ public class IoTDB implements IoTDBMBean {
     }
 
     initMManager();
+    registerManager.register(FlushManager.getInstance());
     registerManager.register(StorageEngine.getInstance());
     registerManager.register(MultiFileLogNodeManager.getInstance());
     registerManager.register(JMXService.getInstance());
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 4f872fd..bff3e49 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -32,7 +32,8 @@ public enum ServiceType {
   FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""),
   SYNC_SERVICE("SYNC ServerService", ""),
   PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE","PERFORMANCE_STATISTIC_SERVICE"),
-  TVLIST_ALLOCATOR_SERVICE("TVList Allocator", "");
+  TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""),
+  FLUSH_SERVICE("Flush ServerService", "");
 
   private String name;
   private String jmxName;
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstToolCmd.java b/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstToolCmd.java
index 9c5d96d..6159232 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstToolCmd.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstToolCmd.java
@@ -73,12 +73,12 @@ public class MemEstToolCmd implements Runnable {
       MManager.getInstance().clear();
 
       long sgCnt = 1;
-      long tsCnt = 1;
+      long tsCnt = 0;
       try {
         for (; sgCnt <= sgNum; sgCnt++) {
           IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
         }
-        for (; tsCnt <= tsNum; tsCnt++) {
+        for (; tsCnt < tsNum; tsCnt++) {
           IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(1);
           if (maxTsNum == 0) {
             maxTsNumValid = tsCnt / sgNum + 1;
@@ -91,7 +91,7 @@ public class MemEstToolCmd implements Runnable {
 
       } catch (ConfigAdjusterException e) {
         if (sgCnt > sgNum) {
-          maxProcess = Math.max(maxProcess, tsCnt * 100 / tsNum);
+          maxProcess = Math.max(maxProcess, (tsCnt + 1) * 100 / tsNum);
           System.out
               .print(String.format("Memory estimation progress : %d%%\r", maxProcess));
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index c6dc976..d012e3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
+import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.version.VersionController;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
index e2b0155..6306338 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.util.concurrent.ExecutionException;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;