You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/19 03:39:45 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: add force flush

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 8c82ad9  add force flush
8c82ad9 is described below

commit 8c82ad915d500838c08affd8c523c33bfb774538
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Jun 19 11:39:33 2019 +0800

    add force flush
---
 .../bufferwriteV2/BufferWriteProcessorV2.java      | 42 +++++++++------
 .../db/engine/bufferwriteV2/FlushManager.java      |  7 ++-
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 60 ++++++++++++++++------
 .../db/engine/filenodeV2/TsFileResourceV2.java     |  1 +
 4 files changed, 76 insertions(+), 34 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
index 3ecf5e7..b5c97d8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
@@ -52,12 +52,12 @@ public class BufferWriteProcessorV2 {
 
   private volatile boolean managedByFlushManager;
 
-  private ReadWriteLock lock = new ReentrantReadWriteLock();
+  private ReadWriteLock flushQueryLock = new ReentrantReadWriteLock();
 
   /**
    * true: to be closed
    */
-  private volatile boolean closing;
+  private volatile boolean shouldClose;
 
   private IMemTable workMemTable;
 
@@ -114,12 +114,12 @@ public class BufferWriteProcessorV2 {
    * @param memTable
    */
   private void removeFlushedMemTable(Object memTable) {
-    lock.writeLock().lock();
-    writer.makeMetadataVisible();
+    flushQueryLock.writeLock().lock();
     try {
+      writer.makeMetadataVisible();
       flushingMemTables.remove(memTable);
     } finally {
-      lock.writeLock().unlock();
+      flushQueryLock.writeLock().unlock();
     }
   }
 
@@ -137,23 +137,26 @@ public class BufferWriteProcessorV2 {
     workMemTable = null;
   }
 
-  public void flushOneMemTable() {
+  public void flushOneMemTable() throws IOException {
     IMemTable memTableToFlush = flushingMemTables.pollFirst();
-    MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(writer, storageGroupName, this::removeFlushedMemTable);
-    flushTask.flushMemTable(fileSchema, memTableToFlush, versionController.nextVersion());
-
-    if (closing && flushingMemTables.isEmpty()) {
+    // null memtable only appears when calling forceClose()
+    if (memTableToFlush != null) {
+      MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(writer, storageGroupName, this::removeFlushedMemTable);
+      flushTask.flushMemTable(fileSchema, memTableToFlush, versionController.nextVersion());
+    }
 
+    if (shouldClose && flushingMemTables.isEmpty()) {
+      endFile();
     }
   }
 
-  public void close() throws IOException {
+  private void endFile() throws IOException {
     long closeStartTime = System.currentTimeMillis();
     writer.endFile(fileSchema);
     //FIXME suppose the flushMetadata-thread-pool is 2.
-    // then if a flushMetadata task and a close task are running in the same time
-    // and the close task is faster, then writer == null, and the flushMetadata task will throw nullpointer
-    // exception. Add "synchronized" keyword on both flushMetadata and close may solve the issue.
+    // then if a flushMetadata task and a endFile task are running in the same time
+    // and the endFile task is faster, then writer == null, and the flushMetadata task will throw nullpointer
+    // exception. Add "synchronized" keyword on both flushMetadata and endFile may solve the issue.
     writer = null;
 
     // remove this processor from Closing list in FileNodeProcessor
@@ -174,14 +177,21 @@ public class BufferWriteProcessorV2 {
     }
   }
 
+  public void forceClose() {
+    flushingMemTables.add(workMemTable);
+    workMemTable = null;
+    shouldClose = true;
+    FlushManager.getInstance().registerBWProcessor(this);
+  }
+
   public boolean shouldClose() {
     long fileSize = tsFileResource.getFileSize();
     long fileSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getBufferwriteFileSizeThreshold();
     return fileSize > fileSizeThreshold;
   }
 
-  public void setClosing() {
-    closing = true;
+  public void close() {
+    shouldClose = true;
   }
 
   public boolean isManagedByFlushManager() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
index 085dbfe..b8345ce 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.engine.bufferwriteV2;
 
+import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.iotdb.db.engine.pool.FlushPoolManager;
 
@@ -29,7 +30,11 @@ public class FlushManager {
 
   private Runnable flushThread = () -> {
     BufferWriteProcessorV2 bwProcessor = bwProcessorQueue.poll();
-    bwProcessor.flushOneMemTable();
+    try {
+      bwProcessor.flushOneMemTable();
+    } catch (IOException e) {
+      // TODO do sth
+    }
     bwProcessor.setManagedByFlushManager(false);
     registerBWProcessor(bwProcessor);
   };
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index ef46166..1bf61ae 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -170,7 +170,6 @@ public class FileNodeProcessorV2 {
 
 
   public boolean insert(TSRecord tsRecord) {
-
     lock.writeLock().lock();
     boolean result = true;
 
@@ -181,7 +180,7 @@ public class FileNodeProcessorV2 {
         String baseDir = directories.getNextFolderForTsfile();
         String filePath = Paths.get(baseDir, storageGroup, tsRecord.time + "").toString();
         workBufferWriteProcessor = new BufferWriteProcessorV2(storageGroup, new File(filePath),
-            fileSchema, versionController, this::closeBufferWriteProcessor);
+            fileSchema, versionController, this::closeBufferWriteProcessorCallBack);
         newFileNodes.add(workBufferWriteProcessor.getTsFileResource());
         // TODO check if the disk is full
       }
@@ -202,19 +201,7 @@ public class FileNodeProcessorV2 {
 
         // check memtable size and may asyncFlush the workMemtable
         if (workBufferWriteProcessor.shouldFlush()) {
-          workBufferWriteProcessor.asyncFlush();
-
-          // update the largest timestamp in the last flushing memtable
-          for (Entry<String, Long> entry : latestTimeMap.entrySet()) {
-            latestFlushTimeMap.put(entry.getKey(), entry.getValue());
-          }
-        }
-
-        // check file size and may close the BufferWrite
-        if (workBufferWriteProcessor.shouldClose()) {
-          closingBufferWriteProcessor.add(workBufferWriteProcessor);
-          workBufferWriteProcessor.setClosing();
-          workBufferWriteProcessor = null;
+          flushAndCheckClose();
         }
 
       } else {
@@ -229,11 +216,39 @@ public class FileNodeProcessorV2 {
     return result;
   }
 
+  /**
+   * ensure there must be a flush thread submitted after close() is called,
+   * therefore the close task will be executed by a flush thread.
+   * -- said by qiaojialin
+   *
+   * only called by insert(), thread-safety should be ensured by caller
+   */
+  private void flushAndCheckClose() {
+    boolean shouldClose = false;
+    // check file size and may close the BufferWrite
+    if (workBufferWriteProcessor.shouldClose()) {
+      closingBufferWriteProcessor.add(workBufferWriteProcessor);
+      workBufferWriteProcessor.close();
+      shouldClose = true;
+    }
+
+    workBufferWriteProcessor.asyncFlush();
+
+    if (shouldClose) {
+      workBufferWriteProcessor = null;
+    }
+
+    // update the largest timestamp in the last flushing memtable
+    for (Entry<String, Long> entry : latestTimeMap.entrySet()) {
+      latestFlushTimeMap.put(entry.getKey(), entry.getValue());
+    }
+  }
+
 
   /**
    * return the memtable to MemTablePool and make metadata in writer visible
    */
-  private void closeBufferWriteProcessor(Object bufferWriteProcessor) {
+  private void closeBufferWriteProcessorCallBack(Object bufferWriteProcessor) {
     closingBufferWriteProcessor.remove((BufferWriteProcessorV2) bufferWriteProcessor);
     synchronized (fileNodeProcessorStore) {
       fileNodeProcessorStore.setLastUpdateTimeMap(latestTimeMap);
@@ -257,5 +272,16 @@ public class FileNodeProcessorV2 {
     }
   }
 
-
+  public void forceClose() {
+    lock.writeLock().lock();
+    try {
+      if (workBufferWriteProcessor != null) {
+        closingBufferWriteProcessor.add(workBufferWriteProcessor);
+        workBufferWriteProcessor.forceClose();
+        workBufferWriteProcessor = null;
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
index a459f3e..b49c00a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
@@ -38,6 +38,7 @@ public class TsFileResourceV2 {
   private Map<String, Long> startTimeMap;
 
   // device -> end time
+  // null if it's an unsealed tsfile
   private Map<String, Long> endTimeMap;
 
   private transient ModificationFile modFile;