You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2020/12/11 03:45:15 UTC

[iotdb] 05/06: finish sliding mem table version 5

This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch sliding_mem_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 066a3a749b16440fe68df9d122180acae5c4859f
Author: EJTTianyu <16...@qq.com>
AuthorDate: Wed Dec 9 18:56:43 2020 +0800

    finish sliding mem table version 5
---
 .../org/apache/iotdb/db/engine/flush/FlushManager.java     |  6 ++----
 .../db/engine/storagegroup/StorageGroupProcessor.java      | 14 ++++++++------
 .../iotdb/db/engine/storagegroup/TsFileProcessor.java      | 11 ++++-------
 .../db/engine/storagegroup/StorageGroupProcessorTest.java  | 12 ++++++++----
 .../iotdb/db/engine/storagegroup/TsFileProcessorTest.java  |  8 ++++----
 5 files changed, 26 insertions(+), 25 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index 2a40384..b4041d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -108,10 +108,8 @@ public class FlushManager implements FlushManagerMBean, IService {
         }
       }
       if (tsFileProcessor.isSequence() && IoTDBDescriptor.getInstance().getConfig()
-          .isEnableSlidingMemTable()) {
-        tsFileProcessor.getUpdateLatestFlushTimeCallback().call(tsFileProcessor);
-        tsFileProcessor.setFlushingMemTable(null);
-        tsFileProcessor.setFlushMemTableAlive(false);
+          .isEnableSlidingMemTable() && tsFileProcessor.isFlushMemTableAlive()) {
+        tsFileProcessor.getUpdateLatestFlushTimeCallback().call(tsFileProcessor, true);
       }
       tsFileProcessor.flushOneMemTable();
       tsFileProcessor.setManagedByFlushManager(false);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index b3b1f91..7025017 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1676,11 +1676,11 @@ public class StorageGroupProcessor {
     }
   }
 
-  private boolean unsequenceFlushCallback(TsFileProcessor processor) {
+  private boolean unsequenceFlushCallback(TsFileProcessor processor, boolean updateLatest) {
     return true;
   }
 
-  private boolean updateLatestFlushTimeCallback(TsFileProcessor processor) {
+  private boolean updateLatestFlushTimeCallback(TsFileProcessor processor, boolean updateLatest) {
     flushUpdateLock();
     try {
       // update the largest timestamp in the last flushing memtable
@@ -1694,14 +1694,14 @@ public class StorageGroupProcessor {
         return false;
       }
 
-      if (processor.isFlushMemTableAlive()) {
-        // if flushing mem table is alive, use latestTimeForEachDevice to update flushingLatestTimeForEachDevice
+      if (config.isEnableSlidingMemTable() && !updateLatest) {
+        // use latestTimeForEachDevice to update flushingLatestTimeForEachDevice
         for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
           flushingLatestTimeForEachDevice
               .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
               .put(entry.getKey(), entry.getValue());
         }
-        // when the processor is closing, update partitionLatestFlushedTimeForEachDevice
+        // when the processor is closing, update partitionLatestFlushedTimeForEachDevice immediately
         if (processor.isUpdateLatestTime()) {
           updateLatestTime(processor, curPartitionDeviceLatestTime);
         }
@@ -1710,6 +1710,8 @@ public class StorageGroupProcessor {
         if (config.isEnableSlidingMemTable()) {
           curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
               .get(processor.getTimeRangeId());
+          processor.setFlushingMemTable(null);
+          processor.setFlushMemTableAlive(false);
         }
         updateLatestTime(processor, curPartitionDeviceLatestTime);
       }
@@ -2574,7 +2576,7 @@ public class StorageGroupProcessor {
   @FunctionalInterface
   public interface UpdateEndTimeCallBack {
 
-    boolean call(TsFileProcessor caller);
+    boolean call(TsFileProcessor caller, boolean updateLatest);
   }
 
   @FunctionalInterface
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index c09d660..547d0e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -596,12 +596,9 @@ public class TsFileProcessor {
    */
   public void asyncFlush() {
     if (config.isEnableSlidingMemTable()){
-      while (flushingMemTable != null) {
-        try {
-          TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
-        } catch (InterruptedException e) {
-          logger.error("async flush failed because the flushing mem table is still alive", e);
-        }
+      if (flushingMemTable != null) {
+        // if previous flushing mem table is still alive, update partitionLatestFlushedTimeForEachDevice
+        updateLatestFlushTimeCallback.call(this, true);
       }
     }
     flushQueryLock.writeLock().lock();
@@ -665,7 +662,7 @@ public class TsFileProcessor {
     if(isClosing){
       updateLatestTime = true;
     }
-    updateLatestFlushTimeCallback.call(this);
+    updateLatestFlushTimeCallback.call(this, false);
     workMemTable = null;
     shouldFlush = false;
     FlushManager.getInstance().registerTsFileProcessor(this);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 707b76f..d786d1f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -139,9 +139,10 @@ public class StorageGroupProcessorTest {
 
   @Test
   public void testSlidingMemTable()
-      throws WriteProcessException, IOException, MetadataException {
-    TSRecord record;
+      throws WriteProcessException, QueryProcessException, MetadataException {
+    IoTDBDescriptor.getInstance().getConfig().setEnableSlidingMemTable(true);
 
+    TSRecord record;
     for (int j = 5; j <= 10; j++) {
       record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
@@ -157,9 +158,12 @@ public class StorageGroupProcessorTest {
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
       processor.insert(new InsertRowPlan(record));
     }
-//    System.exit(0);
     processor.syncCloseAllWorkingTsFileProcessors();
-
+    QueryDataSource queryDataSource = processor
+        .query(new PartialPath(deviceId), measurementId, context,
+            null, null);
+    Assert.assertEquals(1, queryDataSource.getSeqResources().size());
+    IoTDBDescriptor.getInstance().getConfig().setEnableSlidingMemTable(false);
   }
 
   @Test
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index 9ba80bb..e73c135 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -87,7 +87,7 @@ public class TsFileProcessorTest {
     logger.info("testWriteAndFlush begin..");
     processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), sgInfo,
         SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
-        (tsFileProcessor) -> true, true);
+        (tsFileProcessor, updateLatest) -> true, true);
 
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -143,7 +143,7 @@ public class TsFileProcessorTest {
     logger.info("testWriteAndRestoreMetadata begin..");
     processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), sgInfo,
         SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
-        (tsFileProcessor) -> true, true);
+        (tsFileProcessor, updateLatest) -> true, true);
 
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -226,7 +226,7 @@ public class TsFileProcessorTest {
     logger.info("testWriteAndRestoreMetadata begin..");
     processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), sgInfo,
         SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
-        (tsFileProcessor) -> true, true);
+        (tsFileProcessor, updateLatest) -> true, true);
 
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -267,7 +267,7 @@ public class TsFileProcessorTest {
     logger.info("testWriteAndRestoreMetadata begin..");
     processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), sgInfo,
         SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
-        (tsFileProcessor) -> true, true);
+        (tsFileProcessor, updateLatest) -> true, true);
 
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);