You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2020/12/11 03:45:12 UTC

[iotdb] 02/06: finish sliding window version 2

This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch sliding_mem_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 17a465612befe912648ce699ab57ff7aa1fd5483
Author: EJTTianyu <16...@qq.com>
AuthorDate: Mon Dec 7 15:38:44 2020 +0800

    finish sliding window version 2
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  4 +--
 .../engine/storagegroup/StorageGroupProcessor.java | 35 +++++++++++--------
 .../db/engine/storagegroup/TsFileProcessor.java    | 40 +++++++++++++---------
 3 files changed, 45 insertions(+), 34 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 33546f0..c5ad200 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -216,9 +216,9 @@ public class IoTDBConfig {
   private int estimatedSeriesSize = 300;
 
   /**
-   * Whether to enable sliding memory table
+   * Whether to enable sliding memory table, to reduce the CI time, the default value is false
    */
-  private boolean enableSlidingMemTable = true;
+  private boolean enableSlidingMemTable = false;
 
   /**
    * Save the flushing memtable in the memory during the period, can help reduce the unseq ratio, Unit: millis.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 32c3ac7..b670184 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1699,22 +1699,13 @@ public class StorageGroupProcessor {
               .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
               .put(entry.getKey(), entry.getValue());
         }
-      } else {
-        if (processor.isSequence()) {
-          curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
-              .get(processor.getTimeRangeId());
-        }
-        for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
-          partitionLatestFlushedTimeForEachDevice
-              .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
-              .put(entry.getKey(), entry.getValue());
-          updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
-              entry.getKey(), entry.getValue());
-          if (globalLatestFlushedTimeForEachDevice
-              .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
-            globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
-          }
+        if (processor.isUpdateLatestTime()) {
+          updateLatestTime(processor, curPartitionDeviceLatestTime);
         }
+      } else {
+        curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
+            .get(processor.getTimeRangeId());
+        updateLatestTime(processor, curPartitionDeviceLatestTime);
       }
     } finally {
       flushUpdateUnLock();
@@ -1722,6 +1713,20 @@ public class StorageGroupProcessor {
     return true;
   }
 
+  private void updateLatestTime(TsFileProcessor processor, Map<String, Long> curPartitionDeviceLatestTime){
+    for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
+      partitionLatestFlushedTimeForEachDevice
+          .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
+          .put(entry.getKey(), entry.getValue());
+      updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
+          entry.getKey(), entry.getValue());
+      if (globalLatestFlushedTimeForEachDevice
+          .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
+        globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+
 
   /**
    * used for upgrading
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 3cd1fbe..0d70d54 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -109,6 +109,7 @@ public class TsFileProcessor {
   private IMemTable workMemTable;
 
   private boolean isFlushMemTableAlive = false;
+  private boolean updateLatestTime = false;
 
   private final VersionController versionController;
 
@@ -521,9 +522,6 @@ public class TsFileProcessor {
 
       // we have to add the memtable into flushingList first and then set the shouldClose tag.
       // see https://issues.apache.org/jira/browse/IOTDB-510
-//      IMemTable tmpFlushMemTable = flushingMemTable == null || flushingMemTable.memSize() == 0
-//          ? new NotifyFlushMemTable()
-//          : flushingMemTable;
       IMemTable tmpWorkMemTable = workMemTable == null || workMemTable.memSize() == 0
           ? new NotifyFlushMemTable()
           : workMemTable;
@@ -531,7 +529,7 @@ public class TsFileProcessor {
       try {
         // When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke
         // flushing memTable in System module.
-        addAMemtableIntoFlushingList(tmpWorkMemTable);
+        addAMemtableIntoFlushingList(tmpWorkMemTable, true);
         logger.info("Memtable {} has been added to flushing list", tmpWorkMemTable);
         shouldClose = true;
       } catch (Exception e) {
@@ -559,16 +557,12 @@ public class TsFileProcessor {
           .debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
     }
     try {
-      if (flushingMemTable == null) {
-        tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
-      } else {
-        tmpMemTable = flushingMemTable;
-      }
+      tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
       if (logger.isDebugEnabled() && tmpMemTable.isSignalMemTable()) {
         logger.debug("{}: {} add a signal memtable into flushing memtable list when sync flush",
             storageGroupName, tsFileResource.getTsFile().getName());
       }
-      addAMemtableIntoFlushingList(tmpMemTable);
+      addAMemtableIntoFlushingList(tmpMemTable, false);
     } finally {
       flushQueryLock.writeLock().unlock();
       if (logger.isDebugEnabled()) {
@@ -601,6 +595,15 @@ public class TsFileProcessor {
    * put the working memtable into flushing list and set the working memtable to null
    */
   public void asyncFlush() {
+    if (config.isEnableSlidingMemTable()){
+      while (isManagedByFlushManager() && flushingMemTable != null) {
+        try {
+          TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }
     flushQueryLock.writeLock().lock();
     if (logger.isDebugEnabled()) {
       logger
@@ -612,12 +615,8 @@ public class TsFileProcessor {
       }
       logger.info("Async flush a memtable to tsfile: {}",
           tsFileResource.getTsFile().getAbsolutePath());
-      if (config.isEnableSlidingMemTable()){
-        while (flushingMemTable != null) {
-          TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
-        }
-      }
-      addAMemtableIntoFlushingList(workMemTable);
+
+      addAMemtableIntoFlushingList(workMemTable, false);
     } catch (Exception e) {
       logger.error("{}: {} add a memtable into flushing list failed", storageGroupName,
           tsFileResource.getTsFile().getName(), e);
@@ -635,7 +634,7 @@ public class TsFileProcessor {
    * queue, set the current working memtable as null and then register the tsfileProcessor into the
    * flushManager again.
    */
-  private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOException {
+  private void addAMemtableIntoFlushingList(IMemTable tobeFlushed, boolean isClosing) throws IOException {
     if (!tobeFlushed.isSignalMemTable() && tobeFlushed.memSize() == 0) {
       logger.warn("This normal memtable is empty, skip it in flush. {}: {} Memetable info: {}",
           storageGroupName, tsFileResource.getTsFile().getName(), tobeFlushed.getMemTableMap());
@@ -663,6 +662,9 @@ public class TsFileProcessor {
       flushingMemTable = workMemTable;
       isFlushMemTableAlive = true;
     }
+    if(isClosing){
+      updateLatestTime = true;
+    }
     updateLatestFlushTimeCallback.call(this);
     workMemTable = null;
     shouldFlush = false;
@@ -1049,4 +1051,8 @@ public class TsFileProcessor {
   public boolean isShouldClose() {
     return shouldClose;
   }
+
+  public boolean isUpdateLatestTime() {
+    return updateLatestTime;
+  }
 }