You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/18 07:21:35 UTC

[incubator-iotdb] 03/03: add flush mem pool

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit d08e317bb14e3eb225f6b793e2e1e10775216481
Author: lta <li...@163.com>
AuthorDate: Tue Jun 18 15:10:56 2019 +0800

    add flush mem pool
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 +++++
 .../engine/bufferwrite/BufferWriteProcessor.java   | 14 ++---
 .../bufferwriteV2/BufferWriteProcessorV2.java      | 46 +++++++++++++--
 .../db/engine/bufferwriteV2/FlushManager.java      | 67 ++++++++++++++++++++++
 .../iotdb/db/engine/filenode/FileNodeManager.java  |  4 +-
 .../db/engine/filenode/FileNodeProcessor.java      |  4 +-
 .../db/engine/filenodeV2/FileNodeManagerV2.java    | 18 ++++++
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 18 ++++++
 .../iotdb/db/engine/filenodeV2/FlushManager.java   | 51 ----------------
 .../iotdb/db/engine/filenodeV2/MetadataAgent.java  | 18 ++++++
 .../db/engine/filenodeV2/TsFileResourceV2.java     | 18 ++++++
 .../iotdb/db/engine/memtable/MemTablePool.java     | 44 +++++++++++---
 .../db/engine/overflow/io/OverflowProcessor.java   |  8 +--
 .../{FlushManager.java => FlushPoolManager.java}   | 10 ++--
 .../{MergeManager.java => MergePoolManager.java}   |  8 +--
 15 files changed, 253 insertions(+), 88 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 275ee80..ebaa947 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -171,6 +171,11 @@ public class IoTDBConfig {
    */
   private int concurrentFlushThread = Runtime.getRuntime().availableProcessors();
 
+  /**
+   * Maximum number of active mem tables.
+   */
+  private int maxActiveMemTableSize = 100;
+
   private ZoneId zoneID = ZoneId.systemDefault();
   /**
    * BufferWriteProcessor and OverflowProcessor will immediately flushMetadata if this threshold is
@@ -829,4 +834,12 @@ public class IoTDBConfig {
   public void setRpcImplClassName(String rpcImplClassName) {
     this.rpcImplClassName = rpcImplClassName;
   }
+
+  public int getMaxActiveMemTableSize() {
+    return maxActiveMemTableSize;
+  }
+
+  public void setMaxActiveMemTableSize(int maxActiveMemTableSize) {
+    this.maxActiveMemTableSize = maxActiveMemTableSize;
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 9bca6ca..3c74cec 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -44,7 +44,7 @@ import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
 import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
 import org.apache.iotdb.db.engine.memtable.MemTablePool;
 import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.engine.pool.FlushManager;
+import org.apache.iotdb.db.engine.pool.FlushPoolManager;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.BufferWriteProcessorException;
@@ -443,18 +443,18 @@ public class BufferWriteProcessor extends Processor {
       if (isCloseTaskCalled) {
         LOGGER.info(
             "flushMetadata memtable for bufferwrite processor {} synchronously for close task.",
-            getProcessorName(), FlushManager.getInstance().getWaitingTasksNumber(),
-            FlushManager.getInstance().getCorePoolSize());
+            getProcessorName(), FlushPoolManager.getInstance().getWaitingTasksNumber(),
+            FlushPoolManager.getInstance().getCorePoolSize());
         flushTask("synchronously", tmpMemTableToFlush, version, flushId);
         flushFuture = new ImmediateFuture<>(true);
       } else {
         if (LOGGER.isInfoEnabled()) {
           LOGGER.info(
               "Begin to submit flushMetadata task for bufferwrite processor {}, current Flush Queue is {}, core pool size is {}.",
-              getProcessorName(), FlushManager.getInstance().getWaitingTasksNumber(),
-              FlushManager.getInstance().getCorePoolSize());
+              getProcessorName(), FlushPoolManager.getInstance().getWaitingTasksNumber(),
+              FlushPoolManager.getInstance().getCorePoolSize());
         }
-        flushFuture = FlushManager.getInstance().submit(() -> flushTask("asynchronously",
+        flushFuture = FlushPoolManager.getInstance().submit(() -> flushTask("asynchronously",
             tmpMemTableToFlush, version, flushId));
       }
 
@@ -482,7 +482,7 @@ public class BufferWriteProcessor extends Processor {
     try {
       // flushMetadata data (if there are flushing task, flushMetadata() will be blocked) and wait for finishing flushMetadata async
       LOGGER.info("Submit a BufferWrite ({}) close task.", getProcessorName());
-      closeFuture = new BWCloseFuture(FlushManager.getInstance().submit(() -> closeTask()));
+      closeFuture = new BWCloseFuture(FlushPoolManager.getInstance().submit(() -> closeTask()));
       //now, we omit the future of the closeTask.
     } catch (Exception e) {
       LOGGER
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
index 7214e53..4bfa2db 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
@@ -1,9 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.engine.bufferwriteV2;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -24,8 +41,11 @@ public class BufferWriteProcessorV2 {
   private FileSchema fileSchema;
 
   private final String storageGroupName;
+
   private TsFileResourceV2 tsFileResource;
 
+  private volatile boolean isManagedByFlushManager;
+
   /**
    * true: to be closed
    */
@@ -33,7 +53,7 @@ public class BufferWriteProcessorV2 {
 
   private IMemTable workMemTable;
 
-  private final List<IMemTable> flushingMemTables = new ArrayList<>();
+  private final ConcurrentLinkedDeque<IMemTable> flushingMemTables = new ConcurrentLinkedDeque<>();
 
   public BufferWriteProcessorV2(String storageGroupName, File file, FileSchema fileSchema) throws IOException {
     this.storageGroupName = storageGroupName;
@@ -79,16 +99,30 @@ public class BufferWriteProcessorV2 {
    * put the workMemtable into flushing list and set null
    */
   public void flush() {
-    synchronized (flushingMemTables) {
-      flushingMemTables.add(workMemTable);
-    }
+    flushingMemTables.addLast(workMemTable);
     workMemTable = null;
   }
 
+  public void flushOneMemTable(){
+    IMemTable memTableToFlush = flushingMemTables.pollFirst();
+
+  }
+
   public boolean shouldClose() {
     long fileSize = tsFileResource.getFileSize();
     long fileSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getBufferwriteFileSizeThreshold();
     return fileSize > fileSizeThreshold;
   }
 
+  public boolean isManagedByFlushManager() {
+    return isManagedByFlushManager;
+  }
+
+  public void setManagedByFlushManager(boolean managedByFlushManager) {
+    isManagedByFlushManager = managedByFlushManager;
+  }
+
+  public int getFlushingMemTableSize() {
+    return flushingMemTables.size();
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
new file mode 100644
index 0000000..57e81c2
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.bufferwriteV2;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.iotdb.db.engine.pool.FlushPoolManager;
+
+public class FlushManager {
+
+  private ConcurrentLinkedQueue<BufferWriteProcessorV2> bwpQueue = new ConcurrentLinkedQueue<>();
+
+  private FlushPoolManager flushPool = FlushPoolManager.getInstance();
+
+  private Runnable flushAction = () -> {
+    BufferWriteProcessorV2 bwp = bwpQueue.poll();
+    bwp.flushOneMemTable();
+    bwp.setManagedByFlushManager(false);
+    addBWP(bwp);
+  };
+
+  /**
+   * Add BufferWriteProcessor to flush manager
+   */
+  private boolean addBWP(BufferWriteProcessorV2 bwp) {
+    synchronized (bwp) {
+      if (!bwp.isManagedByFlushManager() && bwp.getFlushingMemTableSize() > 0) {
+        bwpQueue.add(bwp);
+        bwp.setManagedByFlushManager(true);
+        flushPool.submit(flushAction);
+        return true;
+      }
+      return false;
+    }
+  }
+
+  private FlushManager() {
+  }
+
+  public static FlushManager getInstance() {
+    return InstanceHolder.instance;
+  }
+
+  private static class InstanceHolder {
+
+    private InstanceHolder() {
+    }
+
+    private static FlushManager instance = new FlushManager();
+  }
+
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index f3878a0..a9a8064 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -42,7 +42,7 @@ import org.apache.iotdb.db.engine.Processor;
 import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
 import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
 import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
-import org.apache.iotdb.db.engine.pool.FlushManager;
+import org.apache.iotdb.db.engine.pool.FlushPoolManager;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.BufferWriteProcessorException;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -1200,7 +1200,7 @@ public class FileNodeManager implements IStatistic, IService {
       // if the flushMetadata thread pool is not full ( or half full), start a new
       // flushMetadata task
       case SAFE:
-        if (FlushManager.getInstance().getActiveCnt() < 0.5 * FlushManager.getInstance()
+        if (FlushPoolManager.getInstance().getActiveCnt() < 0.5 * FlushPoolManager.getInstance()
             .getThreadCnt()) {
           try {
             flushTop(0.01f);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index fb46ebc..43086c4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -60,7 +60,7 @@ import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
-import org.apache.iotdb.db.engine.pool.MergeManager;
+import org.apache.iotdb.db.engine.pool.MergePoolManager;
 import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
 import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
@@ -996,7 +996,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       Runnable mergeThread;
       mergeThread = new MergeRunnale();
       LOGGER.info("Submit the merge task, the merge filenode is {}", getProcessorName());
-      return MergeManager.getInstance().submit(mergeThread);
+      return MergePoolManager.getInstance().submit(mergeThread);
     } else {
       if (!isOverflowed) {
         LOGGER.info(
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index f61149a..1e183b3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.engine.filenodeV2;
 
 import java.io.File;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 753db83..085357e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.engine.filenodeV2;
 
 import java.io.File;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java
deleted file mode 100644
index a485766..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.iotdb.db.engine.filenodeV2;
-
-import java.util.Deque;
-import java.util.Queue;
-import java.util.concurrent.ThreadFactory;
-import sun.nio.ch.ThreadPool;
-
-public class FlushManager {
-
-  private Queue<BWP> queue;
-
-  private ThreadPool flushPool;
-
-  private static final Object object;
-
-  public FlushManager(int n) {
-    this.flushPool = createFlushThreads(n, flushThread);
-  }
-
-  private boolean addBWP(BWP){
-    synchronized (BWP) {
-      // 对同一个BWP至多一个线程执行此操作
-      if (!BWP.isManagedByFlushManager() && BWP.taskSize() > 0) {
-        synchronized (queue) {
-          queue.add(BWP);
-        }
-        BWP.setManagedByFlushManager(true);
-        flushPool.submit(flushThread);
-//        object.notify();
-        return true;
-      }
-      return false;
-    }
-  }
-
-  Runnable flushThread = new Runnable(){
-    @Override
-    public void run() {
-//      object.wait();
-      synchronized (queue) {
-        BWP = queue.poll();
-      }
-      flushOneMemTable(BWP);
-      // 对同一个BWP至多一个线程执行此操作
-      BWP.setManagedByFlushManager(false);
-      addBWP(BWP);
-    }
-  };
-
-
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/MetadataAgent.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/MetadataAgent.java
index c9845f7..7c725f1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/MetadataAgent.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/MetadataAgent.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.engine.filenodeV2;
 
 import java.util.List;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
index 1cfd8e2..e7d981f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.engine.filenodeV2;
 
 import java.io.File;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
index e50be7f..c85719b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
@@ -1,23 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.engine.memtable;
 
 import java.util.ArrayDeque;
 import java.util.Deque;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MemTablePool {
+
   private static final Logger LOGGER = LoggerFactory.getLogger(MemTablePool.class);
 
-  private Deque<IMemTable> emptyMemTables;
-  // >= number of storage group * 2
-  private int capacity = 20;
+  private static final Deque<IMemTable> emptyMemTables = new ArrayDeque<>();
+
+  /**
+   * >= number of storage group * 2
+   */
+  private static final int capacity = IoTDBDescriptor.getInstance().getConfig()
+      .getMaxActiveMemTableSize();
+
   private int size = 0;
-  private static final int WAIT_TIME = 2000;
 
-  private static final MemTablePool INSTANCE = new MemTablePool();
+  private static final int WAIT_TIME = 2000;
 
   private MemTablePool() {
-    emptyMemTables = new ArrayDeque<>();
   }
 
   public IMemTable getEmptyMemTable(Object applier) {
@@ -63,7 +86,14 @@ public class MemTablePool {
   }
 
   public static MemTablePool getInstance() {
-    return INSTANCE;
+    return InstanceHolder.INSTANCE;
   }
 
+  private static class InstanceHolder {
+
+    private InstanceHolder() {
+    }
+
+    private static final MemTablePool INSTANCE = new MemTablePool();
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 3171b4c..0d8554a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -49,7 +49,7 @@ import org.apache.iotdb.db.engine.memtable.MemTablePool;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
-import org.apache.iotdb.db.engine.pool.FlushManager;
+import org.apache.iotdb.db.engine.pool.FlushPoolManager;
 import org.apache.iotdb.db.engine.querycontext.MergeSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
 import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
@@ -617,12 +617,12 @@ public class OverflowProcessor extends Processor {
       IMemTable tmpMemTableToFlush = workSupport;
       workSupport = MemTablePool.getInstance().getEmptyMemTable(this);
       flushId++;
-      flushFuture = FlushManager.getInstance().submit(() -> flushTask("asynchronously",
+      flushFuture = FlushPoolManager.getInstance().submit(() -> flushTask("asynchronously",
           tmpMemTableToFlush, flushId, this::removeFlushedMemTable));
 
       // switch from work to flush
 //      switchWorkToFlush();
-//      flushFuture = FlushManager.getInstance().submit(() ->
+//      flushFuture = FlushPoolManager.getInstance().submit(() ->
 //          flushTask("asynchronously", walTaskId));
     } else {
 //      flushFuture = new ImmediateFuture(true);
@@ -795,7 +795,7 @@ public class OverflowProcessor extends Processor {
 //
 //      }else {
 //        isFlushing = true;
-////        flushFuture = FlushManager.getInstance().submit(() ->
+////        flushFuture = FlushPoolManager.getInstance().submit(() ->
 //            flushTask("asynchronously", walTaskId));
 //      }
 //    } finally {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushPoolManager.java
similarity index 95%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
rename to iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushPoolManager.java
index ead8e89..9632522 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushPoolManager.java
@@ -30,20 +30,20 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.ProcessorException;
 
-public class FlushManager {
+public class FlushPoolManager {
 
   private static final int EXIT_WAIT_TIME = 60 * 1000;
 
   private ExecutorService pool;
   private int threadCnt;
 
-  private FlushManager() {
+  private FlushPoolManager() {
     IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
     this.threadCnt = config.getConcurrentFlushThread();
     pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.FLUSH_SERVICE.getName());
   }
 
-  public static FlushManager getInstance() {
+  public static FlushPoolManager getInstance() {
     return InstanceHolder.instance;
   }
 
@@ -59,7 +59,7 @@ public class FlushManager {
     pool = Executors.newFixedThreadPool(config.getConcurrentFlushThread());
   }
 
-  public FlushManager(ExecutorService pool) {
+  public FlushPoolManager(ExecutorService pool) {
     this.pool = pool;
   }
 
@@ -134,7 +134,7 @@ public class FlushManager {
     private InstanceHolder(){
       //allowed to do nothing
     }
-    private static FlushManager instance = new FlushManager();
+    private static FlushPoolManager instance = new FlushPoolManager();
   }
 
   public int getWaitingTasksNumber() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergePoolManager.java
similarity index 95%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java
rename to iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergePoolManager.java
index 44874bb..5585119 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergePoolManager.java
@@ -29,18 +29,18 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.ProcessorException;
 
-public class MergeManager {
+public class MergePoolManager {
 
   private ExecutorService pool;
   private int threadCnt;
 
-  private MergeManager() {
+  private MergePoolManager() {
     IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
     this.threadCnt = config.getMergeConcurrentThreads();
     pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.MERGE_SERVICE.getName());
   }
 
-  public static MergeManager getInstance() {
+  public static MergePoolManager getInstance() {
     return InstanceHolder.instance;
   }
 
@@ -121,6 +121,6 @@ public class MergeManager {
     private InstanceHolder(){
       //allowed to do nothing
     }
-    private static MergeManager instance = new MergeManager();
+    private static MergePoolManager instance = new MergePoolManager();
   }
 }