You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/21 06:58:46 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: fix make metadata visible bug

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 19bb5ad  fix make metadata visible bug
19bb5ad is described below

commit 19bb5ad1cd1ccfc82f457fa42db4dbc258c989e2
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Jun 21 14:58:35 2019 +0800

    fix make metadata visible bug
---
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 84 ++++++++++-----------
 .../filenodeV2/UnsealedTsFileProcessorV2.java      | 88 +++++++++++-----------
 .../db/engine/memtable/MemTableFlushTaskV2.java    |  1 -
 3 files changed, 86 insertions(+), 87 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index e2485d2..f1b05fe 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -287,7 +287,7 @@ public class FileNodeProcessorV2 {
 
   /**
    * ensure there must be a flush thread submitted after setCloseMark() is called, therefore the setCloseMark task
-   * will be executed by a flush thread. -- said by qiaojialin
+   * will be executed by a flush thread.
    *
    * only called by insert(), thread-safety should be ensured by caller
    */
@@ -308,47 +308,6 @@ public class FileNodeProcessorV2 {
     unsealedTsFileProcessor.asyncFlush();
   }
 
-
-  public boolean updateLatestFlushTimeCallback() {
-    lock.writeLock().lock();
-    try {
-      // update the largest timestamp in the last flushing memtable
-      for (Entry<String, Long> entry : latestTimeForEachDevice.entrySet()) {
-        latestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
-      }
-    } finally {
-      lock.writeLock().unlock();
-    }
-    return true;
-  }
-
-  /**
-   * put the memtable back to the MemTablePool and make the metadata in writer visible
-   */
-  // TODO please consider concurrency with query and insert method.
-  public void closeUnsealedTsFileProcessorCallback(UnsealedTsFileProcessorV2 unsealedTsFileProcessor) {
-    lock.writeLock().lock();
-    try {
-      if (closingSequenceTsFileProcessor.contains(unsealedTsFileProcessor)) {
-        closingSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
-      } else {
-        closingUnSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
-      }
-      // end time with one start time
-      TsFileResourceV2 resource = unsealedTsFileProcessor.getTsFileResource();
-      synchronized (resource) {
-        for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
-          String deviceId = startTime.getKey();
-          resource.getEndTimeMap().put(deviceId, latestTimeForEachDevice.get(deviceId));
-        }
-        resource.setClosed(true);
-      }
-      closeFileNodeCondition.signal();
-    }finally {
-      lock.writeLock().unlock();
-    }
-  }
-
   public void asyncForceClose() {
     lock.writeLock().lock();
     try {
@@ -415,6 +374,47 @@ public class FileNodeProcessorV2 {
   }
 
 
+  public boolean updateLatestFlushTimeCallback() {
+    lock.writeLock().lock();
+    try {
+      // update the largest timestamp in the last flushing memtable
+      for (Entry<String, Long> entry : latestTimeForEachDevice.entrySet()) {
+        latestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+    return true;
+  }
+
+  /**
+   * put the memtable back to the MemTablePool and make the metadata in writer visible
+   */
+  // TODO please consider concurrency with query and insert method.
+  public void closeUnsealedTsFileProcessorCallback(UnsealedTsFileProcessorV2 unsealedTsFileProcessor) {
+    lock.writeLock().lock();
+    try {
+      if (closingSequenceTsFileProcessor.contains(unsealedTsFileProcessor)) {
+        closingSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
+      } else {
+        closingUnSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
+      }
+      // end time with one start time
+      TsFileResourceV2 resource = unsealedTsFileProcessor.getTsFileResource();
+      synchronized (resource) {
+        for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
+          String deviceId = startTime.getKey();
+          resource.getEndTimeMap().put(deviceId, latestTimeForEachDevice.get(deviceId));
+        }
+        resource.setClosed(true);
+      }
+      closeFileNodeCondition.signal();
+    }finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+
   public UnsealedTsFileProcessorV2 getWorkSequenceTsFileProcessor() {
     return workSequenceTsFileProcessor;
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 12c42ef..e365ca6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -130,21 +130,6 @@ public class UnsealedTsFileProcessorV2 {
     return tsFileResource;
   }
 
-  /**
-   * return the memtable to MemTablePool and make metadata in writer visible
-   */
-  private void releaseFlushedMemTable(IMemTable memTable) {
-    flushQueryLock.writeLock().lock();
-    try {
-      writer.makeMetadataVisible();
-      flushingMemTables.remove(memTable);
-      MemTablePool.getInstance().putBack(memTable, storageGroupName);
-    } finally {
-      flushQueryLock.writeLock().unlock();
-    }
-  }
-
-
   public boolean shouldFlush() {
     return workMemTable.memSize() > TSFileDescriptor.getInstance().getConfig().groupSizeInByte;
   }
@@ -190,38 +175,21 @@ public class UnsealedTsFileProcessorV2 {
     }
   }
 
-  public boolean shouldClose() {
-    long fileSize = tsFileResource.getFileSize();
-    long fileSizeThreshold = IoTDBDescriptor.getInstance().getConfig()
-        .getBufferwriteFileSizeThreshold();
-    return fileSize > fileSizeThreshold;
-  }
-
-  public void setCloseMark() {
-    shouldClose = true;
-  }
-
-  public synchronized void asyncClose() {
-    flushingMemTables.add(workMemTable == null ? new EmptyMemTable() : workMemTable);
-    FlushManager.getInstance().registerUnsealedTsFileProcessor(this);
-    flushUpdateLatestFlushTimeCallback.get();
-    shouldClose = true;
-    workMemTable = null;
-  }
 
-
-  public void syncClose() {
-    asyncClose();
-    synchronized (flushingMemTables) {
-      try {
-        flushingMemTables.wait();
-      } catch (InterruptedException e) {
-        LOGGER.error("wait close interrupted", e);
-      }
+  /**
+   * return the memtable to MemTablePool and make metadata in writer visible
+   */
+  private void releaseFlushedMemTableCallback(IMemTable memTable) {
+    flushQueryLock.writeLock().lock();
+    try {
+      writer.makeMetadataVisible();
+      flushingMemTables.remove(memTable);
+      MemTablePool.getInstance().putBack(memTable, storageGroupName);
+    } finally {
+      flushQueryLock.writeLock().unlock();
     }
   }
 
-
   /**
    * Take the first MemTable from the flushingMemTables and flush it. Called by a flush thread of
    * the flush manager pool
@@ -233,7 +201,7 @@ public class UnsealedTsFileProcessorV2 {
     // null memtable only appears when calling asyncClose()
     if (memTableToFlush.isManagedByMemPool()) {
       MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(writer, storageGroupName,
-          this::releaseFlushedMemTable);
+          this::releaseFlushedMemTableCallback);
       flushTask.flushMemTable(fileSchema, memTableToFlush, versionController.nextVersion());
     }
     // for sync flush
@@ -278,6 +246,37 @@ public class UnsealedTsFileProcessorV2 {
     }
   }
 
+
+  public boolean shouldClose() {
+    long fileSize = tsFileResource.getFileSize();
+    long fileSizeThreshold = IoTDBDescriptor.getInstance().getConfig()
+        .getBufferwriteFileSizeThreshold();
+    return fileSize > fileSizeThreshold;
+  }
+
+  public void setCloseMark() {
+    shouldClose = true;
+  }
+
+  public synchronized void asyncClose() {
+    flushingMemTables.add(workMemTable == null ? new EmptyMemTable() : workMemTable);
+    FlushManager.getInstance().registerUnsealedTsFileProcessor(this);
+    flushUpdateLatestFlushTimeCallback.get();
+    shouldClose = true;
+    workMemTable = null;
+  }
+
+  public void syncClose() {
+    asyncClose();
+    synchronized (flushingMemTables) {
+      try {
+        flushingMemTables.wait();
+      } catch (InterruptedException e) {
+        LOGGER.error("wait close interrupted", e);
+      }
+    }
+  }
+
   public boolean isManagedByFlushManager() {
     return managedByFlushManager;
   }
@@ -323,4 +322,5 @@ public class UnsealedTsFileProcessorV2 {
       flushQueryLock.readLock().unlock();
     }
   }
+
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index 70bbe4d..02a6c82 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -131,7 +131,6 @@ public class MemTableFlushTaskV2 {
       }
     }
 
-    tsFileIoWriter.makeMetadataVisible();
     flushCallBack.accept(memTable);
   });