You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2020/12/07 07:39:15 UTC

[iotdb] branch dev_sliding_mem_table updated: finish sliding window version 2

This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch dev_sliding_mem_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev_sliding_mem_table by this push:
     new 4ed7dd8  finish sliding window version 2
4ed7dd8 is described below

commit 4ed7dd87b4dbddd2b3380956f9bbdb86ffdb62a1
Author: EJTTianyu <16...@qq.com>
AuthorDate: Mon Dec 7 15:38:44 2020 +0800

    finish sliding window version 2
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  4 +--
 .../engine/storagegroup/StorageGroupProcessor.java | 35 ++++++++++--------
 .../db/engine/storagegroup/TsFileProcessor.java    | 41 ++++++++++++----------
 3 files changed, 45 insertions(+), 35 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a337152..13e9d3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -210,9 +210,9 @@ public class IoTDBConfig {
   private int estimatedSeriesSize = 300;
 
   /**
-   * Whether to enable sliding memory table
+   * Whether to enable sliding memory table, to reduce the CI time, the default value is false
    */
-  private boolean enableSlidingMemTable = true;
+  private boolean enableSlidingMemTable = false;
 
   /**
    * Save the flushing memtable in the memory during the period, can help reduce the unseq ratio, Unit: millis.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index b431823..f3896a6 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1673,22 +1673,13 @@ public class StorageGroupProcessor {
               .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
               .put(entry.getKey(), entry.getValue());
         }
-      } else {
-        if (processor.isSequence()) {
-          curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
-              .get(processor.getTimeRangeId());
-        }
-        for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
-          partitionLatestFlushedTimeForEachDevice
-              .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
-              .put(entry.getKey(), entry.getValue());
-          updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
-              entry.getKey(), entry.getValue());
-          if (globalLatestFlushedTimeForEachDevice
-              .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
-            globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
-          }
+        if (processor.isUpdateLatestTime()) {
+          updateLatestTime(processor, curPartitionDeviceLatestTime);
         }
+      } else {
+        curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
+            .get(processor.getTimeRangeId());
+        updateLatestTime(processor, curPartitionDeviceLatestTime);
       }
     } finally {
       flushUpdateUnLock();
@@ -1696,6 +1687,20 @@ public class StorageGroupProcessor {
     return true;
   }
 
+  private void updateLatestTime(TsFileProcessor processor, Map<String, Long> curPartitionDeviceLatestTime){
+    for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
+      partitionLatestFlushedTimeForEachDevice
+          .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
+          .put(entry.getKey(), entry.getValue());
+      updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
+          entry.getKey(), entry.getValue());
+      if (globalLatestFlushedTimeForEachDevice
+          .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
+        globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+
 
   /**
    * used for upgrading
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 32dec07..a4d0a0a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -111,6 +111,7 @@ public class TsFileProcessor {
   private IMemTable workMemTable;
 
   private boolean isFlushMemTableAlive = false;
+  private boolean updateLatestTime = false;
 
   private final VersionController versionController;
 
@@ -530,9 +531,6 @@ public class TsFileProcessor {
 
       // we have to add the memtable into flushingList first and then set the shouldClose tag.
       // see https://issues.apache.org/jira/browse/IOTDB-510
-//      IMemTable tmpFlushMemTable = flushingMemTable == null || flushingMemTable.memSize() == 0
-//          ? new NotifyFlushMemTable()
-//          : flushingMemTable;
       IMemTable tmpWorkMemTable = workMemTable == null || workMemTable.memSize() == 0
           ? new NotifyFlushMemTable()
           : workMemTable;
@@ -540,8 +538,7 @@ public class TsFileProcessor {
       try {
         // When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke
         // flushing memTable in System module.
-//        addAMemtableIntoFlushingList(tmpFlushMemTable);
-        addAMemtableIntoFlushingList(tmpWorkMemTable);
+        addAMemtableIntoFlushingList(tmpWorkMemTable, true);
         shouldClose = true;
         tsFileResource.setCloseFlag();
       } catch (Exception e) {
@@ -569,16 +566,12 @@ public class TsFileProcessor {
           .debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
     }
     try {
-      if (flushingMemTable == null) {
-        tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
-      } else {
-        tmpMemTable = flushingMemTable;
-      }
+      tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
       if (logger.isDebugEnabled() && tmpMemTable.isSignalMemTable()) {
         logger.debug("{}: {} add a signal memtable into flushing memtable list when sync flush",
             storageGroupName, tsFileResource.getTsFile().getName());
       }
-      addAMemtableIntoFlushingList(tmpMemTable);
+      addAMemtableIntoFlushingList(tmpMemTable, false);
     } finally {
       flushQueryLock.writeLock().unlock();
       if (logger.isDebugEnabled()) {
@@ -611,6 +604,15 @@ public class TsFileProcessor {
    * put the working memtable into flushing list and set the working memtable to null
    */
   public void asyncFlush() {
+    if (config.isEnableSlidingMemTable()){
+      while (isManagedByFlushManager() && flushingMemTable != null) {
+        try {
+          TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }
     flushQueryLock.writeLock().lock();
     if (logger.isDebugEnabled()) {
       logger
@@ -622,12 +624,8 @@ public class TsFileProcessor {
       }
       logger.info("Async flush a memtable to tsfile: {}",
           tsFileResource.getTsFile().getAbsolutePath());
-      if (config.isEnableSlidingMemTable()){
-        while (flushingMemTable != null) {
-          TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
-        }
-      }
-      addAMemtableIntoFlushingList(workMemTable);
+
+      addAMemtableIntoFlushingList(workMemTable, false);
     } catch (Exception e) {
       logger.error("{}: {} add a memtable into flushing list failed", storageGroupName,
           tsFileResource.getTsFile().getName(), e);
@@ -645,7 +643,7 @@ public class TsFileProcessor {
    * queue, set the current working memtable as null and then register the tsfileProcessor into the
    * flushManager again.
    */
-  private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOException {
+  private void addAMemtableIntoFlushingList(IMemTable tobeFlushed, boolean isClosing) throws IOException {
     if (!tobeFlushed.isSignalMemTable() && tobeFlushed.memSize() == 0) {
       logger.warn("This normal memtable is empty, skip it in flush. {}: {} Memetable info: {}",
           storageGroupName, tsFileResource.getTsFile().getName(), tobeFlushed.getMemTableMap());
@@ -673,6 +671,9 @@ public class TsFileProcessor {
       flushingMemTable = workMemTable;
       isFlushMemTableAlive = true;
     }
+    if(isClosing){
+      updateLatestTime = true;
+    }
     updateLatestFlushTimeCallback.call(this);
     workMemTable = null;
     shouldFlush = false;
@@ -1049,4 +1050,8 @@ public class TsFileProcessor {
   public boolean isShouldClose() {
     return shouldClose;
   }
+
+  public boolean isUpdateLatestTime() {
+    return updateLatestTime;
+  }
 }