You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ka...@apache.org on 2019/06/28 03:42:37 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: add
reusable ChunkBufferPool for GC
This is an automated email from the ASF dual-hosted git repository.
kangrong pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 3588300 add reusable ChunkBufferPool for GC
new 308fbc3 Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile
3588300 is described below
commit 358830012394473f9af39df53d0894f0ff8f1056
Author: kr11 <30...@qq.com>
AuthorDate: Fri Jun 28 11:27:33 2019 +0800
add reusable ChunkBufferPool for GC
---
.../iotdb/db/engine/memtable/FlushTaskPool.java | 117 +++++++++++++++++++++
.../db/engine/memtable/MemTableFlushTaskV2.java | 5 +-
.../iotdb/tsfile/write/chunk/ChunkBuffer.java | 18 +++-
3 files changed, 137 insertions(+), 3 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/FlushTaskPool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/FlushTaskPool.java
new file mode 100644
index 0000000..7096150
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/FlushTaskPool.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.memtable;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pool of reusable {@linkplain ChunkBuffer} objects. Each flush task allocates new
+ * {@linkplain ChunkBuffer} which might be very large and lead to high-cost GC. In new design, we
+ * try to reuse ChunkBuffer objects by FlushTaskPool, referring to {@linkplain MemTablePool}.
+ *
+ * Only for TEST up to now.
+ *
+ * @author kangrong
+ */
+public class FlushTaskPool {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FlushTaskPool.class);
+
+  /** returned ChunkBuffers, used as a stack; also the monitor all pool methods synchronize on. */
+  private static final Deque<ChunkBuffer> availableChunkBuffer = new ArrayDeque<>();
+
+  /**
+   * max number of ChunkBuffers this pool creates, read from the configured memtable number: the
+   * number of required FlushTasks is no more than {@linkplain MemTablePool}.
+   */
+  private static final int capacity = IoTDBDescriptor.getInstance().getConfig()
+      .getMemtableNumber();
+
+  /** number of ChunkBuffers created by this pool so far. */
+  private int size = 0;
+
+  /** timeout in milliseconds of a single wait for a returned ChunkBuffer. */
+  private static final int WAIT_TIME = 2000;
+
+  private FlushTaskPool() {
+  }
+
+  /**
+   * Get a ChunkBuffer bound to the given schema. A new ChunkBuffer is created while the stack is
+   * empty and fewer than capacity have been created; otherwise one is popped from the stack and
+   * re-initialized with the schema, waiting until one has been put back if the stack is empty.
+   * An interrupt received while waiting does not abort the wait; the interrupt status of the
+   * calling thread is restored before this method returns.
+   *
+   * @param applier the requester, only used in log messages
+   * @param schema the schema the returned ChunkBuffer is initialized with
+   * @return a new or reused ChunkBuffer initialized with schema
+   */
+  public ChunkBuffer getEmptyChunkBuffer(Object applier, MeasurementSchema schema) {
+    synchronized (availableChunkBuffer) {
+      if (availableChunkBuffer.isEmpty() && size < capacity) {
+        size++;
+        LOGGER.info(
+            "For task, generated a new ChunkBuffer for {}, system ChunkBuffer size: {}, stack size: {}",
+            applier, size, availableChunkBuffer.size());
+        return new ChunkBuffer(schema);
+      }
+
+      // remember an interrupt instead of losing it; re-asserting it inside the loop would make
+      // every following wait() fail immediately
+      boolean interrupted = false;
+      int waitCount = 1;
+      try {
+        // wait until some one has released a ChunkBuffer
+        while (availableChunkBuffer.isEmpty()) {
+          try {
+            availableChunkBuffer.wait(WAIT_TIME);
+          } catch (InterruptedException e) {
+            interrupted = true;
+            // the exception is the trailing argument without a placeholder, so it is logged
+            // as a throwable
+            LOGGER.error("{} fails to wait for ReusableChunkBuffer, continue to wait",
+                applier, e);
+          }
+          LOGGER.info("{} has waited for a ReusableChunkBuffer for {}ms", applier,
+              waitCount++ * WAIT_TIME);
+        }
+        LOGGER.info(
+            "ReusableChunkBuffer size: {}, stack size: {}, then get a ChunkBuffer from stack for {}",
+            size, availableChunkBuffer.size(), applier);
+        ChunkBuffer chunkBuffer = availableChunkBuffer.pop();
+        // a reused buffer still holds the schema of its previous user, so it must be
+        // re-initialized on every reuse path, including after waiting
+        chunkBuffer.reInit(schema);
+        return chunkBuffer;
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
+  /**
+   * Reset the given ChunkBuffer, push it onto the stack and notify one waiting thread.
+   *
+   * @param chunkBuffer the ChunkBuffer to give back to the pool
+   */
+  public void putBack(ChunkBuffer chunkBuffer) {
+    synchronized (availableChunkBuffer) {
+      chunkBuffer.reset();
+      availableChunkBuffer.push(chunkBuffer);
+      availableChunkBuffer.notify();
+      LOGGER.info("a ChunkBuffer returned, stack size {}", availableChunkBuffer.size());
+    }
+  }
+
+  /**
+   * Reset the given ChunkBuffer, push it onto the stack and notify one waiting thread.
+   *
+   * @param chunkBuffer the ChunkBuffer to give back to the pool
+   * @param storageGroup name of the storage group returning it, only used in the log message
+   */
+  public void putBack(ChunkBuffer chunkBuffer, String storageGroup) {
+    synchronized (availableChunkBuffer) {
+      chunkBuffer.reset();
+      availableChunkBuffer.push(chunkBuffer);
+      availableChunkBuffer.notify();
+      LOGGER.info("{} return a ChunkBuffer, stack size {}", storageGroup,
+          availableChunkBuffer.size());
+    }
+  }
+
+  /**
+   * @return the single FlushTaskPool instance
+   */
+  public static FlushTaskPool getInstance() {
+    return InstanceHolder.INSTANCE;
+  }
+
+  /** holds the singleton; INSTANCE is created when this class is initialized. */
+  private static class InstanceHolder {
+
+    private InstanceHolder() {
+    }
+
+    private static final FlushTaskPool INSTANCE = new FlushTaskPool();
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index 0df3bc0..011d3ac 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -135,7 +135,8 @@ public class MemTableFlushTaskV2 {
} else {
long starTime = System.currentTimeMillis();
Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
- ChunkBuffer chunkBuffer = new ChunkBuffer(encodingMessage.right);
+ ChunkBuffer chunkBuffer = FlushTaskPool.getInstance().getEmptyChunkBuffer(this, encodingMessage.right);
+
IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right, chunkBuffer,
PAGE_SIZE_THRESHOLD);
try {
@@ -146,6 +147,8 @@ public class MemTableFlushTaskV2 {
LOGGER.error("Storage group {} memtable {}, encoding task error.", storageGroup,
memTable.getVersion(), e);
throw new RuntimeException(e);
+ } finally {
+ FlushTaskPool.getInstance().putBack(chunkBuffer);
}
memSerializeTime += System.currentTimeMillis() - starTime;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
index 4f452a6..8fa0d8a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
@@ -41,8 +41,8 @@ import org.slf4j.LoggerFactory;
public class ChunkBuffer {
private static final Logger LOG = LoggerFactory.getLogger(ChunkBuffer.class);
- private final ICompressor compressor;
- private final MeasurementSchema schema;
+ private ICompressor compressor;
+ private MeasurementSchema schema;
private int numOfPages;
@@ -209,6 +209,17 @@ public class ChunkBuffer {
}
/**
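+ * re-initialize this buffer so it can be reused with another schema: calls reset(), replaces
+ * the schema and the compressor derived from it, and sets numOfPages and maxTimestamp to 0.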
+ */
+ public void reInit(MeasurementSchema schema) {
+   // start from the state reset() leaves behind
+   this.reset();
+   // bind to the new schema and look up the compressor it asks for
+   this.schema = schema;
+   this.compressor = ICompressor.getCompressor(this.schema.getCompressor());
+   // zero the page count and the max timestamp
+   this.numOfPages = 0;
+   this.maxTimestamp = 0;
+ }
+
+ /**
* estimate max page memory size.
*
* @return the max possible allocated size currently
@@ -231,4 +242,7 @@ public class ChunkBuffer {
return pageBuffer.size();
}
+ /**
+ * replace the schema held by this buffer; only the schema field is assigned.
+ * NOTE(review): unlike reInit, the compressor is not re-derived from the new schema here.
+ * Confirm callers never pass a schema that uses a different compressor.
+ *
+ * @param schema the new schema
+ */
+ public void setSchema(MeasurementSchema schema) {
+ this.schema = schema;
+ }
}