You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/01/13 03:20:51 UTC

[iotdb] branch master updated: [IOTDB-1094] Improve some code writing and name typo (#2382)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4acc515  [IOTDB-1094] Improve some code writing and name typo (#2382)
4acc515 is described below

commit 4acc515f4196b9438435e1a80c6086163f7c153c
Author: Qi Yu <yu...@gmail.com>
AuthorDate: Wed Jan 13 11:20:24 2021 +0800

    [IOTDB-1094] Improve some code writing and name typo (#2382)
---
 .../apache/iotdb/db/engine/flush/FlushManager.java | 24 +++++++----
 .../iotdb/db/engine/flush/MemTableFlushTask.java   | 48 ++++++++++++----------
 2 files changed, 42 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index 7ea8e1b..4a9b329 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
 
 public class FlushManager implements FlushManagerMBean, IService {
 
-  private static final Logger logger = LoggerFactory.getLogger(FlushManager.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(FlushManager.class);
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   private ConcurrentLinkedDeque<TsFileProcessor> tsFileProcessorQueue = new ConcurrentLinkedDeque<>();
@@ -92,10 +92,14 @@ public class FlushManager implements FlushManagerMBean, IService {
     @Override
     public void runMayThrow() {
       TsFileProcessor tsFileProcessor = tsFileProcessorQueue.poll();
+      if (null == tsFileProcessor) {
+        return;
+      }
+
       tsFileProcessor.flushOneMemTable();
       tsFileProcessor.setManagedByFlushManager(false);
-      if (logger.isDebugEnabled()) {
-        logger.debug("Flush Thread re-register TSProcessor {} to the queue.",
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Flush Thread re-register TSProcessor {} to the queue.",
             tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
       }
       registerTsFileProcessor(tsFileProcessor);
@@ -104,7 +108,7 @@ public class FlushManager implements FlushManagerMBean, IService {
         try {
           StatMonitor.getInstance().saveStatValue(tsFileProcessor.getStorageGroupName());
         } catch (StorageEngineException | MetadataException e) {
-          logger.error("Inserting monitor series data error.", e);
+          LOGGER.error("Inserting monitor series data error.", e);
         }
       }
     }
@@ -119,24 +123,26 @@ public class FlushManager implements FlushManagerMBean, IService {
       if (!tsFileProcessor.isManagedByFlushManager()
           && tsFileProcessor.getFlushingMemTableSize() > 0) {
         tsFileProcessorQueue.add(tsFileProcessor);
-        if (logger.isDebugEnabled()) {
-          logger.debug(
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug(
               "{} begin to submit a flush thread, flushing memtable size: {}, queue size: {}",
               tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath(),
               tsFileProcessor.getFlushingMemTableSize(), tsFileProcessorQueue.size());
         }
         tsFileProcessor.setManagedByFlushManager(true);
         flushPool.submit(new FlushThread());
-      } else if (logger.isDebugEnabled()) {
+      }
+
+      if (LOGGER.isDebugEnabled()) {
         if (tsFileProcessor.isManagedByFlushManager()) {
-          logger.debug(
+          LOGGER.debug(
               "{} is already in the flushPool, the first one: {}, the given processor flushMemtable number = {}",
               tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath(),
               tsFileProcessorQueue.isEmpty() ? "empty now"
                   : tsFileProcessorQueue.getFirst().getStorageGroupName(),
               tsFileProcessor.getFlushingMemTableSize());
         } else {
-          logger.debug("No flushing memetable to do, register TsProcessor {} failed.",
+          LOGGER.debug("No flushing memetable to do, register TsProcessor {} failed.",
               tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 11c5303..7168a0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.engine.flush;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -39,8 +40,8 @@ import org.slf4j.LoggerFactory;
 
 public class MemTableFlushTask {
 
-  private static final Logger logger = LoggerFactory.getLogger(MemTableFlushTask.class);
-  private static final FlushSubTaskPoolManager subTaskPoolManager = FlushSubTaskPoolManager
+  private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class);
+  private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER = FlushSubTaskPoolManager
       .getInstance();
   private final Future<?> encodingTaskFuture;
   private final Future<?> ioTaskFuture;
@@ -65,9 +66,9 @@ public class MemTableFlushTask {
     this.memTable = memTable;
     this.writer = writer;
     this.storageGroup = storageGroup;
-    this.encodingTaskFuture = subTaskPoolManager.submit(encodingTask);
-    this.ioTaskFuture = subTaskPoolManager.submit(ioTask);
-    logger.debug("flush task of Storage group {} memtable {} is created ",
+    this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask);
+    this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
+    LOGGER.debug("flush task of Storage group {} memtable {} is created ",
         storageGroup, memTable.getVersion());
   }
 
@@ -75,28 +76,33 @@ public class MemTableFlushTask {
    * the function for flushing memtable.
    */
   public void syncFlushMemTable()
-      throws ExecutionException, InterruptedException, IOException {
-    logger.info("The memTable size of SG {} is {}, the avg series points num in chunk is {} ",
+      throws ExecutionException, InterruptedException {
+    LOGGER.info("The memTable size of SG {} is {}, the avg series points num in chunk is {} ",
         storageGroup,
         memTable.memSize(),
         memTable.getTotalPointsNum() / memTable.getSeriesNumber());
     long start = System.currentTimeMillis();
     long sortTime = 0;
 
-    for (String deviceId : memTable.getMemTableMap().keySet()) {
-      encodingTaskQueue.add(new StartFlushGroupIOTask(deviceId));
-      for (String measurementId : memTable.getMemTableMap().get(deviceId).keySet()) {
+    // Iterate over entrySet() instead of keySet() + get(key) to avoid repeated map lookups.
+    for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : memTable.getMemTableMap().entrySet()) {
+      encodingTaskQueue.add(new StartFlushGroupIOTask(memTableEntry.getKey()));
+
+      final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
+      for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
         long startTime = System.currentTimeMillis();
-        IWritableMemChunk series = memTable.getMemTableMap().get(deviceId).get(measurementId);
+        IWritableMemChunk series = iWritableMemChunkEntry.getValue();
         MeasurementSchema desc = series.getSchema();
         TVList tvList = series.getSortedTVListForFlush();
         sortTime += System.currentTimeMillis() - startTime;
         encodingTaskQueue.add(new Pair<>(tvList, desc));
       }
+
       encodingTaskQueue.add(new EndChunkGroupIoTask());
     }
+
     noMoreEncodingTask = true;
-    logger.debug(
+    LOGGER.debug(
         "Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
         storageGroup, memTable.getVersion(), sortTime);
 
@@ -118,7 +124,7 @@ public class MemTableFlushTask {
       throw new ExecutionException(e);
     }
 
-    logger.info(
+    LOGGER.info(
         "Storage group {} memtable {} flushing a memtable has finished! Time consumption: {}ms",
         storageGroup, memTable, System.currentTimeMillis() - start);
   }
@@ -154,7 +160,7 @@ public class MemTableFlushTask {
             seriesWriterImpl.write(time, tvPairs.getBinary(i));
             break;
           default:
-            logger.error("Storage group {} does not support data type: {}", storageGroup,
+            LOGGER.error("Storage group {} does not support data type: {}", storageGroup,
                 dataType);
             break;
         }
@@ -166,7 +172,7 @@ public class MemTableFlushTask {
     public void run() {
       long memSerializeTime = 0;
       boolean noMoreMessages = false;
-      logger.debug("Storage group {} memtable {}, starts to encoding data.", storageGroup,
+      LOGGER.debug("Storage group {} memtable {}, starts to encoding data.", storageGroup,
           memTable.getVersion());
       while (true) {
         if (noMoreEncodingTask) {
@@ -180,7 +186,7 @@ public class MemTableFlushTask {
           try {
             TimeUnit.MILLISECONDS.sleep(10);
           } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
-            logger.error("Storage group {} memtable {}, encoding task is interrupted.",
+            LOGGER.error("Storage group {} memtable {}, encoding task is interrupted.",
                 storageGroup, memTable.getVersion(), e);
             // generally it is because the thread pool is shutdown so the task should be aborted
             break;
@@ -199,7 +205,7 @@ public class MemTableFlushTask {
         }
       }
       noMoreIOTask = true;
-      logger.debug("Storage group {}, flushing memtable {} into disk: Encoding data cost "
+      LOGGER.debug("Storage group {}, flushing memtable {} into disk: Encoding data cost "
               + "{} ms.",
           storageGroup, memTable.getVersion(), memSerializeTime);
     }
@@ -209,7 +215,7 @@ public class MemTableFlushTask {
   private Runnable ioTask = () -> {
     long ioTime = 0;
     boolean returnWhenNoTask = false;
-    logger.debug("Storage group {} memtable {}, start io.", storageGroup, memTable.getVersion());
+    LOGGER.debug("Storage group {} memtable {}, start io.", storageGroup, memTable.getVersion());
     while (true) {
       if (noMoreIOTask) {
         returnWhenNoTask = true;
@@ -222,7 +228,7 @@ public class MemTableFlushTask {
         try {
           TimeUnit.MILLISECONDS.sleep(10);
         } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
-          logger.error("Storage group {} memtable {}, io task is interrupted.", storageGroup
+          LOGGER.error("Storage group {} memtable {}, io task is interrupted.", storageGroup
               , memTable.getVersion());
           // generally it is because the thread pool is shutdown so the task should be aborted
           break;
@@ -241,14 +247,14 @@ public class MemTableFlushTask {
             this.writer.endChunkGroup();
           }
         } catch (IOException e) {
-          logger.error("Storage group {} memtable {}, io task meets error.", storageGroup,
+          LOGGER.error("Storage group {} memtable {}, io task meets error.", storageGroup,
               memTable.getVersion(), e);
           throw new FlushRunTimeException(e);
         }
         ioTime += System.currentTimeMillis() - starTime;
       }
     }
-    logger.debug("flushing a memtable {} in storage group {}, io cost {}ms", memTable.getVersion(),
+    LOGGER.debug("flushing a memtable {} in storage group {}, io cost {}ms", memTable.getVersion(),
         storageGroup, ioTime);
   };