You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/07/24 03:35:33 UTC
[incubator-iotdb] 01/01: reconstruct flush pool class
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch manage_flush_pool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit fa888914fdbc3d01f297fa7572bfd6461931a5d8
Author: lta <li...@163.com>
AuthorDate: Wed Jul 24 11:34:32 2019 +0800
reconstruct flush pool class
---
.../{storagegroup => flush}/FlushManager.java | 32 ++++++-
.../{memtable => flush}/MemTableFlushTask.java | 7 +-
.../{memtable => flush}/NotifyFlushMemTable.java | 5 +-
.../pool/AbstractPoolManager.java} | 65 ++++++-------
.../engine/flush/pool/FlushSubTaskPoolManager.java | 79 ++++++++++++++++
.../db/engine/flush/pool/FlushTaskPoolManager.java | 77 +++++++++++++++
.../iotdb/db/engine/memtable/AbstractMemTable.java | 3 +-
.../iotdb/db/engine/pool/FlushPoolManager.java | 104 ---------------------
.../db/engine/storagegroup/TsFileProcessor.java | 19 ++--
.../java/org/apache/iotdb/db/service/IoTDB.java | 2 +
.../org/apache/iotdb/db/service/ServiceType.java | 3 +-
.../iotdb/db/tools/MemEst/MemEstToolCmd.java | 6 +-
.../writelog/recover/TsFileRecoverPerformer.java | 2 +-
.../db/engine/memtable/MemTableFlushTaskTest.java | 1 +
14 files changed, 239 insertions(+), 166 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
similarity index 70%
rename from server/src/main/java/org/apache/iotdb/db/engine/storagegroup/FlushManager.java
rename to server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index 6141ab5..f996c18 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -16,20 +16,42 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.engine.storagegroup;
+package org.apache.iotdb.db.engine.flush;
import java.util.concurrent.ConcurrentLinkedDeque;
-import org.apache.iotdb.db.engine.pool.FlushPoolManager;
+import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
+import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class FlushManager {
+public class FlushManager implements IService {
private static final Logger logger = LoggerFactory.getLogger(FlushManager.class);
private ConcurrentLinkedDeque<TsFileProcessor> tsFileProcessorQueue = new ConcurrentLinkedDeque<>();
- private FlushPoolManager flushPool = FlushPoolManager.getInstance();
+ private FlushTaskPoolManager flushPool = FlushTaskPoolManager.getInstance();
+
+ @Override
+ public void start() throws StartupException {
+ FlushSubTaskPoolManager.getInstance().start();
+ FlushTaskPoolManager.getInstance().start();
+ }
+
+ @Override
+ public void stop() {
+ FlushSubTaskPoolManager.getInstance().stop();
+ FlushTaskPoolManager.getInstance().stop();
+ }
+
+ @Override
+ public ServiceType getID() {
+ return ServiceType.FLUSH_SERVICE;
+ }
class FlushThread implements Runnable {
@@ -46,7 +68,7 @@ public class FlushManager {
* Add BufferWriteProcessor to asyncTryToFlush manager
*/
@SuppressWarnings("squid:S2445")
- void registerTsFileProcessor(TsFileProcessor tsFileProcessor) {
+ public void registerTsFileProcessor(TsFileProcessor tsFileProcessor) {
synchronized (tsFileProcessor) {
if (!tsFileProcessor.isManagedByFlushManager() && tsFileProcessor.getFlushingMemTableSize() > 0) {
logger.info("storage group {} begin to submit a flush thread, flushing memtable size: {}",
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
rename to server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index e4cddf5..b6cdb97 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -12,13 +12,16 @@
* or implied. See the License for the specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.engine.memtable;
+package org.apache.iotdb.db.engine.flush;
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import org.apache.iotdb.db.engine.pool.FlushSubTaskPoolManager;
+import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
+import org.apache.iotdb.db.engine.memtable.ChunkBufferPool;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
import org.apache.iotdb.db.exception.FlushRunTimeException;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/NotifyFlushMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
similarity index 85%
rename from server/src/main/java/org/apache/iotdb/db/engine/memtable/NotifyFlushMemTable.java
rename to server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
index 1862509..295dadf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/NotifyFlushMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
@@ -16,8 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.engine.memtable;
+package org.apache.iotdb.db.engine.flush;
+import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/pool/FlushSubTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
similarity index 53%
rename from server/src/main/java/org/apache/iotdb/db/engine/pool/FlushSubTaskPoolManager.java
rename to server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
index 243f052..cd11ec1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/pool/FlushSubTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
@@ -16,50 +16,40 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.engine.pool;
+
+package org.apache.iotdb.db.engine.flush.pool;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.db.concurrent.ThreadName;
-import org.apache.iotdb.db.exception.ProcessorException;
-
-public class FlushSubTaskPoolManager {
-
- private static final int EXIT_WAIT_TIME = 60 * 1000;
-
- private ExecutorService pool;
+import org.apache.iotdb.db.exception.StartupException;
+import org.slf4j.Logger;
- private FlushSubTaskPoolManager() {
- this.pool = IoTDBThreadPoolFactory
- .newCachedThreadPool(ThreadName.FLUSH_SUB_TASK_SERVICE.getName());
- }
+public abstract class AbstractPoolManager {
+
+ private static final int WAIT_TIMEOUT = 2000;
- public static FlushSubTaskPoolManager getInstance() {
- return FlushSubTaskPoolManager.InstanceHolder.instance;
- }
+ protected ExecutorService pool;
/**
* Block new flush submits and exit when all RUNNING THREADS AND TASKS IN THE QUEUE end.
- *
- * @param block if set to true, this method will wait for timeOut milliseconds.
- * @param timeout block time out in milliseconds.
- * @throws ProcessorException if timeOut is reached or being interrupted while waiting to exit.
*/
- public void close(boolean block, long timeout) throws ProcessorException {
- pool.shutdown();
- if (block) {
+ public void close() {
+ Logger logger = getLogger();
+ pool.shutdownNow();
+ long totalWaitTime = WAIT_TIMEOUT;
+ logger.info("Waiting for {} thread pool to shut down.", getName());
+ while (!pool.isTerminated()) {
try {
- if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
- throw new ProcessorException("Flush thread pool doesn't exit after "
- + EXIT_WAIT_TIME + " ms");
+ if (!pool.awaitTermination(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
+ logger.info("{} thread pool doesn't exit after {}ms.", getName(),
+ + totalWaitTime);
}
+ totalWaitTime += WAIT_TIMEOUT;
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ProcessorException("Interrupted while waiting flush thread pool to exit. ", e);
+ logger.error("Interrupted while waiting {} thread pool to exit. ", getName(), e);
}
}
}
@@ -76,15 +66,6 @@ public class FlushSubTaskPoolManager {
return ((ThreadPoolExecutor) pool).getActiveCount();
}
- private static class InstanceHolder {
-
- private InstanceHolder() {
- //allowed to do nothing
- }
-
- private static FlushSubTaskPoolManager instance = new FlushSubTaskPoolManager();
- }
-
public int getWaitingTasksNumber() {
return ((ThreadPoolExecutor) pool).getQueue().size();
}
@@ -92,4 +73,12 @@ public class FlushSubTaskPoolManager {
public int getCorePoolSize() {
return ((ThreadPoolExecutor) pool).getCorePoolSize();
}
+
+ public abstract Logger getLogger();
+
+ public abstract void start();
+
+ public abstract void stop();
+
+ public abstract String getName();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
new file mode 100644
index 0000000..448fb49
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.flush.pool;
+
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.service.IService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlushSubTaskPoolManager extends AbstractPoolManager {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(FlushSubTaskPoolManager.class);
+
+ private FlushSubTaskPoolManager() {
+ this.pool = IoTDBThreadPoolFactory
+ .newCachedThreadPool(ThreadName.FLUSH_SUB_TASK_SERVICE.getName());
+ }
+
+ public static FlushSubTaskPoolManager getInstance() {
+ return FlushSubTaskPoolManager.InstanceHolder.instance;
+ }
+
+ @Override
+ public Logger getLogger() {
+ return LOGGER;
+ }
+
+ @Override
+ public String getName() {
+ return "flush sub task";
+ }
+
+ @Override
+ public void start() {
+ if (pool == null) {
+ this.pool = IoTDBThreadPoolFactory
+ .newCachedThreadPool(ThreadName.FLUSH_SUB_TASK_SERVICE.getName());
+ }
+ LOGGER.info("Flush encoding sub task manager started.");
+ }
+
+ @Override
+ public void stop() {
+ if (pool != null) {
+ close();
+ pool = null;
+ }
+ LOGGER.info("Flush encoding sub task manager stopped");
+ }
+
+ private static class InstanceHolder {
+
+ private InstanceHolder() {
+ //allowed to do nothing
+ }
+
+ private static FlushSubTaskPoolManager instance = new FlushSubTaskPoolManager();
+ }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java
new file mode 100644
index 0000000..aec372c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.flush.pool;
+
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlushTaskPoolManager extends AbstractPoolManager {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlushTaskPoolManager.class);
+
+ private FlushTaskPoolManager() {
+ int threadCnt = IoTDBDescriptor.getInstance().getConfig().getConcurrentFlushThread();
+ pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.FLUSH_SERVICE.getName());
+ }
+
+ public static FlushTaskPoolManager getInstance() {
+ return InstanceHolder.instance;
+ }
+
+ @Override
+ public Logger getLogger() {
+ return LOGGER;
+ }
+
+ @Override
+ public String getName() {
+ return "flush task";
+ }
+
+ @Override
+ public void start() {
+ if (pool == null) {
+ int threadCnt = IoTDBDescriptor.getInstance().getConfig().getConcurrentFlushThread();
+ pool = IoTDBThreadPoolFactory
+ .newFixedThreadPool(threadCnt, ThreadName.FLUSH_SERVICE.getName());
+ }
+ LOGGER.info("Flush task manager started.");
+ }
+
+ @Override
+ public void stop() {
+ if (pool != null) {
+ close();
+ pool = null;
+ }
+ LOGGER.info("Flush task manager stopped");
+ }
+
+ private static class InstanceHolder {
+
+ private InstanceHolder() {
+ //allowed to do nothing
+ }
+
+ private static FlushTaskPoolManager instance = new FlushTaskPoolManager();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index d25e913..a4fd2a9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -27,9 +27,8 @@ import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.utils.MemUtils;
-import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.rescon.TVListAllocator;
+import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public abstract class AbstractMemTable implements IMemTable {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/pool/FlushPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/pool/FlushPoolManager.java
deleted file mode 100644
index fcc560b..0000000
--- a/server/src/main/java/org/apache/iotdb/db/engine/pool/FlushPoolManager.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.pool;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.db.concurrent.ThreadName;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.ProcessorException;
-
-public class FlushPoolManager {
-
- private static final int EXIT_WAIT_TIME = 60 * 1000;
-
- private ExecutorService pool;
- private int threadCnt;
-
- private FlushPoolManager() {
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- this.threadCnt = config.getConcurrentFlushThread();
- this.pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.FLUSH_SERVICE.getName());
- }
-
- public static FlushPoolManager getInstance() {
- return InstanceHolder.instance;
- }
-
- /**
- * Block new flush submits and exit when all RUNNING THREADS AND TASKS IN THE QUEUE end.
- *
- * @param block
- * if set to true, this method will wait for timeOut milliseconds.
- * @param timeout
- * block time out in milliseconds.
- * @throws ProcessorException
- * if timeOut is reached or being interrupted while waiting to exit.
- */
- public void close(boolean block, long timeout) throws ProcessorException {
- pool.shutdown();
- if (block) {
- try {
- if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
- throw new ProcessorException("Flush thread pool doesn't exit after "
- + EXIT_WAIT_TIME + " ms");
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ProcessorException("Interrupted while waiting flush thread pool to exit. ", e);
- }
- }
- }
-
- public synchronized Future<?> submit(Runnable task) {
- return pool.submit(task);
- }
-
- public synchronized <T>Future<T> submit(Callable<T> task){
- return pool.submit(task);
- }
-
- public int getActiveCnt() {
- return ((ThreadPoolExecutor) pool).getActiveCount();
- }
-
- public int getThreadCnt() {
- return threadCnt;
- }
-
- private static class InstanceHolder {
- private InstanceHolder(){
- //allowed to do nothing
- }
- private static FlushPoolManager instance = new FlushPoolManager();
- }
-
- public int getWaitingTasksNumber() {
- return ((ThreadPoolExecutor) pool).getQueue().size();
- }
-
- public int getCorePoolSize() {
- return ((ThreadPoolExecutor) pool).getCorePoolSize();
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index d27db69..34dcbac 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -31,11 +31,11 @@ import java.util.function.Supplier;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.CompressionRatio;
-import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
+import org.apache.iotdb.db.engine.flush.NotifyFlushMemTable;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
-import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
-import org.apache.iotdb.db.engine.memtable.NotifyFlushMemTable;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
@@ -364,7 +364,7 @@ public class TsFileProcessor {
* Take the first MemTable from the flushingMemTables and flush it. Called by a flush thread of
* the flush manager pool
*/
- void flushOneMemTable() {
+ public void flushOneMemTable() {
IMemTable memTableToFlush;
memTableToFlush = flushingMemTables.getFirst();
@@ -417,7 +417,8 @@ public class TsFileProcessor {
}
endFile();
} catch (IOException | TsFileProcessorException e) {
- logger.error("meet error when flush FileMetadata to {}, change system mode to read-only", tsFileResource.getFile().getAbsolutePath());
+ logger.error("meet error when flush FileMetadata to {}, change system mode to read-only",
+ tsFileResource.getFile().getAbsolutePath());
IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
try {
writer.reset();
@@ -458,7 +459,7 @@ public class TsFileProcessor {
}
- boolean isManagedByFlushManager() {
+ public boolean isManagedByFlushManager() {
return managedByFlushManager;
}
@@ -480,11 +481,11 @@ public class TsFileProcessor {
}
}
- void setManagedByFlushManager(boolean managedByFlushManager) {
+ public void setManagedByFlushManager(boolean managedByFlushManager) {
this.managedByFlushManager = managedByFlushManager;
}
- int getFlushingMemTableSize() {
+ public int getFlushingMemTableSize() {
return flushingMemTables.size();
}
@@ -496,7 +497,7 @@ public class TsFileProcessor {
return writer;
}
- String getStorageGroupName() {
+ public String getStorageGroupName() {
return storageGroupName;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 2a9d19a..0c70887 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
import org.apache.iotdb.db.cost.statistic.Measurement;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.monitor.StatMonitor;
@@ -90,6 +91,7 @@ public class IoTDB implements IoTDBMBean {
}
initMManager();
+ registerManager.register(FlushManager.getInstance());
registerManager.register(StorageEngine.getInstance());
registerManager.register(MultiFileLogNodeManager.getInstance());
registerManager.register(JMXService.getInstance());
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 4f872fd..bff3e49 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -32,7 +32,8 @@ public enum ServiceType {
FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""),
SYNC_SERVICE("SYNC ServerService", ""),
PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE","PERFORMANCE_STATISTIC_SERVICE"),
- TVLIST_ALLOCATOR_SERVICE("TVList Allocator", "");
+ TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""),
+ FLUSH_SERVICE("Flush ServerService", "");
private String name;
private String jmxName;
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstToolCmd.java b/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstToolCmd.java
index 9c5d96d..6159232 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstToolCmd.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstToolCmd.java
@@ -73,12 +73,12 @@ public class MemEstToolCmd implements Runnable {
MManager.getInstance().clear();
long sgCnt = 1;
- long tsCnt = 1;
+ long tsCnt = 0;
try {
for (; sgCnt <= sgNum; sgCnt++) {
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
}
- for (; tsCnt <= tsNum; tsCnt++) {
+ for (; tsCnt < tsNum; tsCnt++) {
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(1);
if (maxTsNum == 0) {
maxTsNumValid = tsCnt / sgNum + 1;
@@ -91,7 +91,7 @@ public class MemEstToolCmd implements Runnable {
} catch (ConfigAdjusterException e) {
if (sgCnt > sgNum) {
- maxProcess = Math.max(maxProcess, tsCnt * 100 / tsNum);
+ maxProcess = Math.max(maxProcess, (tsCnt + 1) * 100 / tsNum);
System.out
.print(String.format("Memory estimation progress : %d%%\r", maxProcess));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index c6dc976..d012e3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
+import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.version.VersionController;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
index e2b0155..6306338 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.concurrent.ExecutionException;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;