You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/19 01:59:52 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: fix filenode merge lock

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new fce0cd7  fix filenode merge lock
fce0cd7 is described below

commit fce0cd739f8755cd0af18225f09ec0abc36857e9
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Jun 19 09:59:38 2019 +0800

    fix filenode merge lock
---
 .../bufferwriteV2/BufferWriteProcessorV2.java      |  2 +-
 .../db/engine/bufferwriteV2/FlushManager.java      | 24 +++++++++++-----------
 .../db/engine/filenode/FileNodeProcessor.java      |  2 +-
 .../db/engine/filenodeV2/TsFileResourceV2.java     |  7 +++----
 4 files changed, 17 insertions(+), 18 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
index 081b383..3ecf5e7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
@@ -133,7 +133,7 @@ public class BufferWriteProcessorV2 {
    */
   public void asyncFlush() {
     flushingMemTables.addLast(workMemTable);
-    FlushManager.getInstance().registerBWP(this);
+    FlushManager.getInstance().registerBWProcessor(this);
     workMemTable = null;
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
index 55477f9..085dbfe 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
@@ -23,26 +23,26 @@ import org.apache.iotdb.db.engine.pool.FlushPoolManager;
 
 public class FlushManager {
 
-  private ConcurrentLinkedQueue<BufferWriteProcessorV2> bwpQueue = new ConcurrentLinkedQueue<>();
+  private ConcurrentLinkedQueue<BufferWriteProcessorV2> bwProcessorQueue = new ConcurrentLinkedQueue<>();
 
   private FlushPoolManager flushPool = FlushPoolManager.getInstance();
 
-  private Runnable flushAction = () -> {
-    BufferWriteProcessorV2 bwp = bwpQueue.poll();
-    bwp.flushOneMemTable();
-    bwp.setManagedByFlushManager(false);
-    registerBWP(bwp);
+  private Runnable flushThread = () -> {
+    BufferWriteProcessorV2 bwProcessor = bwProcessorQueue.poll();
+    bwProcessor.flushOneMemTable();
+    bwProcessor.setManagedByFlushManager(false);
+    registerBWProcessor(bwProcessor);
   };
 
   /**
    * Add BufferWriteProcessor to asyncFlush manager
    */
-  public boolean registerBWP(BufferWriteProcessorV2 bwp) {
-    synchronized (bwp) {
-      if (!bwp.isManagedByFlushManager() && bwp.getFlushingMemTableSize() > 0) {
-        bwpQueue.add(bwp);
-        bwp.setManagedByFlushManager(true);
-        flushPool.submit(flushAction);
+  public boolean registerBWProcessor(BufferWriteProcessorV2 bwProcessor) {
+    synchronized (bwProcessor) {
+      if (!bwProcessor.isManagedByFlushManager() && bwProcessor.getFlushingMemTableSize() > 0) {
+        bwProcessorQueue.add(bwProcessor);
+        bwProcessor.setManagedByFlushManager(true);
+        flushPool.submit(flushThread);
         return true;
       }
       return false;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 43086c4..0d670a7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -1058,6 +1058,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   public void merge() throws FileNodeProcessorException {
     // close bufferwrite and overflow, prepare for merge
     LOGGER.info("The filenode processor {} begins to merge.", getProcessorName());
+    writeLock();
     prepareForMerge();
     // change status from overflowed to no overflowed
     isOverflowed = false;
@@ -2077,7 +2078,6 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       try {
         ZoneId zoneId = IoTDBDescriptor.getInstance().getConfig().getZoneID();
         long mergeStartTime = System.currentTimeMillis();
-        writeLock();
         merge();
         long mergeEndTime = System.currentTimeMillis();
         long intervalTime = mergeEndTime - mergeStartTime;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
index 5741037..a459f3e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
@@ -23,22 +23,21 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 import org.apache.iotdb.db.conf.directories.Directories;
 import org.apache.iotdb.db.engine.filenode.OverflowChangeType;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 public class TsFileResourceV2 {
 
   private File file;
+
+  // device -> start time
   private Map<String, Long> startTimeMap;
 
+  // device -> end time
   private Map<String, Long> endTimeMap;
 
   private transient ModificationFile modFile;