You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/20 14:03:33 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: fix sync flush bug

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 7b423a3  fix sync flush bug
7b423a3 is described below

commit 7b423a3ccc48dc4fdf625161ffc46b0a3261db20
Author: lta <li...@163.com>
AuthorDate: Thu Jun 20 22:03:06 2019 +0800

    fix sync flush bug
---
 .../iotdb/db/engine/filenodeV2/FlushManager.java   | 16 ++++-----
 .../filenodeV2/UnsealedTsFileProcessorV2.java      | 41 ++++++++++++----------
 .../filenodeV2/UnsealedTsFileProcessorV2Test.java  |  1 -
 3 files changed, 30 insertions(+), 28 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java
index 0fd2b94..ebcac70 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.filenodeV2;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
 import org.apache.iotdb.db.engine.pool.FlushPoolManager;
 
 public class FlushManager {
@@ -30,28 +31,27 @@ public class FlushManager {
   private FlushPoolManager flushPool = FlushPoolManager.getInstance();
 
   private Runnable flushThread = () -> {
-    UnsealedTsFileProcessorV2 udfProcessor = unsealedTsFileProcessorQueue.poll();
+    UnsealedTsFileProcessorV2 unsealedTsFileProcessor = unsealedTsFileProcessorQueue.poll();
     try {
-      udfProcessor.flushOneMemTable();
+      unsealedTsFileProcessor.flushOneMemTable();
     } catch (IOException e) {
       // TODO do sth
     }
-    udfProcessor.setManagedByFlushManager(false);
-    registerUnsealedTsFileProcessor(udfProcessor);
+    unsealedTsFileProcessor.setManagedByFlushManager(false);
+    registerUnsealedTsFileProcessor(unsealedTsFileProcessor);
   };
 
   /**
    * Add BufferWriteProcessor to asyncFlush manager
    */
-  public boolean registerUnsealedTsFileProcessor(UnsealedTsFileProcessorV2 unsealedTsFileProcessor) {
+  public Future registerUnsealedTsFileProcessor(UnsealedTsFileProcessorV2 unsealedTsFileProcessor) {
     synchronized (unsealedTsFileProcessor) {
       if (!unsealedTsFileProcessor.isManagedByFlushManager() && unsealedTsFileProcessor.getFlushingMemTableSize() > 0) {
         unsealedTsFileProcessorQueue.add(unsealedTsFileProcessor);
         unsealedTsFileProcessor.setManagedByFlushManager(true);
-        flushPool.submit(flushThread);
-        return true;
+        return flushPool.submit(flushThread);
       }
-      return false;
+      return null;
     }
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index a324929..a5b7b2b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -158,23 +158,30 @@ public class UnsealedTsFileProcessorV2 {
       flushingMemTables.addLast(workMemTable);
       FlushManager.getInstance().registerUnsealedTsFileProcessor(this);
       workMemTable = null;
-    }finally {
+    } finally {
       flushQueryLock.writeLock().unlock();
     }
   }
 
   // only for test
   public void syncFlush() {
+    IMemTable tmpMemTable;
+    flushQueryLock.writeLock().lock();
     try {
-      IMemTable tmpMemTable = workMemTable == null ? new EmptyMemTable() : workMemTable;
+      tmpMemTable = workMemTable == null ? new EmptyMemTable() : workMemTable;
       flushingMemTables.addLast(tmpMemTable);
       FlushManager.getInstance().registerUnsealedTsFileProcessor(this);
       workMemTable = null;
-      synchronized (tmpMemTable) {
+    } finally {
+      flushQueryLock.writeLock().unlock();
+    }
+
+    synchronized (tmpMemTable) {
+      try {
         tmpMemTable.wait();
+      } catch (InterruptedException e) {
+        LOGGER.error("wait flush finished meets error", e);
       }
-    } catch (InterruptedException e) {
-      LOGGER.error("wait flush finished meets error", e);
     }
   }
 
@@ -190,7 +197,7 @@ public class UnsealedTsFileProcessorV2 {
   }
 
   public synchronized void asyncClose() {
-    flushingMemTables.add(workMemTable);
+    flushingMemTables.add(workMemTable == null ? new EmptyMemTable() : workMemTable);
     workMemTable = null;
     shouldClose = true;
     FlushManager.getInstance().registerUnsealedTsFileProcessor(this);
@@ -217,20 +224,16 @@ public class UnsealedTsFileProcessorV2 {
     IMemTable memTableToFlush;
     memTableToFlush = flushingMemTables.getFirst();
 
-
     // null memtable only appears when calling asyncClose()
-    if (memTableToFlush != null) {
-      if(memTableToFlush.isManagedByMemPool()) {
-        MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(writer, storageGroupName,
-            this::releaseFlushedMemTable);
-        flushTask.flushMemTable(fileSchema, memTableToFlush, versionController.nextVersion());
-      }
-      // for sync flush
-      synchronized (memTableToFlush){
-        memTableToFlush.notify();
-      }
+    if (memTableToFlush.isManagedByMemPool()) {
+      MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(writer, storageGroupName,
+          this::releaseFlushedMemTable);
+      flushTask.flushMemTable(fileSchema, memTableToFlush, versionController.nextVersion());
+    }
+    // for sync flush
+    synchronized (memTableToFlush) {
+      memTableToFlush.notify();
     }
-
 
     if (shouldClose && flushingMemTables.isEmpty()) {
       endFile();
@@ -278,7 +281,7 @@ public class UnsealedTsFileProcessorV2 {
   }
 
   public int getFlushingMemTableSize() {
-      return flushingMemTables.size();
+    return flushingMemTables.size();
   }
 
   /**
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
index 8017168..f66268c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
@@ -169,5 +169,4 @@ public class UnsealedTsFileProcessorV2Test {
     assertTrue(processor.getTsFileResource().isClosed());
 
   }
-
 }
\ No newline at end of file