You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/01/13 03:20:51 UTC
[iotdb] branch master updated: [IOTDB-1094] Improve some code writing and name typo (#2382)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4acc515 [IOTDB-1094] Improve some code writing and name typo (#2382)
4acc515 is described below
commit 4acc515f4196b9438435e1a80c6086163f7c153c
Author: Qi Yu <yu...@gmail.com>
AuthorDate: Wed Jan 13 11:20:24 2021 +0800
[IOTDB-1094] Improve some code writing and name typo (#2382)
---
.../apache/iotdb/db/engine/flush/FlushManager.java | 24 +++++++----
.../iotdb/db/engine/flush/MemTableFlushTask.java | 48 ++++++++++++----------
2 files changed, 42 insertions(+), 30 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index 7ea8e1b..4a9b329 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
public class FlushManager implements FlushManagerMBean, IService {
- private static final Logger logger = LoggerFactory.getLogger(FlushManager.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlushManager.class);
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private ConcurrentLinkedDeque<TsFileProcessor> tsFileProcessorQueue = new ConcurrentLinkedDeque<>();
@@ -92,10 +92,14 @@ public class FlushManager implements FlushManagerMBean, IService {
@Override
public void runMayThrow() {
TsFileProcessor tsFileProcessor = tsFileProcessorQueue.poll();
+ if (null == tsFileProcessor) {
+ return;
+ }
+
tsFileProcessor.flushOneMemTable();
tsFileProcessor.setManagedByFlushManager(false);
- if (logger.isDebugEnabled()) {
- logger.debug("Flush Thread re-register TSProcessor {} to the queue.",
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Flush Thread re-register TSProcessor {} to the queue.",
tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
}
registerTsFileProcessor(tsFileProcessor);
@@ -104,7 +108,7 @@ public class FlushManager implements FlushManagerMBean, IService {
try {
StatMonitor.getInstance().saveStatValue(tsFileProcessor.getStorageGroupName());
} catch (StorageEngineException | MetadataException e) {
- logger.error("Inserting monitor series data error.", e);
+ LOGGER.error("Inserting monitor series data error.", e);
}
}
}
@@ -119,24 +123,26 @@ public class FlushManager implements FlushManagerMBean, IService {
if (!tsFileProcessor.isManagedByFlushManager()
&& tsFileProcessor.getFlushingMemTableSize() > 0) {
tsFileProcessorQueue.add(tsFileProcessor);
- if (logger.isDebugEnabled()) {
- logger.debug(
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
"{} begin to submit a flush thread, flushing memtable size: {}, queue size: {}",
tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath(),
tsFileProcessor.getFlushingMemTableSize(), tsFileProcessorQueue.size());
}
tsFileProcessor.setManagedByFlushManager(true);
flushPool.submit(new FlushThread());
- } else if (logger.isDebugEnabled()) {
+ }
+
+ if (LOGGER.isDebugEnabled()) {
if (tsFileProcessor.isManagedByFlushManager()) {
- logger.debug(
+ LOGGER.debug(
"{} is already in the flushPool, the first one: {}, the given processor flushMemtable number = {}",
tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath(),
tsFileProcessorQueue.isEmpty() ? "empty now"
: tsFileProcessorQueue.getFirst().getStorageGroupName(),
tsFileProcessor.getFlushingMemTableSize());
} else {
- logger.debug("No flushing memetable to do, register TsProcessor {} failed.",
+ LOGGER.debug("No flushing memetable to do, register TsProcessor {} failed.",
tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 11c5303..7168a0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine.flush;
import java.io.IOException;
+import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -39,8 +40,8 @@ import org.slf4j.LoggerFactory;
public class MemTableFlushTask {
- private static final Logger logger = LoggerFactory.getLogger(MemTableFlushTask.class);
- private static final FlushSubTaskPoolManager subTaskPoolManager = FlushSubTaskPoolManager
+ private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class);
+ private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER = FlushSubTaskPoolManager
.getInstance();
private final Future<?> encodingTaskFuture;
private final Future<?> ioTaskFuture;
@@ -65,9 +66,9 @@ public class MemTableFlushTask {
this.memTable = memTable;
this.writer = writer;
this.storageGroup = storageGroup;
- this.encodingTaskFuture = subTaskPoolManager.submit(encodingTask);
- this.ioTaskFuture = subTaskPoolManager.submit(ioTask);
- logger.debug("flush task of Storage group {} memtable {} is created ",
+ this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask);
+ this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
+ LOGGER.debug("flush task of Storage group {} memtable {} is created ",
storageGroup, memTable.getVersion());
}
@@ -75,28 +76,33 @@ public class MemTableFlushTask {
* the function for flushing memtable.
*/
public void syncFlushMemTable()
- throws ExecutionException, InterruptedException, IOException {
- logger.info("The memTable size of SG {} is {}, the avg series points num in chunk is {} ",
+ throws ExecutionException, InterruptedException {
+ LOGGER.info("The memTable size of SG {} is {}, the avg series points num in chunk is {} ",
storageGroup,
memTable.memSize(),
memTable.getTotalPointsNum() / memTable.getSeriesNumber());
long start = System.currentTimeMillis();
long sortTime = 0;
- for (String deviceId : memTable.getMemTableMap().keySet()) {
- encodingTaskQueue.add(new StartFlushGroupIOTask(deviceId));
- for (String measurementId : memTable.getMemTableMap().get(deviceId).keySet()) {
+ //for map do not use get(key) to iteratate
+ for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : memTable.getMemTableMap().entrySet()) {
+ encodingTaskQueue.add(new StartFlushGroupIOTask(memTableEntry.getKey()));
+
+ final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
+ for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
long startTime = System.currentTimeMillis();
- IWritableMemChunk series = memTable.getMemTableMap().get(deviceId).get(measurementId);
+ IWritableMemChunk series = iWritableMemChunkEntry.getValue();
MeasurementSchema desc = series.getSchema();
TVList tvList = series.getSortedTVListForFlush();
sortTime += System.currentTimeMillis() - startTime;
encodingTaskQueue.add(new Pair<>(tvList, desc));
}
+
encodingTaskQueue.add(new EndChunkGroupIoTask());
}
+
noMoreEncodingTask = true;
- logger.debug(
+ LOGGER.debug(
"Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
storageGroup, memTable.getVersion(), sortTime);
@@ -118,7 +124,7 @@ public class MemTableFlushTask {
throw new ExecutionException(e);
}
- logger.info(
+ LOGGER.info(
"Storage group {} memtable {} flushing a memtable has finished! Time consumption: {}ms",
storageGroup, memTable, System.currentTimeMillis() - start);
}
@@ -154,7 +160,7 @@ public class MemTableFlushTask {
seriesWriterImpl.write(time, tvPairs.getBinary(i));
break;
default:
- logger.error("Storage group {} does not support data type: {}", storageGroup,
+ LOGGER.error("Storage group {} does not support data type: {}", storageGroup,
dataType);
break;
}
@@ -166,7 +172,7 @@ public class MemTableFlushTask {
public void run() {
long memSerializeTime = 0;
boolean noMoreMessages = false;
- logger.debug("Storage group {} memtable {}, starts to encoding data.", storageGroup,
+ LOGGER.debug("Storage group {} memtable {}, starts to encoding data.", storageGroup,
memTable.getVersion());
while (true) {
if (noMoreEncodingTask) {
@@ -180,7 +186,7 @@ public class MemTableFlushTask {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
- logger.error("Storage group {} memtable {}, encoding task is interrupted.",
+ LOGGER.error("Storage group {} memtable {}, encoding task is interrupted.",
storageGroup, memTable.getVersion(), e);
// generally it is because the thread pool is shutdown so the task should be aborted
break;
@@ -199,7 +205,7 @@ public class MemTableFlushTask {
}
}
noMoreIOTask = true;
- logger.debug("Storage group {}, flushing memtable {} into disk: Encoding data cost "
+ LOGGER.debug("Storage group {}, flushing memtable {} into disk: Encoding data cost "
+ "{} ms.",
storageGroup, memTable.getVersion(), memSerializeTime);
}
@@ -209,7 +215,7 @@ public class MemTableFlushTask {
private Runnable ioTask = () -> {
long ioTime = 0;
boolean returnWhenNoTask = false;
- logger.debug("Storage group {} memtable {}, start io.", storageGroup, memTable.getVersion());
+ LOGGER.debug("Storage group {} memtable {}, start io.", storageGroup, memTable.getVersion());
while (true) {
if (noMoreIOTask) {
returnWhenNoTask = true;
@@ -222,7 +228,7 @@ public class MemTableFlushTask {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
- logger.error("Storage group {} memtable {}, io task is interrupted.", storageGroup
+ LOGGER.error("Storage group {} memtable {}, io task is interrupted.", storageGroup
, memTable.getVersion());
// generally it is because the thread pool is shutdown so the task should be aborted
break;
@@ -241,14 +247,14 @@ public class MemTableFlushTask {
this.writer.endChunkGroup();
}
} catch (IOException e) {
- logger.error("Storage group {} memtable {}, io task meets error.", storageGroup,
+ LOGGER.error("Storage group {} memtable {}, io task meets error.", storageGroup,
memTable.getVersion(), e);
throw new FlushRunTimeException(e);
}
ioTime += System.currentTimeMillis() - starTime;
}
}
- logger.debug("flushing a memtable {} in storage group {}, io cost {}ms", memTable.getVersion(),
+ LOGGER.debug("flushing a memtable {} in storage group {}, io cost {}ms", memTable.getVersion(),
storageGroup, ioTime);
};