You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/18 07:21:32 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated (1c85a1d -> d08e317)

This is an automated email from the ASF dual-hosted git repository.

lta pushed a change to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


    from 1c85a1d  refactor FileNodeManager and add flushmanager
     new b0db03a  modify mem table pool
     new cc54759  Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile
     new d08e317  add flush mem pool

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 +++
 .../engine/bufferwrite/BufferWriteProcessor.java   | 14 ++--
 .../bufferwriteV2/BufferWriteProcessorV2.java      | 46 +++++++++--
 .../db/engine/bufferwriteV2/FlushManager.java      | 67 +++++++++++++++
 .../iotdb/db/engine/filenode/FileNodeManager.java  |  4 +-
 .../db/engine/filenode/FileNodeProcessor.java      |  4 +-
 .../db/engine/filenodeV2/FileNodeManagerV2.java    | 18 ++++
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 18 ++++
 .../iotdb/db/engine/filenodeV2/FlushManager.java   | 51 ------------
 .../iotdb/db/engine/filenodeV2/MetadataAgent.java  | 18 ++++
 .../db/engine/filenodeV2/TsFileResourceV2.java     | 18 ++++
 .../iotdb/db/engine/memtable/MemTablePool.java     | 96 ++++++++++++++--------
 .../db/engine/overflow/io/OverflowProcessor.java   |  8 +-
 .../{FlushManager.java => FlushPoolManager.java}   | 10 +--
 .../{MergeManager.java => MergePoolManager.java}   |  8 +-
 15 files changed, 276 insertions(+), 117 deletions(-)
 create mode 100644 iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
 delete mode 100644 iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java
 rename iotdb/src/main/java/org/apache/iotdb/db/engine/pool/{FlushManager.java => FlushPoolManager.java} (95%)
 rename iotdb/src/main/java/org/apache/iotdb/db/engine/pool/{MergeManager.java => MergePoolManager.java} (95%)


[incubator-iotdb] 01/03: modify mem table pool

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit b0db03a0616d0c3d27d718bd221b4450e574410e
Author: lta <li...@163.com>
AuthorDate: Mon Jun 17 21:42:09 2019 +0800

    modify mem table pool
---
 .../iotdb/db/engine/memtable/MemTablePool.java     | 58 ++++++++++------------
 1 file changed, 26 insertions(+), 32 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
index a1b8e7c..e50be7f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
@@ -1,22 +1,23 @@
 package org.apache.iotdb.db.engine.memtable;
 
-import java.util.Stack;
-import org.apache.iotdb.tsfile.common.constant.SystemConstant;
+import java.util.ArrayDeque;
+import java.util.Deque;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MemTablePool {
   private static final Logger LOGGER = LoggerFactory.getLogger(MemTablePool.class);
 
-  private Stack<IMemTable> emptyMemTables;
+  private Deque<IMemTable> emptyMemTables;
   // >= number of storage group * 2
   private int capacity = 20;
   private int size = 0;
+  private static final int WAIT_TIME = 2000;
 
   private static final MemTablePool INSTANCE = new MemTablePool();
 
-  public MemTablePool() {
-    emptyMemTables = new Stack<>();
+  private MemTablePool() {
+    emptyMemTables = new ArrayDeque<>();
   }
 
   public IMemTable getEmptyMemTable(Object applier) {
@@ -26,44 +27,37 @@ public class MemTablePool {
         LOGGER.info("generated a new memtable for {}, system memtable size: {}, stack size: {}",
             applier, size, emptyMemTables.size());
         return new PrimitiveMemTable();
-      } else if (!emptyMemTables.isEmpty()){
-        LOGGER.info("system memtable size: {}, stack size: {}, then get a memtable from stack for {}",
-            size, emptyMemTables.size(), applier);
+      } else if (!emptyMemTables.isEmpty()) {
+        LOGGER
+            .info("system memtable size: {}, stack size: {}, then get a memtable from stack for {}",
+                size, emptyMemTables.size(), applier);
         return emptyMemTables.pop();
       }
-    }
-    // wait until some one has released a memtable
-    long waitStartTime = System.currentTimeMillis();
-    long lastPrintIdx = 0;
-    while (true) {
-      if(!emptyMemTables.isEmpty()) {
-        synchronized (emptyMemTables) {
-          if (!emptyMemTables.isEmpty()){
-            LOGGER.info("system memtable size: {}, stack size: {}, then get a memtable from stack for {}",
-                size, emptyMemTables.size(), applier);
-            return emptyMemTables.pop();
-          }
+
+      // wait until some one has released a memtable
+      int waitCount = 1;
+      while (true) {
+        if (!emptyMemTables.isEmpty()) {
+          LOGGER.info(
+              "system memtable size: {}, stack size: {}, then get a memtable from stack for {}",
+              size, emptyMemTables.size(), applier);
+          return emptyMemTables.pop();
         }
-      }
-      try {
-        Thread.sleep(20);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        LOGGER.error("Unexpected interruption", e);
-      }
-      long waitedTime = System.currentTimeMillis() - waitStartTime;
-      if (waitedTime / 2000 > lastPrintIdx) {
-        lastPrintIdx = waitedTime / 2000;
-        LOGGER.info("{} has waited for a memtable for {}ms", applier, waitedTime);
+        try {
+          emptyMemTables.wait(WAIT_TIME);
+        } catch (InterruptedException e) {
+          LOGGER.error("{} fails to wait fot memtables {}, continue to wait", applier, e);
+        }
+        LOGGER.info("{} has waited for a memtable for {}ms", applier, waitCount * WAIT_TIME);
       }
     }
   }
 
-
   public void release(IMemTable memTable) {
     synchronized (emptyMemTables) {
       memTable.clear();
       emptyMemTables.push(memTable);
+      emptyMemTables.notify();
       LOGGER.info("a memtable returned, stack size {}", emptyMemTables.size());
     }
   }


[incubator-iotdb] 03/03: add flush mem pool

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit d08e317bb14e3eb225f6b793e2e1e10775216481
Author: lta <li...@163.com>
AuthorDate: Tue Jun 18 15:10:56 2019 +0800

    add flush mem pool
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 +++++
 .../engine/bufferwrite/BufferWriteProcessor.java   | 14 ++---
 .../bufferwriteV2/BufferWriteProcessorV2.java      | 46 +++++++++++++--
 .../db/engine/bufferwriteV2/FlushManager.java      | 67 ++++++++++++++++++++++
 .../iotdb/db/engine/filenode/FileNodeManager.java  |  4 +-
 .../db/engine/filenode/FileNodeProcessor.java      |  4 +-
 .../db/engine/filenodeV2/FileNodeManagerV2.java    | 18 ++++++
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 18 ++++++
 .../iotdb/db/engine/filenodeV2/FlushManager.java   | 51 ----------------
 .../iotdb/db/engine/filenodeV2/MetadataAgent.java  | 18 ++++++
 .../db/engine/filenodeV2/TsFileResourceV2.java     | 18 ++++++
 .../iotdb/db/engine/memtable/MemTablePool.java     | 44 +++++++++++---
 .../db/engine/overflow/io/OverflowProcessor.java   |  8 +--
 .../{FlushManager.java => FlushPoolManager.java}   | 10 ++--
 .../{MergeManager.java => MergePoolManager.java}   |  8 +--
 15 files changed, 253 insertions(+), 88 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 275ee80..ebaa947 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -171,6 +171,11 @@ public class IoTDBConfig {
    */
   private int concurrentFlushThread = Runtime.getRuntime().availableProcessors();
 
+  /**
+   * Maximum number of active mem tables.
+   */
+  private int maxActiveMemTableSize = 100;
+
   private ZoneId zoneID = ZoneId.systemDefault();
   /**
    * BufferWriteProcessor and OverflowProcessor will immediately flushMetadata if this threshold is
@@ -829,4 +834,12 @@ public class IoTDBConfig {
   public void setRpcImplClassName(String rpcImplClassName) {
     this.rpcImplClassName = rpcImplClassName;
   }
+
+  public int getMaxActiveMemTableSize() {
+    return maxActiveMemTableSize;
+  }
+
+  public void setMaxActiveMemTableSize(int maxActiveMemTableSize) {
+    this.maxActiveMemTableSize = maxActiveMemTableSize;
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 9bca6ca..3c74cec 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -44,7 +44,7 @@ import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
 import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
 import org.apache.iotdb.db.engine.memtable.MemTablePool;
 import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.engine.pool.FlushManager;
+import org.apache.iotdb.db.engine.pool.FlushPoolManager;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.BufferWriteProcessorException;
@@ -443,18 +443,18 @@ public class BufferWriteProcessor extends Processor {
       if (isCloseTaskCalled) {
         LOGGER.info(
             "flushMetadata memtable for bufferwrite processor {} synchronously for close task.",
-            getProcessorName(), FlushManager.getInstance().getWaitingTasksNumber(),
-            FlushManager.getInstance().getCorePoolSize());
+            getProcessorName(), FlushPoolManager.getInstance().getWaitingTasksNumber(),
+            FlushPoolManager.getInstance().getCorePoolSize());
         flushTask("synchronously", tmpMemTableToFlush, version, flushId);
         flushFuture = new ImmediateFuture<>(true);
       } else {
         if (LOGGER.isInfoEnabled()) {
           LOGGER.info(
               "Begin to submit flushMetadata task for bufferwrite processor {}, current Flush Queue is {}, core pool size is {}.",
-              getProcessorName(), FlushManager.getInstance().getWaitingTasksNumber(),
-              FlushManager.getInstance().getCorePoolSize());
+              getProcessorName(), FlushPoolManager.getInstance().getWaitingTasksNumber(),
+              FlushPoolManager.getInstance().getCorePoolSize());
         }
-        flushFuture = FlushManager.getInstance().submit(() -> flushTask("asynchronously",
+        flushFuture = FlushPoolManager.getInstance().submit(() -> flushTask("asynchronously",
             tmpMemTableToFlush, version, flushId));
       }
 
@@ -482,7 +482,7 @@ public class BufferWriteProcessor extends Processor {
     try {
       // flushMetadata data (if there are flushing task, flushMetadata() will be blocked) and wait for finishing flushMetadata async
       LOGGER.info("Submit a BufferWrite ({}) close task.", getProcessorName());
-      closeFuture = new BWCloseFuture(FlushManager.getInstance().submit(() -> closeTask()));
+      closeFuture = new BWCloseFuture(FlushPoolManager.getInstance().submit(() -> closeTask()));
       //now, we omit the future of the closeTask.
     } catch (Exception e) {
       LOGGER
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
index 7214e53..4bfa2db 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
@@ -1,9 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.engine.bufferwriteV2;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -24,8 +41,11 @@ public class BufferWriteProcessorV2 {
   private FileSchema fileSchema;
 
   private final String storageGroupName;
+
   private TsFileResourceV2 tsFileResource;
 
+  private volatile boolean isManagedByFlushManager;
+
   /**
    * true: to be closed
    */
@@ -33,7 +53,7 @@ public class BufferWriteProcessorV2 {
 
   private IMemTable workMemTable;
 
-  private final List<IMemTable> flushingMemTables = new ArrayList<>();
+  private final ConcurrentLinkedDeque<IMemTable> flushingMemTables = new ConcurrentLinkedDeque<>();
 
   public BufferWriteProcessorV2(String storageGroupName, File file, FileSchema fileSchema) throws IOException {
     this.storageGroupName = storageGroupName;
@@ -79,16 +99,30 @@ public class BufferWriteProcessorV2 {
    * put the workMemtable into flushing list and set null
    */
   public void flush() {
-    synchronized (flushingMemTables) {
-      flushingMemTables.add(workMemTable);
-    }
+    flushingMemTables.addLast(workMemTable);
     workMemTable = null;
   }
 
+  public void flushOneMemTable(){
+    IMemTable memTableToFlush = flushingMemTables.pollFirst();
+
+  }
+
   public boolean shouldClose() {
     long fileSize = tsFileResource.getFileSize();
     long fileSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getBufferwriteFileSizeThreshold();
     return fileSize > fileSizeThreshold;
   }
 
+  public boolean isManagedByFlushManager() {
+    return isManagedByFlushManager;
+  }
+
+  public void setManagedByFlushManager(boolean managedByFlushManager) {
+    isManagedByFlushManager = managedByFlushManager;
+  }
+
+  public int getFlushingMemTableSize() {
+    return flushingMemTables.size();
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
new file mode 100644
index 0000000..57e81c2
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.bufferwriteV2;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.iotdb.db.engine.pool.FlushPoolManager;
+
+public class FlushManager {
+
+  private ConcurrentLinkedQueue<BufferWriteProcessorV2> bwpQueue = new ConcurrentLinkedQueue<>();
+
+  private FlushPoolManager flushPool = FlushPoolManager.getInstance();
+
+  private Runnable flushAction = () -> {
+    BufferWriteProcessorV2 bwp = bwpQueue.poll();
+    bwp.flushOneMemTable();
+    bwp.setManagedByFlushManager(false);
+    addBWP(bwp);
+  };
+
+  /**
+   * Add BufferWriteProcessor to flush manager
+   */
+  private boolean addBWP(BufferWriteProcessorV2 bwp) {
+    synchronized (bwp) {
+      if (!bwp.isManagedByFlushManager() && bwp.getFlushingMemTableSize() > 0) {
+        bwpQueue.add(bwp);
+        bwp.setManagedByFlushManager(true);
+        flushPool.submit(flushAction);
+        return true;
+      }
+      return false;
+    }
+  }
+
+  private FlushManager() {
+  }
+
+  public static FlushManager getInstance() {
+    return InstanceHolder.instance;
+  }
+
+  private static class InstanceHolder {
+
+    private InstanceHolder() {
+    }
+
+    private static FlushManager instance = new FlushManager();
+  }
+
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index f3878a0..a9a8064 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -42,7 +42,7 @@ import org.apache.iotdb.db.engine.Processor;
 import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
 import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
 import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
-import org.apache.iotdb.db.engine.pool.FlushManager;
+import org.apache.iotdb.db.engine.pool.FlushPoolManager;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.BufferWriteProcessorException;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -1200,7 +1200,7 @@ public class FileNodeManager implements IStatistic, IService {
       // if the flushMetadata thread pool is not full ( or half full), start a new
       // flushMetadata task
       case SAFE:
-        if (FlushManager.getInstance().getActiveCnt() < 0.5 * FlushManager.getInstance()
+        if (FlushPoolManager.getInstance().getActiveCnt() < 0.5 * FlushPoolManager.getInstance()
             .getThreadCnt()) {
           try {
             flushTop(0.01f);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index fb46ebc..43086c4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -60,7 +60,7 @@ import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
-import org.apache.iotdb.db.engine.pool.MergeManager;
+import org.apache.iotdb.db.engine.pool.MergePoolManager;
 import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
 import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
@@ -996,7 +996,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       Runnable mergeThread;
       mergeThread = new MergeRunnale();
       LOGGER.info("Submit the merge task, the merge filenode is {}", getProcessorName());
-      return MergeManager.getInstance().submit(mergeThread);
+      return MergePoolManager.getInstance().submit(mergeThread);
     } else {
       if (!isOverflowed) {
         LOGGER.info(
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index f61149a..1e183b3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.engine.filenodeV2;
 
 import java.io.File;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 753db83..085357e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.engine.filenodeV2;
 
 import java.io.File;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java
deleted file mode 100644
index a485766..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.iotdb.db.engine.filenodeV2;
-
-import java.util.Deque;
-import java.util.Queue;
-import java.util.concurrent.ThreadFactory;
-import sun.nio.ch.ThreadPool;
-
-public class FlushManager {
-
-  private Queue<BWP> queue;
-
-  private ThreadPool flushPool;
-
-  private static final Object object;
-
-  public FlushManager(int n) {
-    this.flushPool = createFlushThreads(n, flushThread);
-  }
-
-  private boolean addBWP(BWP){
-    synchronized (BWP) {
-      // 对同一个BWP至多一个线程执行此操作
-      if (!BWP.isManagedByFlushManager() && BWP.taskSize() > 0) {
-        synchronized (queue) {
-          queue.add(BWP);
-        }
-        BWP.setManagedByFlushManager(true);
-        flushPool.submit(flushThread);
-//        object.notify();
-        return true;
-      }
-      return false;
-    }
-  }
-
-  Runnable flushThread = new Runnable(){
-    @Override
-    public void run() {
-//      object.wait();
-      synchronized (queue) {
-        BWP = queue.poll();
-      }
-      flushOneMemTable(BWP);
-      // 对同一个BWP至多一个线程执行此操作
-      BWP.setManagedByFlushManager(false);
-      addBWP(BWP);
-    }
-  };
-
-
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/MetadataAgent.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/MetadataAgent.java
index c9845f7..7c725f1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/MetadataAgent.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/MetadataAgent.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.engine.filenodeV2;
 
 import java.util.List;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
index 1cfd8e2..e7d981f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.engine.filenodeV2;
 
 import java.io.File;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
index e50be7f..c85719b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
@@ -1,23 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.engine.memtable;
 
 import java.util.ArrayDeque;
 import java.util.Deque;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MemTablePool {
+
   private static final Logger LOGGER = LoggerFactory.getLogger(MemTablePool.class);
 
-  private Deque<IMemTable> emptyMemTables;
-  // >= number of storage group * 2
-  private int capacity = 20;
+  private static final Deque<IMemTable> emptyMemTables = new ArrayDeque<>();
+
+  /**
+   * >= number of storage group * 2
+   */
+  private static final int capacity = IoTDBDescriptor.getInstance().getConfig()
+      .getMaxActiveMemTableSize();
+
   private int size = 0;
-  private static final int WAIT_TIME = 2000;
 
-  private static final MemTablePool INSTANCE = new MemTablePool();
+  private static final int WAIT_TIME = 2000;
 
   private MemTablePool() {
-    emptyMemTables = new ArrayDeque<>();
   }
 
   public IMemTable getEmptyMemTable(Object applier) {
@@ -63,7 +86,14 @@ public class MemTablePool {
   }
 
   public static MemTablePool getInstance() {
-    return INSTANCE;
+    return InstanceHolder.INSTANCE;
   }
 
+  private static class InstanceHolder {
+
+    private InstanceHolder() {
+    }
+
+    private static final MemTablePool INSTANCE = new MemTablePool();
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 3171b4c..0d8554a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -49,7 +49,7 @@ import org.apache.iotdb.db.engine.memtable.MemTablePool;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
-import org.apache.iotdb.db.engine.pool.FlushManager;
+import org.apache.iotdb.db.engine.pool.FlushPoolManager;
 import org.apache.iotdb.db.engine.querycontext.MergeSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
 import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
@@ -617,12 +617,12 @@ public class OverflowProcessor extends Processor {
       IMemTable tmpMemTableToFlush = workSupport;
       workSupport = MemTablePool.getInstance().getEmptyMemTable(this);
       flushId++;
-      flushFuture = FlushManager.getInstance().submit(() -> flushTask("asynchronously",
+      flushFuture = FlushPoolManager.getInstance().submit(() -> flushTask("asynchronously",
           tmpMemTableToFlush, flushId, this::removeFlushedMemTable));
 
       // switch from work to flush
 //      switchWorkToFlush();
-//      flushFuture = FlushManager.getInstance().submit(() ->
+//      flushFuture = FlushPoolManager.getInstance().submit(() ->
 //          flushTask("asynchronously", walTaskId));
     } else {
 //      flushFuture = new ImmediateFuture(true);
@@ -795,7 +795,7 @@ public class OverflowProcessor extends Processor {
 //
 //      }else {
 //        isFlushing = true;
-////        flushFuture = FlushManager.getInstance().submit(() ->
+////        flushFuture = FlushPoolManager.getInstance().submit(() ->
 //            flushTask("asynchronously", walTaskId));
 //      }
 //    } finally {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushPoolManager.java
similarity index 95%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
rename to iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushPoolManager.java
index ead8e89..9632522 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushPoolManager.java
@@ -30,20 +30,20 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.ProcessorException;
 
-public class FlushManager {
+public class FlushPoolManager {
 
   private static final int EXIT_WAIT_TIME = 60 * 1000;
 
   private ExecutorService pool;
   private int threadCnt;
 
-  private FlushManager() {
+  private FlushPoolManager() {
     IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
     this.threadCnt = config.getConcurrentFlushThread();
     pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.FLUSH_SERVICE.getName());
   }
 
-  public static FlushManager getInstance() {
+  public static FlushPoolManager getInstance() {
     return InstanceHolder.instance;
   }
 
@@ -59,7 +59,7 @@ public class FlushManager {
     pool = Executors.newFixedThreadPool(config.getConcurrentFlushThread());
   }
 
-  public FlushManager(ExecutorService pool) {
+  public FlushPoolManager(ExecutorService pool) {
     this.pool = pool;
   }
 
@@ -134,7 +134,7 @@ public class FlushManager {
     private InstanceHolder(){
       //allowed to do nothing
     }
-    private static FlushManager instance = new FlushManager();
+    private static FlushPoolManager instance = new FlushPoolManager();
   }
 
   public int getWaitingTasksNumber() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergePoolManager.java
similarity index 95%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java
rename to iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergePoolManager.java
index 44874bb..5585119 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergePoolManager.java
@@ -29,18 +29,18 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.ProcessorException;
 
-public class MergeManager {
+public class MergePoolManager {
 
   private ExecutorService pool;
   private int threadCnt;
 
-  private MergeManager() {
+  private MergePoolManager() {
     IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
     this.threadCnt = config.getMergeConcurrentThreads();
     pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.MERGE_SERVICE.getName());
   }
 
-  public static MergeManager getInstance() {
+  public static MergePoolManager getInstance() {
     return InstanceHolder.instance;
   }
 
@@ -121,6 +121,6 @@ public class MergeManager {
     private InstanceHolder(){
       //allowed to do nothing
     }
-    private static MergeManager instance = new MergeManager();
+    private static MergePoolManager instance = new MergePoolManager();
   }
 }


[incubator-iotdb] 02/03: Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit cc54759d67b54904f1107709da27363ab08ef1b5
Merge: b0db03a 1c85a1d
Author: lta <li...@163.com>
AuthorDate: Tue Jun 18 13:31:15 2019 +0800

    Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile

 .../bufferwriteV2/BufferWriteProcessorV2.java      |  94 +++++++++++++
 .../db/engine/filenode/FileNodeProcessor.java      |   4 +-
 .../db/engine/filenodeV2/FileNodeManagerV2.java    | 153 +++++++++++++++++++++
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 126 +++++++++++++++++
 .../iotdb/db/engine/filenodeV2/FlushManager.java   |  51 +++++++
 .../iotdb/db/engine/filenodeV2/MetadataAgent.java  |  33 +++++
 .../db/engine/filenodeV2/TsFileResourceV2.java     |  22 +++
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  26 +++-
 .../apache/iotdb/db/engine/memtable/IMemTable.java |  19 +--
 .../db/engine/memtable/IWritableMemChunk.java      |   2 +-
 .../iotdb/db/engine/memtable/WritableMemChunk.java |   2 +-
 .../FileNodeManagerBenchmark.java                  |   2 +-
 service-rpc/src/main/thrift/sync.thrift            |   2 +-
 .../iotdb/tsfile/file/metadata/TsFileMetaData.java |   2 +-
 14 files changed, 521 insertions(+), 17 deletions(-)