You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/28 08:15:07 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: rename FlushTaskPool to ChunkBufferPool and add chunk buffer pool test
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new febd351 rename FlushTaskPool to ChunkBufferPool and add chunk buffer pool test
febd351 is described below
commit febd3518ceea9215884ec74d626192ffac09b0c7
Author: lta <li...@163.com>
AuthorDate: Fri Jun 28 16:12:45 2019 +0800
rename FlushTaskPool to ChunkBufferPool and add chunk buffer pool test
---
.../{FlushTaskPool.java => ChunkBufferPool.java} | 16 ++---
.../iotdb/db/engine/memtable/EmptyMemTable.java | 18 +++++
.../db/engine/memtable/MemTableFlushTaskV2.java | 7 +-
.../iotdb/db/engine/memtable/MemTablePool.java | 3 +
.../db/engine/memtable/ChunkBufferPoolTest.java | 81 ++++++++++++++++++++++
.../iotdb/db/engine/memtable/MemTablePoolTest.java | 23 +++++-
.../db/utils/datastructure/LongTVListTest.java | 18 +++++
.../iotdb/tsfile/write/chunk/ChunkBuffer.java | 4 +-
8 files changed, 155 insertions(+), 15 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/FlushTaskPool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java
similarity index 88%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/FlushTaskPool.java
rename to iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java
index 7096150..e55f7d3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/FlushTaskPool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java
@@ -24,16 +24,16 @@ import org.slf4j.LoggerFactory;
/**
* Each flush task allocates new {@linkplain ChunkBuffer} which might be very large and lead to
- * high-cost GC. In new design, we try to reuse ChunkBuffer objects by FlushTaskPool, referring to
+ * high-cost GC. In new design, we try to reuse ChunkBuffer objects by ChunkBufferPool, referring to
* {@linkplain MemTablePool}.
*
* Only for TEST up to now.
*
* @author kangrong
*/
-public class FlushTaskPool {
+public class ChunkBufferPool {
- private static final Logger LOGGER = LoggerFactory.getLogger(FlushTaskPool.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ChunkBufferPool.class);
private static final Deque<ChunkBuffer> availableChunkBuffer = new ArrayDeque<>();
@@ -47,7 +47,7 @@ public class FlushTaskPool {
private static final int WAIT_TIME = 2000;
- private FlushTaskPool() {
+ private ChunkBufferPool() {
}
public ChunkBuffer getEmptyChunkBuffer(Object applier, MeasurementSchema schema) {
@@ -90,7 +90,7 @@ public class FlushTaskPool {
chunkBuffer.reset();
availableChunkBuffer.push(chunkBuffer);
availableChunkBuffer.notify();
- LOGGER.info("a memtable returned, stack size {}", availableChunkBuffer.size());
+ LOGGER.info("a chunk buffer returned, stack size {}", availableChunkBuffer.size());
}
}
@@ -99,11 +99,11 @@ public class FlushTaskPool {
chunkBuffer.reset();
availableChunkBuffer.push(chunkBuffer);
availableChunkBuffer.notify();
- LOGGER.info("{} return a memtable, stack size {}", storageGroup, availableChunkBuffer.size());
+ LOGGER.info("{} return a chunk buffer, stack size {}", storageGroup, availableChunkBuffer.size());
}
}
- public static FlushTaskPool getInstance() {
+ public static ChunkBufferPool getInstance() {
return InstanceHolder.INSTANCE;
}
@@ -112,6 +112,6 @@ public class FlushTaskPool {
private InstanceHolder() {
}
- private static final FlushTaskPool INSTANCE = new FlushTaskPool();
+ private static final ChunkBufferPool INSTANCE = new ChunkBufferPool();
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/EmptyMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/EmptyMemTable.java
index 88d5945..631fb30 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/EmptyMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/EmptyMemTable.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.db.engine.memtable;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index fc96fcf..feea3e4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -15,14 +15,12 @@
package org.apache.iotdb.db.engine.memtable;
import java.io.IOException;
-import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.pool.FlushSubTaskPoolManager;
-import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -138,7 +136,8 @@ public class MemTableFlushTaskV2 {
} else {
long starTime = System.currentTimeMillis();
Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
- ChunkBuffer chunkBuffer = FlushTaskPool.getInstance().getEmptyChunkBuffer(this, encodingMessage.right);
+ ChunkBuffer chunkBuffer = ChunkBufferPool
+ .getInstance().getEmptyChunkBuffer(this, encodingMessage.right);
IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right, chunkBuffer,
PAGE_SIZE_THRESHOLD);
@@ -153,7 +152,7 @@ public class MemTableFlushTaskV2 {
memTable.getVersion(), e);
throw new RuntimeException(e);
} finally {
- FlushTaskPool.getInstance().putBack(chunkBuffer);
+ ChunkBufferPool.getInstance().putBack(chunkBuffer);
}
memSerializeTime += System.currentTimeMillis() - starTime;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
index 31ec7ac..6972c2e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
@@ -86,6 +86,9 @@ public class MemTablePool {
}
}
+ /**
+ * Only for test
+ */
public void putBack(IMemTable memTable, String storageGroup) {
synchronized (availableMemTables) {
memTable.clear();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPoolTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPoolTest.java
new file mode 100644
index 0000000..c71bb85
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPoolTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.memtable;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ChunkBufferPoolTest {
+
+ private ConcurrentLinkedQueue<ChunkBuffer> chunkBuffers;
+
+ @Before
+ public void setUp() throws Exception {
+ chunkBuffers = new ConcurrentLinkedQueue();
+ new ReturnThread().start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testGetAndRelease() {
+ for (int i = 0; i < 50; i++) {
+ ChunkBuffer chunk = ChunkBufferPool.getInstance().getEmptyChunkBuffer("test case",
+ new MeasurementSchema("node", TSDataType.INT32, TSEncoding.PLAIN,
+ CompressionType.SNAPPY));
+ chunkBuffers.add(chunk);
+ }
+ }
+
+ class ReturnThread extends Thread {
+
+ @Override
+ public void run() {
+ while (true) {
+ ChunkBuffer chunkBuffer = chunkBuffers.poll();
+ if (chunkBuffer == null) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ continue;
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ chunkBuffers.remove(chunkBuffer);
+ ChunkBufferPool.getInstance().putBack(chunkBuffer, "test case");
+ }
+ }
+ }
+
+
+}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java
index b743207..3dbf681 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.db.engine.memtable;
import java.util.TreeMap;
@@ -22,10 +40,13 @@ public class MemTablePoolTest {
@Test
public void testGetAndRelease() {
- for (int i = 0; i < 100; i++) {
+ long time = System.currentTimeMillis();
+ for (int i = 0; i < 50; i++) {
IMemTable memTable = MemTablePool.getInstance().getEmptyMemTable("test case");
memTables.add(memTable);
}
+ time -= System.currentTimeMillis();
+ System.out.println("memtable pool use deque and synchronized consume:" + time);
}
@Test
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/utils/datastructure/LongTVListTest.java b/iotdb/src/test/java/org/apache/iotdb/db/utils/datastructure/LongTVListTest.java
index ee65a55..001db8b 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/utils/datastructure/LongTVListTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/utils/datastructure/LongTVListTest.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.db.utils.datastructure;
import java.util.ArrayList;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
index 8fa0d8a..290c516 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
@@ -99,7 +99,7 @@ public class ChunkBuffer {
}
this.maxTimestamp = maxTimestamp;
int uncompressedSize = data.remaining();
- int compressedSize = 0;
+ int compressedSize;
int compressedPosition = 0;
byte[] compressedBytes = null;
@@ -117,7 +117,7 @@ public class ChunkBuffer {
}
}
- int headerSize = 0;
+ int headerSize;
// write the page header to IOWriter
try {