You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ka...@apache.org on 2019/06/28 03:42:37 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: add reusable ChunkBufferPool for GC

This is an automated email from the ASF dual-hosted git repository.

kangrong pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 3588300  add reusable ChunkBufferPool for GC
     new 308fbc3  Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile
3588300 is described below

commit 358830012394473f9af39df53d0894f0ff8f1056
Author: kr11 <30...@qq.com>
AuthorDate: Fri Jun 28 11:27:33 2019 +0800

    add reusable ChunkBufferPool for GC
---
 .../iotdb/db/engine/memtable/FlushTaskPool.java    | 117 +++++++++++++++++++++
 .../db/engine/memtable/MemTableFlushTaskV2.java    |   5 +-
 .../iotdb/tsfile/write/chunk/ChunkBuffer.java      |  18 +++-
 3 files changed, 137 insertions(+), 3 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/FlushTaskPool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/FlushTaskPool.java
new file mode 100644
index 0000000..7096150
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/FlushTaskPool.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License.  You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied.  See the License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.memtable;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Each flush task allocates new {@linkplain ChunkBuffer} which might be very large and lead to
+ * high-cost GC. In new design, we try to reuse ChunkBuffer objects by FlushTaskPool, referring to
+ * {@linkplain MemTablePool}.
+ *
+ * Only for TEST up to now.
+ *
+ * @author kangrong
+ */
+public class FlushTaskPool {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FlushTaskPool.class);
+
+  private static final Deque<ChunkBuffer> availableChunkBuffer = new ArrayDeque<>();
+
+  /**
+   * the number of required FlushTasks is no more than {@linkplain MemTablePool}.
+   */
+  private static final int capacity = IoTDBDescriptor.getInstance().getConfig()
+      .getMemtableNumber();
+
+  private int size = 0;
+
+  private static final int WAIT_TIME = 2000;
+
+  private FlushTaskPool() {
+  }
+
+  public ChunkBuffer getEmptyChunkBuffer(Object applier, MeasurementSchema schema) {
+    synchronized (availableChunkBuffer) {
+      if (availableChunkBuffer.isEmpty() && size < capacity) {
+        size++;
+        LOGGER.info("For fask, generated a new ChunkBuffer for {}, system ChunkBuffer size: {}, stack size: {}",
+            applier, size, availableChunkBuffer.size());
+        return new ChunkBuffer(schema);
+      } else if (!availableChunkBuffer.isEmpty()) {
+        LOGGER
+            .info("ReusableChunkBuffer size: {}, stack size: {}, then get a ChunkBuffer from stack for {}",
+                size, availableChunkBuffer.size(), applier);
+        ChunkBuffer chunkBuffer =  availableChunkBuffer.pop();
+        chunkBuffer.reInit(schema);
+        return chunkBuffer;
+      }
+
+      // wait until some one has released a ChunkBuffer
+      int waitCount = 1;
+      while (true) {
+        if (!availableChunkBuffer.isEmpty()) {
+          LOGGER.info(
+              "ReusableChunkBuffer size: {}, stack size: {}, then get a ChunkBuffer from stack for {}",
+              size, availableChunkBuffer.size(), applier);
+          return availableChunkBuffer.pop();
+        }
+        try {
+          availableChunkBuffer.wait(WAIT_TIME);
+        } catch (InterruptedException e) {
+          LOGGER.error("{} fails to wait fot ReusableChunkBuffer {}, continue to wait", applier, e);
+        }
+        LOGGER.info("{} has waited for a ReusableChunkBuffer for {}ms", applier, waitCount++ * WAIT_TIME);
+      }
+    }
+  }
+
+  public void putBack(ChunkBuffer chunkBuffer) {
+    synchronized (availableChunkBuffer) {
+      chunkBuffer.reset();
+      availableChunkBuffer.push(chunkBuffer);
+      availableChunkBuffer.notify();
+      LOGGER.info("a memtable returned, stack size {}", availableChunkBuffer.size());
+    }
+  }
+
+  public void putBack(ChunkBuffer chunkBuffer, String storageGroup) {
+    synchronized (availableChunkBuffer) {
+      chunkBuffer.reset();
+      availableChunkBuffer.push(chunkBuffer);
+      availableChunkBuffer.notify();
+      LOGGER.info("{} return a memtable, stack size {}", storageGroup, availableChunkBuffer.size());
+    }
+  }
+
+  public static FlushTaskPool getInstance() {
+    return InstanceHolder.INSTANCE;
+  }
+
+  private static class InstanceHolder {
+
+    private InstanceHolder() {
+    }
+
+    private static final FlushTaskPool INSTANCE = new FlushTaskPool();
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index 0df3bc0..011d3ac 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -135,7 +135,8 @@ public class MemTableFlushTaskV2 {
             } else {
               long starTime = System.currentTimeMillis();
               Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
-              ChunkBuffer chunkBuffer = new ChunkBuffer(encodingMessage.right);
+              ChunkBuffer chunkBuffer = FlushTaskPool.getInstance().getEmptyChunkBuffer(this, encodingMessage.right);
+
               IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right, chunkBuffer,
                   PAGE_SIZE_THRESHOLD);
               try {
@@ -146,6 +147,8 @@ public class MemTableFlushTaskV2 {
                 LOGGER.error("Storage group {} memtable {}, encoding task error.", storageGroup,
                     memTable.getVersion(), e);
                 throw new RuntimeException(e);
+              } finally {
+                FlushTaskPool.getInstance().putBack(chunkBuffer);
               }
               memSerializeTime += System.currentTimeMillis() - starTime;
             }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
index 4f452a6..8fa0d8a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
@@ -41,8 +41,8 @@ import org.slf4j.LoggerFactory;
 public class ChunkBuffer {
 
   private static final Logger LOG = LoggerFactory.getLogger(ChunkBuffer.class);
-  private final ICompressor compressor;
-  private final MeasurementSchema schema;
+  private ICompressor compressor;
+  private MeasurementSchema schema;
 
   private int numOfPages;
 
@@ -209,6 +209,17 @@ public class ChunkBuffer {
   }
 
   /**
+   * reset exist data in page for next stage.
+   */
+  public void reInit(MeasurementSchema schema) {
+    reset();
+    this.schema = schema;
+    this.compressor = ICompressor.getCompressor(schema.getCompressor());
+    numOfPages = 0;
+    maxTimestamp = 0;
+  }
+
+  /**
    * estimate max page memory size.
    *
    * @return the max possible allocated size currently
@@ -231,4 +242,7 @@ public class ChunkBuffer {
     return pageBuffer.size();
   }
 
+  public void setSchema(MeasurementSchema schema) {
+    this.schema = schema;
+  }
 }