You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/28 08:15:07 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: rename FlushTaskPool to ChunkBufferPool and add chunk buffer pool test

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new febd351  rename FlushTaskPool to ChunkBufferPool and add chunk buffer pool test
febd351 is described below

commit febd3518ceea9215884ec74d626192ffac09b0c7
Author: lta <li...@163.com>
AuthorDate: Fri Jun 28 16:12:45 2019 +0800

    rename FlushTaskPool to ChunkBufferPool and add chunk buffer pool test
---
 .../{FlushTaskPool.java => ChunkBufferPool.java}   | 16 ++---
 .../iotdb/db/engine/memtable/EmptyMemTable.java    | 18 +++++
 .../db/engine/memtable/MemTableFlushTaskV2.java    |  7 +-
 .../iotdb/db/engine/memtable/MemTablePool.java     |  3 +
 .../db/engine/memtable/ChunkBufferPoolTest.java    | 81 ++++++++++++++++++++++
 .../iotdb/db/engine/memtable/MemTablePoolTest.java | 23 +++++-
 .../db/utils/datastructure/LongTVListTest.java     | 18 +++++
 .../iotdb/tsfile/write/chunk/ChunkBuffer.java      |  4 +-
 8 files changed, 155 insertions(+), 15 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/FlushTaskPool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java
similarity index 88%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/FlushTaskPool.java
rename to iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java
index 7096150..e55f7d3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/FlushTaskPool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java
@@ -24,16 +24,16 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Each flush task allocates new {@linkplain ChunkBuffer} which might be very large and lead to
- * high-cost GC. In new design, we try to reuse ChunkBuffer objects by FlushTaskPool, referring to
+ * high-cost GC. In new design, we try to reuse ChunkBuffer objects by ChunkBufferPool, referring to
  * {@linkplain MemTablePool}.
  *
  * Only for TEST up to now.
  *
  * @author kangrong
  */
-public class FlushTaskPool {
+public class ChunkBufferPool {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(FlushTaskPool.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(ChunkBufferPool.class);
 
   private static final Deque<ChunkBuffer> availableChunkBuffer = new ArrayDeque<>();
 
@@ -47,7 +47,7 @@ public class FlushTaskPool {
 
   private static final int WAIT_TIME = 2000;
 
-  private FlushTaskPool() {
+  private ChunkBufferPool() {
   }
 
   public ChunkBuffer getEmptyChunkBuffer(Object applier, MeasurementSchema schema) {
@@ -90,7 +90,7 @@ public class FlushTaskPool {
       chunkBuffer.reset();
       availableChunkBuffer.push(chunkBuffer);
       availableChunkBuffer.notify();
-      LOGGER.info("a memtable returned, stack size {}", availableChunkBuffer.size());
+      LOGGER.info("a chunk buffer returned, stack size {}", availableChunkBuffer.size());
     }
   }
 
@@ -99,11 +99,11 @@ public class FlushTaskPool {
       chunkBuffer.reset();
       availableChunkBuffer.push(chunkBuffer);
       availableChunkBuffer.notify();
-      LOGGER.info("{} return a memtable, stack size {}", storageGroup, availableChunkBuffer.size());
+      LOGGER.info("{} return a chunk buffer, stack size {}", storageGroup, availableChunkBuffer.size());
     }
   }
 
-  public static FlushTaskPool getInstance() {
+  public static ChunkBufferPool getInstance() {
     return InstanceHolder.INSTANCE;
   }
 
@@ -112,6 +112,6 @@ public class FlushTaskPool {
     private InstanceHolder() {
     }
 
-    private static final FlushTaskPool INSTANCE = new FlushTaskPool();
+    private static final ChunkBufferPool INSTANCE = new ChunkBufferPool();
   }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/EmptyMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/EmptyMemTable.java
index 88d5945..631fb30 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/EmptyMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/EmptyMemTable.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.engine.memtable;
 
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index fc96fcf..feea3e4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -15,14 +15,12 @@
 package org.apache.iotdb.db.engine.memtable;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.function.Consumer;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.pool.FlushSubTaskPoolManager;
-import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -138,7 +136,8 @@ public class MemTableFlushTaskV2 {
             } else {
               long starTime = System.currentTimeMillis();
               Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
-              ChunkBuffer chunkBuffer = FlushTaskPool.getInstance().getEmptyChunkBuffer(this, encodingMessage.right);
+              ChunkBuffer chunkBuffer = ChunkBufferPool
+                  .getInstance().getEmptyChunkBuffer(this, encodingMessage.right);
 
               IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right, chunkBuffer,
                   PAGE_SIZE_THRESHOLD);
@@ -153,7 +152,7 @@ public class MemTableFlushTaskV2 {
                     memTable.getVersion(), e);
                 throw new RuntimeException(e);
               } finally {
-                FlushTaskPool.getInstance().putBack(chunkBuffer);
+                ChunkBufferPool.getInstance().putBack(chunkBuffer);
               }
               memSerializeTime += System.currentTimeMillis() - starTime;
             }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
index 31ec7ac..6972c2e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
@@ -86,6 +86,9 @@ public class MemTablePool {
     }
   }
 
+  /**
+   * Only for test
+   */
   public void putBack(IMemTable memTable, String storageGroup) {
     synchronized (availableMemTables) {
       memTable.clear();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPoolTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPoolTest.java
new file mode 100644
index 0000000..c71bb85
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPoolTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.memtable;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ChunkBufferPoolTest {
+
+  private ConcurrentLinkedQueue<ChunkBuffer> chunkBuffers;
+
+  @Before
+  public void setUp() throws Exception {
+    chunkBuffers = new ConcurrentLinkedQueue<>();
+    Thread returnThread = new ReturnThread();
+    returnThread.setDaemon(true); // the loop below never ends on its own; do not block JVM exit
+    returnThread.start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testGetAndRelease() {
+    for (int i = 0; i < 50; i++) {
+      ChunkBuffer chunk = ChunkBufferPool.getInstance().getEmptyChunkBuffer("test case",
+          new MeasurementSchema("node", TSDataType.INT32, TSEncoding.PLAIN,
+              CompressionType.SNAPPY));
+      chunkBuffers.add(chunk);
+    }
+  }
+
+  // Polls the queued chunk buffers and puts each one back into ChunkBufferPool after a delay.
+  class ReturnThread extends Thread {
+
+    @Override
+    public void run() {
+      while (!Thread.currentThread().isInterrupted()) {
+        ChunkBuffer chunkBuffer = chunkBuffers.poll();
+        if (chunkBuffer == null) {
+          try {
+            Thread.sleep(10);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the flag so the loop condition exits
+          }
+          continue;
+        }
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt(); // still return the buffer below, then exit
+        }
+        ChunkBufferPool.getInstance().putBack(chunkBuffer, "test case");
+      }
+    }
+  }
+}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java
index b743207..3dbf681 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.engine.memtable;
 
 import java.util.TreeMap;
@@ -22,10 +40,13 @@ public class MemTablePoolTest {
 
   @Test
   public void testGetAndRelease() {
-    for (int i = 0; i < 100; i++) {
+    long time = System.currentTimeMillis();
+    for (int i = 0; i < 50; i++) {
       IMemTable memTable = MemTablePool.getInstance().getEmptyMemTable("test case");
       memTables.add(memTable);
     }
+    time = System.currentTimeMillis() - time; // elapsed = end - start (was start - end, negative)
+    System.out.println("memtable pool use deque and synchronized consume:" + time);
   }
 
   @Test
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/utils/datastructure/LongTVListTest.java b/iotdb/src/test/java/org/apache/iotdb/db/utils/datastructure/LongTVListTest.java
index ee65a55..001db8b 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/utils/datastructure/LongTVListTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/utils/datastructure/LongTVListTest.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.utils.datastructure;
 
 import java.util.ArrayList;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
index 8fa0d8a..290c516 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
@@ -99,7 +99,7 @@ public class ChunkBuffer {
     }
     this.maxTimestamp = maxTimestamp;
     int uncompressedSize = data.remaining();
-    int compressedSize = 0;
+    int compressedSize;
     int compressedPosition = 0;
     byte[] compressedBytes = null;
 
@@ -117,7 +117,7 @@ public class ChunkBuffer {
       }
     }
 
-    int headerSize = 0;
+    int headerSize;
 
     // write the page header to IOWriter
     try {