You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/18 07:21:35 UTC
[incubator-iotdb] 03/03: add flush mem pool
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit d08e317bb14e3eb225f6b793e2e1e10775216481
Author: lta <li...@163.com>
AuthorDate: Tue Jun 18 15:10:56 2019 +0800
add flush mem pool
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 +++++
.../engine/bufferwrite/BufferWriteProcessor.java | 14 ++---
.../bufferwriteV2/BufferWriteProcessorV2.java | 46 +++++++++++++--
.../db/engine/bufferwriteV2/FlushManager.java | 67 ++++++++++++++++++++++
.../iotdb/db/engine/filenode/FileNodeManager.java | 4 +-
.../db/engine/filenode/FileNodeProcessor.java | 4 +-
.../db/engine/filenodeV2/FileNodeManagerV2.java | 18 ++++++
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 18 ++++++
.../iotdb/db/engine/filenodeV2/FlushManager.java | 51 ----------------
.../iotdb/db/engine/filenodeV2/MetadataAgent.java | 18 ++++++
.../db/engine/filenodeV2/TsFileResourceV2.java | 18 ++++++
.../iotdb/db/engine/memtable/MemTablePool.java | 44 +++++++++++---
.../db/engine/overflow/io/OverflowProcessor.java | 8 +--
.../{FlushManager.java => FlushPoolManager.java} | 10 ++--
.../{MergeManager.java => MergePoolManager.java} | 8 +--
15 files changed, 253 insertions(+), 88 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 275ee80..ebaa947 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -171,6 +171,11 @@ public class IoTDBConfig {
*/
private int concurrentFlushThread = Runtime.getRuntime().availableProcessors();
+ /**
+ * Maximum number of active mem tables; used as the capacity of MemTablePool.
+ */
+ private int maxActiveMemTableSize = 100;
+
private ZoneId zoneID = ZoneId.systemDefault();
/**
* BufferWriteProcessor and OverflowProcessor will immediately flushMetadata if this threshold is
@@ -829,4 +834,12 @@ public class IoTDBConfig {
public void setRpcImplClassName(String rpcImplClassName) {
this.rpcImplClassName = rpcImplClassName;
}
+
+ public int getMaxActiveMemTableSize() {
+ return maxActiveMemTableSize;
+ }
+
+ public void setMaxActiveMemTableSize(int maxActiveMemTableSize) {
+ this.maxActiveMemTableSize = maxActiveMemTableSize;
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 9bca6ca..3c74cec 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -44,7 +44,7 @@ import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
import org.apache.iotdb.db.engine.memtable.MemTablePool;
import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.engine.pool.FlushManager;
+import org.apache.iotdb.db.engine.pool.FlushPoolManager;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.BufferWriteProcessorException;
@@ -443,18 +443,18 @@ public class BufferWriteProcessor extends Processor {
if (isCloseTaskCalled) {
LOGGER.info(
"flushMetadata memtable for bufferwrite processor {} synchronously for close task.",
- getProcessorName(), FlushManager.getInstance().getWaitingTasksNumber(),
- FlushManager.getInstance().getCorePoolSize());
+ getProcessorName(), FlushPoolManager.getInstance().getWaitingTasksNumber(),
+ FlushPoolManager.getInstance().getCorePoolSize());
flushTask("synchronously", tmpMemTableToFlush, version, flushId);
flushFuture = new ImmediateFuture<>(true);
} else {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"Begin to submit flushMetadata task for bufferwrite processor {}, current Flush Queue is {}, core pool size is {}.",
- getProcessorName(), FlushManager.getInstance().getWaitingTasksNumber(),
- FlushManager.getInstance().getCorePoolSize());
+ getProcessorName(), FlushPoolManager.getInstance().getWaitingTasksNumber(),
+ FlushPoolManager.getInstance().getCorePoolSize());
}
- flushFuture = FlushManager.getInstance().submit(() -> flushTask("asynchronously",
+ flushFuture = FlushPoolManager.getInstance().submit(() -> flushTask("asynchronously",
tmpMemTableToFlush, version, flushId));
}
@@ -482,7 +482,7 @@ public class BufferWriteProcessor extends Processor {
try {
// flushMetadata data (if there are flushing task, flushMetadata() will be blocked) and wait for finishing flushMetadata async
LOGGER.info("Submit a BufferWrite ({}) close task.", getProcessorName());
- closeFuture = new BWCloseFuture(FlushManager.getInstance().submit(() -> closeTask()));
+ closeFuture = new BWCloseFuture(FlushPoolManager.getInstance().submit(() -> closeTask()));
//now, we omit the future of the closeTask.
} catch (Exception e) {
LOGGER
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
index 7214e53..4bfa2db 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
@@ -1,9 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.db.engine.bufferwriteV2;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -24,8 +41,11 @@ public class BufferWriteProcessorV2 {
private FileSchema fileSchema;
private final String storageGroupName;
+
private TsFileResourceV2 tsFileResource;
+ private volatile boolean isManagedByFlushManager;
+
/**
* true: to be closed
*/
@@ -33,7 +53,7 @@ public class BufferWriteProcessorV2 {
private IMemTable workMemTable;
- private final List<IMemTable> flushingMemTables = new ArrayList<>();
+ private final ConcurrentLinkedDeque<IMemTable> flushingMemTables = new ConcurrentLinkedDeque<>();
public BufferWriteProcessorV2(String storageGroupName, File file, FileSchema fileSchema) throws IOException {
this.storageGroupName = storageGroupName;
@@ -79,16 +99,30 @@ public class BufferWriteProcessorV2 {
* put the workMemtable into flushing list and set null
*/
public void flush() {
- synchronized (flushingMemTables) {
- flushingMemTables.add(workMemTable);
- }
+ flushingMemTables.addLast(workMemTable);
workMemTable = null;
}
+ public void flushOneMemTable(){
+ IMemTable memTableToFlush = flushingMemTables.pollFirst();
+
+ }
+
public boolean shouldClose() {
long fileSize = tsFileResource.getFileSize();
long fileSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getBufferwriteFileSizeThreshold();
return fileSize > fileSizeThreshold;
}
+ public boolean isManagedByFlushManager() {
+ return isManagedByFlushManager;
+ }
+
+ public void setManagedByFlushManager(boolean managedByFlushManager) {
+ isManagedByFlushManager = managedByFlushManager;
+ }
+
+ public int getFlushingMemTableSize() {
+ return flushingMemTables.size();
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
new file mode 100644
index 0000000..57e81c2
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.bufferwriteV2;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.iotdb.db.engine.pool.FlushPoolManager;
+
+public class FlushManager {
+
+ private ConcurrentLinkedQueue<BufferWriteProcessorV2> bwpQueue = new ConcurrentLinkedQueue<>();
+
+ private FlushPoolManager flushPool = FlushPoolManager.getInstance();
+
+ private Runnable flushAction = () -> {
+ BufferWriteProcessorV2 bwp = bwpQueue.poll();
+ bwp.flushOneMemTable();
+ bwp.setManagedByFlushManager(false);
+ addBWP(bwp);
+ };
+
+ /**
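+ * Queue the BufferWriteProcessorV2 and submit a flush task, unless it is already managed by the flush manager or has no flushing mem tables; returns true if it was queued.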
+ */
+ private boolean addBWP(BufferWriteProcessorV2 bwp) {
+ synchronized (bwp) {
+ if (!bwp.isManagedByFlushManager() && bwp.getFlushingMemTableSize() > 0) {
+ bwpQueue.add(bwp);
+ bwp.setManagedByFlushManager(true);
+ flushPool.submit(flushAction);
+ return true;
+ }
+ return false;
+ }
+ }
+
+ private FlushManager() {
+ }
+
+ public static FlushManager getInstance() {
+ return InstanceHolder.instance;
+ }
+
+ private static class InstanceHolder {
+
+ private InstanceHolder() {
+ }
+
+ private static FlushManager instance = new FlushManager();
+ }
+
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index f3878a0..a9a8064 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -42,7 +42,7 @@ import org.apache.iotdb.db.engine.Processor;
import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
-import org.apache.iotdb.db.engine.pool.FlushManager;
+import org.apache.iotdb.db.engine.pool.FlushPoolManager;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.BufferWriteProcessorException;
import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -1200,7 +1200,7 @@ public class FileNodeManager implements IStatistic, IService {
// if the flushMetadata thread pool is not full ( or half full), start a new
// flushMetadata task
case SAFE:
- if (FlushManager.getInstance().getActiveCnt() < 0.5 * FlushManager.getInstance()
+ if (FlushPoolManager.getInstance().getActiveCnt() < 0.5 * FlushPoolManager.getInstance()
.getThreadCnt()) {
try {
flushTop(0.01f);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index fb46ebc..43086c4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -60,7 +60,7 @@ import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
-import org.apache.iotdb.db.engine.pool.MergeManager;
+import org.apache.iotdb.db.engine.pool.MergePoolManager;
import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
@@ -996,7 +996,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
Runnable mergeThread;
mergeThread = new MergeRunnale();
LOGGER.info("Submit the merge task, the merge filenode is {}", getProcessorName());
- return MergeManager.getInstance().submit(mergeThread);
+ return MergePoolManager.getInstance().submit(mergeThread);
} else {
if (!isOverflowed) {
LOGGER.info(
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index f61149a..1e183b3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.db.engine.filenodeV2;
import java.io.File;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 753db83..085357e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.db.engine.filenodeV2;
import java.io.File;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java
deleted file mode 100644
index a485766..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.iotdb.db.engine.filenodeV2;
-
-import java.util.Deque;
-import java.util.Queue;
-import java.util.concurrent.ThreadFactory;
-import sun.nio.ch.ThreadPool;
-
-public class FlushManager {
-
- private Queue<BWP> queue;
-
- private ThreadPool flushPool;
-
- private static final Object object;
-
- public FlushManager(int n) {
- this.flushPool = createFlushThreads(n, flushThread);
- }
-
- private boolean addBWP(BWP){
- synchronized (BWP) {
- // 对同一个BWP至多一个线程执行此操作
- if (!BWP.isManagedByFlushManager() && BWP.taskSize() > 0) {
- synchronized (queue) {
- queue.add(BWP);
- }
- BWP.setManagedByFlushManager(true);
- flushPool.submit(flushThread);
-// object.notify();
- return true;
- }
- return false;
- }
- }
-
- Runnable flushThread = new Runnable(){
- @Override
- public void run() {
-// object.wait();
- synchronized (queue) {
- BWP = queue.poll();
- }
- flushOneMemTable(BWP);
- // 对同一个BWP至多一个线程执行此操作
- BWP.setManagedByFlushManager(false);
- addBWP(BWP);
- }
- };
-
-
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/MetadataAgent.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/MetadataAgent.java
index c9845f7..7c725f1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/MetadataAgent.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/MetadataAgent.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.db.engine.filenodeV2;
import java.util.List;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
index 1cfd8e2..e7d981f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.db.engine.filenodeV2;
import java.io.File;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
index e50be7f..c85719b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
@@ -1,23 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.db.engine.memtable;
import java.util.ArrayDeque;
import java.util.Deque;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MemTablePool {
+
private static final Logger LOGGER = LoggerFactory.getLogger(MemTablePool.class);
- private Deque<IMemTable> emptyMemTables;
- // >= number of storage group * 2
- private int capacity = 20;
+ private static final Deque<IMemTable> emptyMemTables = new ArrayDeque<>();
+
+ /**
+ * Pool capacity, read from the maxActiveMemTableSize config; should be >= 2 * the number of storage groups.
+ */
+ private static final int capacity = IoTDBDescriptor.getInstance().getConfig()
+ .getMaxActiveMemTableSize();
+
private int size = 0;
- private static final int WAIT_TIME = 2000;
- private static final MemTablePool INSTANCE = new MemTablePool();
+ private static final int WAIT_TIME = 2000;
private MemTablePool() {
- emptyMemTables = new ArrayDeque<>();
}
public IMemTable getEmptyMemTable(Object applier) {
@@ -63,7 +86,14 @@ public class MemTablePool {
}
public static MemTablePool getInstance() {
- return INSTANCE;
+ return InstanceHolder.INSTANCE;
}
+ private static class InstanceHolder {
+
+ private InstanceHolder() {
+ }
+
+ private static final MemTablePool INSTANCE = new MemTablePool();
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 3171b4c..0d8554a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -49,7 +49,7 @@ import org.apache.iotdb.db.engine.memtable.MemTablePool;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.ModificationFile;
-import org.apache.iotdb.db.engine.pool.FlushManager;
+import org.apache.iotdb.db.engine.pool.FlushPoolManager;
import org.apache.iotdb.db.engine.querycontext.MergeSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
@@ -617,12 +617,12 @@ public class OverflowProcessor extends Processor {
IMemTable tmpMemTableToFlush = workSupport;
workSupport = MemTablePool.getInstance().getEmptyMemTable(this);
flushId++;
- flushFuture = FlushManager.getInstance().submit(() -> flushTask("asynchronously",
+ flushFuture = FlushPoolManager.getInstance().submit(() -> flushTask("asynchronously",
tmpMemTableToFlush, flushId, this::removeFlushedMemTable));
// switch from work to flush
// switchWorkToFlush();
-// flushFuture = FlushManager.getInstance().submit(() ->
+// flushFuture = FlushPoolManager.getInstance().submit(() ->
// flushTask("asynchronously", walTaskId));
} else {
// flushFuture = new ImmediateFuture(true);
@@ -795,7 +795,7 @@ public class OverflowProcessor extends Processor {
//
// }else {
// isFlushing = true;
-//// flushFuture = FlushManager.getInstance().submit(() ->
+//// flushFuture = FlushPoolManager.getInstance().submit(() ->
// flushTask("asynchronously", walTaskId));
// }
// } finally {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushPoolManager.java
similarity index 95%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
rename to iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushPoolManager.java
index ead8e89..9632522 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushPoolManager.java
@@ -30,20 +30,20 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.ProcessorException;
-public class FlushManager {
+public class FlushPoolManager {
private static final int EXIT_WAIT_TIME = 60 * 1000;
private ExecutorService pool;
private int threadCnt;
- private FlushManager() {
+ private FlushPoolManager() {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
this.threadCnt = config.getConcurrentFlushThread();
pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.FLUSH_SERVICE.getName());
}
- public static FlushManager getInstance() {
+ public static FlushPoolManager getInstance() {
return InstanceHolder.instance;
}
@@ -59,7 +59,7 @@ public class FlushManager {
pool = Executors.newFixedThreadPool(config.getConcurrentFlushThread());
}
- public FlushManager(ExecutorService pool) {
+ public FlushPoolManager(ExecutorService pool) {
this.pool = pool;
}
@@ -134,7 +134,7 @@ public class FlushManager {
private InstanceHolder(){
//allowed to do nothing
}
- private static FlushManager instance = new FlushManager();
+ private static FlushPoolManager instance = new FlushPoolManager();
}
public int getWaitingTasksNumber() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergePoolManager.java
similarity index 95%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java
rename to iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergePoolManager.java
index 44874bb..5585119 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergePoolManager.java
@@ -29,18 +29,18 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.ProcessorException;
-public class MergeManager {
+public class MergePoolManager {
private ExecutorService pool;
private int threadCnt;
- private MergeManager() {
+ private MergePoolManager() {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
this.threadCnt = config.getMergeConcurrentThreads();
pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.MERGE_SERVICE.getName());
}
- public static MergeManager getInstance() {
+ public static MergePoolManager getInstance() {
return InstanceHolder.instance;
}
@@ -121,6 +121,6 @@ public class MergeManager {
private InstanceHolder(){
//allowed to do nothing
}
- private static MergeManager instance = new MergeManager();
+ private static MergePoolManager instance = new MergePoolManager();
}
}