You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by li...@apache.org on 2019/06/12 08:16:47 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: remove reopen mechanism in bufferwrite

This is an automated email from the ASF dual-hosted git repository.

liurui pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 71f5c1a  remove reopen mechanism in bufferwrite
71f5c1a is described below

commit 71f5c1afc090007b42c2b0c61520fbf04d945805
Author: liuruiyiyang <24...@qq.com>
AuthorDate: Wed Jun 12 16:16:16 2019 +0800

    remove reopen mechanism in bufferwrite
---
 .../engine/bufferwrite/BufferWriteProcessor.java   | 52 ++++------------------
 .../db/engine/filenode/FileNodeProcessor.java      | 15 ++-----
 2 files changed, 12 insertions(+), 55 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 62cfee5..fe47e1d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -98,7 +98,6 @@ public class BufferWriteProcessor extends Processor {
   private WriteLogNode logNode;
   private VersionController versionController;
 
-  private boolean isClosing = true;
   private boolean isClosed = false;
 
   private TsFileResource currentTsFileResource;
@@ -125,7 +124,7 @@ public class BufferWriteProcessor extends Processor {
     //bufferwriteCloseAction = parameters.get(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION);
     filenodeFlushAction = parameters.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
     this.bufferwriteCloseConsumer = bufferwriteCloseConsumer;
-    reopen(fileName);
+    open(fileName);
     try {
       getLogNode();
     } catch (IOException e) {
@@ -135,20 +134,8 @@ public class BufferWriteProcessor extends Processor {
 
   }
 
-  public void reopen(String fileName) throws BufferWriteProcessorException {
-    if (!isClosing) {
-      return;
-    }
+  private void open(String fileName) throws BufferWriteProcessorException {
 
-    try {
-      LOGGER.info("wait for flush to reopen a BufferWrite Processor {}", processorName);
-      this.flushFuture.get();
-      LOGGER.info("wait for close to reopen a BufferWrite Processor {}", processorName);
-      this.closeFuture.get();
-      LOGGER.info("now begin to reopen the bufferwrite processor {}", processorName);
-    } catch (InterruptedException | ExecutionException e) {
-      LOGGER.error("reopen error in Bufferwrite Processor {}", processorName, e);
-    }
     new File(baseDir, processorName).mkdirs();
     this.insertFilePath = Paths.get(baseDir, processorName, fileName).toString();
     bufferWriteRelativePath = processorName + File.separatorChar + fileName;
@@ -157,22 +144,11 @@ public class BufferWriteProcessor extends Processor {
     } catch (IOException e) {
       throw new BufferWriteProcessorException(e);
     }
-    if (workMemTable == null) {
-      long start1 = System.currentTimeMillis();
-      workMemTable = MemTablePool.getInstance().getEmptyMemTable(this);
-      start1 = System.currentTimeMillis() - start1;
-      if (start1 > 1000) {
-        LOGGER.info("BufferWriteProcessor.reopen getEmptyMemtable cost: {}", start1);
-      }
-    } else {
-      workMemTable.clear();
-    }
-    isClosing = false;
-  }
-
-  public void checkOpen() throws BufferWriteProcessorException {
-    if (isClosing) {
-      throw new BufferWriteProcessorException("BufferWriteProcessor already closed");
+    long start1 = System.currentTimeMillis();
+    workMemTable = MemTablePool.getInstance().getEmptyMemTable(this);
+    start1 = System.currentTimeMillis() - start1;
+    if (start1 > 1000) {
+      LOGGER.info("BufferWriteProcessor.open getEmptyMemtable cost: {}", start1);
     }
   }
 
@@ -193,7 +169,6 @@ public class BufferWriteProcessor extends Processor {
   public boolean write(String deviceId, String measurementId, long timestamp, TSDataType dataType,
       String value)
       throws BufferWriteProcessorException {
-    checkOpen();
     TSRecord record = new TSRecord(timestamp, deviceId);
     DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, value);
     record.addTuple(dataPoint);
@@ -210,7 +185,6 @@ public class BufferWriteProcessor extends Processor {
    */
   public boolean write(TSRecord tsRecord) throws BufferWriteProcessorException {
     long start1 = System.currentTimeMillis();
-    checkOpen();
     long memUsage = MemUtils.getRecordSize(tsRecord);
     BasicMemController.UsageLevel level = BasicMemController.getInstance()
         .acquireUsage(this, memUsage);
@@ -293,7 +267,6 @@ public class BufferWriteProcessor extends Processor {
   public Pair<ReadOnlyMemChunk, List<ChunkMetaData>> queryBufferWriteData(String deviceId,
       String measurementId, TSDataType dataType, Map<String, String> props)
       throws BufferWriteProcessorException {
-    checkOpen();
     flushQueryLock.lock();
     try {
       MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
@@ -392,7 +365,7 @@ public class BufferWriteProcessor extends Processor {
 
   // keyword synchronized is added in this method, so that only one flush task can be submitted now.
   private Future<Boolean> flush(boolean isCloseTaskCalled) throws IOException {
-    if (!isCloseTaskCalled && isClosing) {
+    if (!isCloseTaskCalled) {
       throw new IOException("BufferWriteProcessor closed");
     }
     // statistic information for flush
@@ -478,11 +451,7 @@ public class BufferWriteProcessor extends Processor {
 
   @Override
   public synchronized void close() throws BufferWriteProcessorException {
-    if (isClosing) {
-      return;
-    }
     try {
-      isClosing = true;
       // flush data (if there are flushing task, flush() will be blocked)
       //Future<Boolean> flush = flush();
       //and wait for finishing flush async
@@ -619,7 +588,6 @@ public class BufferWriteProcessor extends Processor {
    */
   public void delete(String deviceId, String measurementId, long timestamp)
       throws BufferWriteProcessorException {
-    checkOpen();
     workMemTable.delete(deviceId, measurementId, timestamp);
       // flushing MemTable cannot be directly modified since another thread is reading it
     for (IMemTable memTable : flushingMemTables) {
@@ -648,10 +616,6 @@ public class BufferWriteProcessor extends Processor {
     return insertFilePath;
   }
 
-  public boolean isClosing() {
-    return isClosing;
-  }
-
   public boolean isClosed() {
     return isClosed;
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 2761720..a1ee26a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -582,13 +582,6 @@ public class FileNodeProcessor extends Processor implements IStatistic {
             .format("The filenode processor %s failed to get the bufferwrite processor.",
                 processorName), e);
       }
-    } else if (bufferWriteProcessor.isClosing()) {
-      try {
-        bufferWriteProcessor.reopen(insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
-            + System.currentTimeMillis());
-      } catch (BufferWriteProcessorException e) {
-        throw new FileNodeProcessorException("Cannot reopen BufferWriteProcessor", e);
-      }
     }
     return bufferWriteProcessor;
   }
@@ -1767,7 +1760,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   public FileNodeFlushFuture flush() throws IOException {
     Future<Boolean> bufferWriteFlushFuture = null;
     Future<Boolean> overflowFlushFuture = null;
-    if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosing()) {
+    if (bufferWriteProcessor != null) {
       bufferWriteFlushFuture = bufferWriteProcessor.flush();
     }
     if (overflowProcessor != null && !overflowProcessor.isClosed()) {
@@ -1780,7 +1773,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * Close the bufferwrite processor.
    */
   public Future<Boolean> closeBufferWrite() throws FileNodeProcessorException {
-    if (bufferWriteProcessor == null || bufferWriteProcessor.isClosing()) {
+    if (bufferWriteProcessor == null) {
       return new ImmediateFuture<>(true);
     }
     try {
@@ -1962,7 +1955,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       // delete data in memory
       OverflowProcessor ofProcessor = getOverflowProcessor(getProcessorName());
       ofProcessor.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
-      if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosing()) {
+      if (bufferWriteProcessor != null) {
         bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
       }
     } catch (Exception e) {
@@ -2017,7 +2010,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       }
       throw e;
     }
-    if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosing()) {
+    if (bufferWriteProcessor != null) {
       try {
         bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
       } catch (BufferWriteProcessorException e) {