You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/11 13:19:42 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: fix getClosingBufferWriteProcessor bug

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 80f41d9  fix getClosingBufferWriteProcessor bug
80f41d9 is described below

commit 80f41d914eb6ada1cbf623816f5009cf5e4f96a3
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Jun 11 21:19:28 2019 +0800

    fix getClosingBufferWriteProcessor bug
---
 .../java/org/apache/iotdb/db/engine/Processor.java | 10 +++----
 .../engine/bufferwrite/BufferWriteProcessor.java   | 31 +++++++++++-----------
 .../db/engine/filenode/CopyOnWriteLinkedList.java  | 12 ++++++---
 .../iotdb/db/engine/filenode/FileNodeManager.java  | 18 ++++++-------
 .../db/engine/filenode/FileNodeProcessor.java      | 16 +++++++----
 .../iotdb/db/query/control/FileReaderManager.java  |  2 +-
 6 files changed, 50 insertions(+), 39 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
index 0388476..42d6f5e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
@@ -82,11 +82,11 @@ public abstract class Processor {
    * Release the write lock
    */
   public void writeUnlock() {
-    lock.writeLock().unlock();
     start = System.currentTimeMillis() - start;
     if (start > 1000) {
       LOGGER.info("Processor {} hold lock for {}ms", processorName, start, new RuntimeException());
     }
+    lock.writeLock().unlock();
   }
 
   /**
@@ -115,15 +115,15 @@ public abstract class Processor {
    *            true release write lock, false release read unlock
    */
   public void unlock(boolean isWriteUnlock) {
+    start = System.currentTimeMillis() - start;
+    if (start > 1000) {
+      LOGGER.info("Processor {} hold lock for {}ms", processorName, start, new RuntimeException());
+    }
     if (isWriteUnlock) {
       writeUnlock();
     } else {
       readUnlock();
     }
-    start = System.currentTimeMillis() - start;
-    if (start > 1000) {
-      LOGGER.info("Processor {} hold lock for {}ms", processorName, start, new RuntimeException());
-    }
   }
 
   /**
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index a0ed3c2..d627d9b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -46,14 +45,12 @@ import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
 import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
 import org.apache.iotdb.db.engine.memtable.MemTablePool;
-import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.pool.FlushManager;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.BufferWriteProcessorException;
 import org.apache.iotdb.db.qp.constant.DatetimeUtils;
-import org.apache.iotdb.db.utils.FileUtils;
 import org.apache.iotdb.db.utils.ImmediateFuture;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
@@ -101,7 +98,9 @@ public class BufferWriteProcessor extends Processor {
   private WriteLogNode logNode;
   private VersionController versionController;
 
-  private boolean isClosed = true;
+  private boolean isClosing = true;
+
+  private boolean isClosed = false;
   private boolean isFlush = false;
 
 
@@ -141,7 +140,7 @@ public class BufferWriteProcessor extends Processor {
   }
 
   public void reopen(String fileName) throws BufferWriteProcessorException {
-    if (!isClosed) {
+    if (!isClosing) {
       return;
     }
 
@@ -167,12 +166,12 @@ public class BufferWriteProcessor extends Processor {
     } else {
       workMemTable.clear();
     }
-    isClosed = false;
+    isClosing = false;
     isFlush = false;
   }
 
   public void checkOpen() throws BufferWriteProcessorException {
-    if (isClosed) {
+    if (isClosing) {
       throw new BufferWriteProcessorException("BufferWriteProcessor already closed");
     }
   }
@@ -385,7 +384,7 @@ public class BufferWriteProcessor extends Processor {
 
   // keyword synchronized is added in this method, so that only one flush task can be submitted now.
   private Future<Boolean> flush(boolean isCloseTaskCalled) throws IOException {
-    if (!isCloseTaskCalled && isClosed) {
+    if (!isCloseTaskCalled && isClosing) {
       throw new IOException("BufferWriteProcessor closed");
     }
     // statistic information for flush
@@ -459,11 +458,11 @@ public class BufferWriteProcessor extends Processor {
 
   @Override
   public synchronized void close() throws BufferWriteProcessorException {
-    if (isClosed) {
+    if (isClosing) {
       return;
     }
     try {
-      isClosed = true;
+      isClosing = true;
       // flush data (if there are flushing task, flush() will be blocked)
       //Future<Boolean> flush = flush();
       //and wait for finishing flush async
@@ -488,12 +487,8 @@ public class BufferWriteProcessor extends Processor {
       //FIXME suppose the flush-thread-pool is 2.
       // then if a flush task and a close task are running in the same time
       // and the close task is faster, then writer == null, and the flush task will throw nullpointer
-      // expcetion. Add "synchronized" keyword on both flush and close may solve the issue.
+      // exception. Add "synchronized" keyword on both flush and close may solve the issue.
       writer = null;
-      isClosed = true;
-      //A BUG may appears here: workMemTable has been cleared,
-      // but the corresponding TsFile is not maintained in Processor.
-      workMemTable.clear();
       // update the IntervalFile for interval list
       bufferwriteCloseConsumer.accept(this);
       // flush the changed information for filenode
@@ -513,6 +508,8 @@ public class BufferWriteProcessor extends Processor {
     }catch (IOException | ActionException e) {
       LOGGER.error("Close bufferwrite processor {} failed.", getProcessorName(), e);
       return false;
+    } finally {
+      isClosed = true;
     }
     return true;
   }
@@ -642,6 +639,10 @@ public class BufferWriteProcessor extends Processor {
     return insertFilePath;
   }
 
+  public boolean isClosing() {
+    return isClosing;
+  }
+
   public boolean isClosed() {
     return isClosed;
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnWriteLinkedList.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnWriteLinkedList.java
index a412525..89a8c0a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnWriteLinkedList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnWriteLinkedList.java
@@ -31,17 +31,21 @@ import java.util.concurrent.CopyOnWriteArrayList;
  * @param <T>
  */
 public class CopyOnWriteLinkedList<T> {
+
   LinkedList<T> data = new LinkedList<>();
   List<T> readCopy;
 
-  public synchronized  void add(T d) {
+  public synchronized void add(T d) {
     data.add(d);
-    readCopy = new ArrayList<>(data);
+  }
+
+  public synchronized void remove(T d) {
+    data.remove(d);
   }
 
   public synchronized Iterator<T> iterator() {
-      readCopy = new ArrayList<>(data);
-      return data.iterator();
+    readCopy = new ArrayList<>(data);
+    return readCopy.iterator();
   }
 
   public synchronized void reset() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index a1fe9cd..85b2a94 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -128,18 +128,18 @@ public class FileNodeManager implements IStatistic, IService {
       statMonitor.registerStatistics(MonitorConstants.STAT_STORAGE_DELTA_NAME, this);
     }
 
-//    closedProcessorCleaner.scheduleWithFixedDelay(()->{
-//      int size = 0;
-//      for (FileNodeProcessor fileNodeProcessor : processorMap.values()) {
-//        size += fileNodeProcessor.getClosingBufferWriteProcessor().size();
-//      }
-//      if (size > 5) {
-//        LOGGER.info("Current closing processor number is {}", size);
-//      }
+    closedProcessorCleaner.scheduleWithFixedDelay(()->{
+      int size = 0;
+      for (FileNodeProcessor fileNodeProcessor : processorMap.values()) {
+        size += fileNodeProcessor.getClosingBufferWriteProcessor().size();
+      }
+      if (size > 5) {
+        LOGGER.info("Current closing processor number is {}", size);
+      }
 //      for (FileNodeProcessor fileNodeProcessor : processorMap.values()) {
 //        fileNodeProcessor.checkAllClosingProcessors();
 //      }
-//    }, 0, 3000, TimeUnit.MILLISECONDS);
+    }, 0, 30000, TimeUnit.MILLISECONDS);
 
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index df31499..5bad134 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -580,7 +580,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
             .format("The filenode processor %s failed to get the bufferwrite processor.",
                 processorName), e);
       }
-    } else if (bufferWriteProcessor.isClosed()) {
+    } else if (bufferWriteProcessor.isClosing()) {
       try {
         bufferWriteProcessor.reopen(insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
             + System.currentTimeMillis());
@@ -1765,7 +1765,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   public FileNodeFlushFuture flush() throws IOException {
     Future<Boolean> bufferWriteFlushFuture = null;
     Future<Boolean> overflowFlushFuture = null;
-    if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
+    if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosing()) {
       bufferWriteFlushFuture = bufferWriteProcessor.flush();
     }
     if (overflowProcessor != null && !overflowProcessor.isClosed()) {
@@ -1778,7 +1778,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * Close the bufferwrite processor.
    */
   public Future<Boolean> closeBufferWrite() throws FileNodeProcessorException {
-    if (bufferWriteProcessor == null || bufferWriteProcessor.isClosed()) {
+    if (bufferWriteProcessor == null || bufferWriteProcessor.isClosing()) {
       return new ImmediateFuture<>(true);
     }
     try {
@@ -1960,7 +1960,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       // delete data in memory
       OverflowProcessor ofProcessor = getOverflowProcessor(getProcessorName());
       ofProcessor.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
-      if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
+      if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosing()) {
         bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
       }
     } catch (Exception e) {
@@ -2015,7 +2015,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       }
       throw e;
     }
-    if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
+    if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosing()) {
       try {
         bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
       } catch (BufferWriteProcessorException e) {
@@ -2044,6 +2044,12 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   }
 
   public CopyOnWriteLinkedList<BufferWriteProcessor> getClosingBufferWriteProcessor() {
+    for (BufferWriteProcessor processor: closingBufferWriteProcessor.read()) {
+      if (processor.isClosed()) {
+        closingBufferWriteProcessor.remove(processor);
+      }
+    }
+    closingBufferWriteProcessor.reset();
     return closingBufferWriteProcessor;
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index 461ff0b..cd33c91 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -117,7 +117,7 @@ public class FileReaderManager implements IService {
 
   /**
    * Get the reader of the file(tsfile or unseq tsfile) indicated by filePath. If the reader already
-   * exists, just get it from closedFileReaderMap or unclosedFileReaderMap depending on isClosed .
+   * exists, just get it from closedFileReaderMap or unclosedFileReaderMap depending on isClosing .
    * Otherwise a new reader will be created and cached.
    *
    * @param filePath the path of the file, of which the reader is desired.