You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/12 05:55:38 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: fix write restore info concurrent bug

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 4e7962b  fix write restore info concurrent bug
4e7962b is described below

commit 4e7962b1dbb8fda9733f3301d440c7d91d416f3f
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Jun 12 13:55:24 2019 +0800

    fix write restore info concurrent bug
---
 .../engine/bufferwrite/BufferWriteProcessor.java   | 47 +++-------------------
 .../db/engine/filenode/FileNodeProcessor.java      |  2 +
 .../db/engine/memtable/MemTableFlushTask.java      | 23 ++++++++---
 .../iotdb/db/engine/memtable/MemTablePool.java     |  2 +-
 .../db/engine/overflow/io/OverflowProcessor.java   |  8 ++--
 .../bufferwrite/BufferWriteProcessorNewTest.java   |  1 -
 .../bufferwrite/BufferWriteProcessorTest.java      |  3 --
 7 files changed, 30 insertions(+), 56 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 97e53cb..bc43479 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -55,7 +55,6 @@ import org.apache.iotdb.db.utils.ImmediateFuture;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -99,12 +98,8 @@ public class BufferWriteProcessor extends Processor {
   private WriteLogNode logNode;
   private VersionController versionController;
 
-  private boolean isClosing = true;
-
+  private boolean isClosing = false;
   private boolean isClosed = false;
-  private boolean isFlush = false;
-
-
 
   private TsFileResource currentTsFileResource;
 
@@ -173,7 +168,6 @@ public class BufferWriteProcessor extends Processor {
       workMemTable.clear();
     }
     isClosing = false;
-    isFlush = false;
   }
 
   public void checkOpen() throws BufferWriteProcessorException {
@@ -321,19 +315,6 @@ public class BufferWriteProcessor extends Processor {
   }
 
 
-  private void switchFlushToWork() {
-    LOGGER.info("BufferWrite Processor {} try to get flushQueryLock for switchFlushToWork", getProcessorName());
-    flushQueryLock.lock();
-    LOGGER.info("BufferWrite Processor {} get flushQueryLock for switchFlushToWork", getProcessorName());
-    try {
-      writer.appendMetadata();
-      isFlush = false;
-    } finally {
-      flushQueryLock.unlock();
-      LOGGER.info("BufferWrite Processor {} release the flushQueryLock for switchFlushToWork successfully", getProcessorName());
-    }
-  }
-
   /**
    * return the memtable to MemTablePool and make
    * @param memTable
@@ -386,15 +367,10 @@ public class BufferWriteProcessor extends Processor {
       }
       result = true;
     } catch (Exception e) {
-      LOGGER.error(
-          "The bufferwrite processor {} failed to flush {}, when calling the filenodeFlushAction.",
-          getProcessorName(), displayMessage, e);
+      LOGGER.error("The bufferwrite processor {} failed to flush {}.", getProcessorName(), displayMessage, e);
       result = false;
-    } finally {
-      switchFlushToWork();
-      LOGGER.info("The bufferwrite processor {} ends flushing {}.", getProcessorName(),
-            displayMessage);
     }
+
     if (LOGGER.isInfoEnabled()) {
       long flushEndTime = System.currentTimeMillis();
       LOGGER.info(
@@ -559,15 +535,6 @@ public class BufferWriteProcessor extends Processor {
   }
 
   /**
-   * check if is flushing.
-   *
-   * @return True if flushing
-   */
-  public boolean isFlush() {
-    return isFlush;
-  }
-
-  /**
    * get metadata size.
    *
    * @return The sum of all timeseries's metadata size within this file.
@@ -649,12 +616,10 @@ public class BufferWriteProcessor extends Processor {
       throws BufferWriteProcessorException {
     checkOpen();
     workMemTable.delete(deviceId, measurementId, timestamp);
-    if (isFlush()) {
       // flushing MemTable cannot be directly modified since another thread is reading it
-      for (IMemTable memTable : flushingMemTables) {
-        if (memTable.containSeries(deviceId, measurementId)) {
-          memTable.delete(new Deletion(deviceId + PATH_SEPARATOR + measurementId, 0, timestamp));
-        }
+    for (IMemTable memTable : flushingMemTables) {
+      if (memTable.containSeries(deviceId, measurementId)) {
+        memTable.delete(new Deletion(deviceId + PATH_SEPARATOR + measurementId, 0, timestamp));
       }
     }
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 5bad134..2761720 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -176,6 +176,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    */
   private Map<String, Action> parameters;
   private FileSchema fileSchema;
+
   private Action fileNodeFlushAction = () -> {
     synchronized (fileNodeProcessorStore) {
       try {
@@ -185,6 +186,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       }
     }
   };
+
   private Action bufferwriteFlushAction = () -> {
     // update the lastUpdateTime Notice: Thread safe
     synchronized (fileNodeProcessorStore) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
index 02cd45c..0c103bb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
@@ -61,6 +61,8 @@ public class MemTableFlushTask {
 
   private Thread memoryFlushThread = new Thread(() -> {
     long memSerializeTime = 0;
+    LOGGER.info(
+        "BufferWrite Processor {},start serialize data into mem.", processorName);
     while (!stop) {
       Object task = memoryTaskQueue.poll();
       if (task == null) {
@@ -102,14 +104,18 @@ public class MemTableFlushTask {
   // rather than per each memtable.
   private Thread ioFlushThread = new Thread(() -> {
     long ioTime = 0;
+    long lastWaitIdx = 0;
     long currentTsFileFlushId;
-    boolean printed = false;
+    LOGGER.info("BufferWrite Processor {}, start io cost.", processorName);
+    long waitStartTime = System.currentTimeMillis();
     while ((currentTsFileFlushId = tsFileIoWriter.getFlushID().get()) != flushId) {
       try {
-        if (!printed) {
-          LOGGER.info("tsFileIoWriter flushID: {}, flush task flushID: {}", currentTsFileFlushId,
-              flushId);
-          printed = true;
+        long waitedTime = System.currentTimeMillis() - waitStartTime;
+        long currWaitIdx = waitedTime / 2000;
+        if (currWaitIdx > lastWaitIdx) {
+          lastWaitIdx  = currWaitIdx;
+          LOGGER.info("tsFileIoWriter flushID: {}, flush task flushID: {} has waited {}ms", currentTsFileFlushId,
+              flushId, waitedTime);
         }
         Thread.sleep(10);
       } catch (InterruptedException e) {
@@ -143,16 +149,21 @@ public class MemTableFlushTask {
         ioTime += System.currentTimeMillis() - starTime;
       }
     }
+
     MemTablePool.getInstance().release(memTable);
     LOGGER.info("Processor {} return back a memtable to MemTablePool", processorName);
     flushCallBack.afterFlush(memTable, tsFileIoWriter);
     if (tsFileIoWriter instanceof RestorableTsFileIOWriter) {
       try {
-        ((RestorableTsFileIOWriter) tsFileIoWriter).flush();
+        RestorableTsFileIOWriter restorableTsFileIOWriter = (RestorableTsFileIOWriter) tsFileIoWriter;
+        restorableTsFileIOWriter.flush();
+        restorableTsFileIOWriter.appendMetadata();
       } catch (IOException e) {
         LOGGER.error("write restore file meet error", e);
       }
     }
+
+    // enable next flush task to IO
     long newId = tsFileIoWriter.getFlushID().incrementAndGet();
     LOGGER.info("BufferWrite Processor {}, flushing a memtable into disk:  io cost {}ms, new flushID in tsFileIoWriter: {}.",
         processorName, ioTime, newId);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
index f82bfe5..7799c7a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
@@ -54,7 +54,7 @@ public class MemTablePool {
       long waitedTime = System.currentTimeMillis() - waitStartTime;
       if (waitedTime / 2000 > lastPrintIdx) {
         lastPrintIdx = waitedTime / 2000;
-        LOGGER.info("{} has waited for a memtable for {}ms", waitedTime);
+        LOGGER.info("{} has waited for a memtable for {}ms", applier, waitedTime);
       }
     }
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index d8cb8be..4317cbc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -767,13 +767,13 @@ public class OverflowProcessor extends Processor {
 //    try {
 //      Pair<> workSupport;
 //      workSupport = new OverflowMemtable();
-//      if(isFlush){
-//        // isFlush = true, indicating an AsyncFlushThread has been running, only add Current overflowInfo
+//      if(isFlushing){
+//        // isFlushing = true, indicating an AsyncFlushThread has been running, only add Current overflowInfo
 //        // into List.
 //
 //
 //      }else {
-//        isFlush = true;
+//        isFlushing = true;
 ////        flushFuture = FlushManager.getInstance().submit(() ->
 //            flushTask("asynchronously", walTaskId));
 //      }
@@ -797,7 +797,7 @@ public class OverflowProcessor extends Processor {
 //            OverflowMemtable temp = flushSupport == null ? new OverflowMemtable() : flushSupport;
 //            flushSupport = workSupport;
 //            workSupport = temp;
-//            isFlush = true;
+//            isFlushing = true;
 //            break;
 //          }
 //          flushInfo = flushTaskList.remove(0);
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
index 1fb5e48..a826495 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
@@ -124,7 +124,6 @@ public class BufferWriteProcessorNewTest {
       assertEquals(num, timeValuePair.getTimestamp());
       assertEquals(num, timeValuePair.getValue().getInt());
     }
-    assertFalse(bufferwrite.isFlush());
     long lastFlushTime = bufferwrite.getLastFlushTime();
     // flush asynchronously
     bufferwrite.flush();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
index 65ef7a4..d3c462d 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
@@ -28,7 +28,6 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.Buffer;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -219,7 +218,6 @@ public class BufferWriteProcessorTest {
     bufferwrite = new BufferWriteProcessor(directories.getFolderForTest(), deviceId, insertPath,
         parameters, bfcloseConsumer, SysTimeVersionController.INSTANCE,
         FileSchemaUtils.constructFileSchema(deviceId));
-    assertFalse(bufferwrite.isFlush());
     assertTrue(bufferwrite.canBeClosed());
     assertEquals(0, bufferwrite.memoryUsage());
     assertEquals(TsFileIOWriter.magicStringBytes.length, bufferwrite.getFileSize());
@@ -242,7 +240,6 @@ public class BufferWriteProcessorTest {
       Assert.fail("mock flush spends more than 10 seconds... "
           + "Please modify the value or change a better test environment");
     }
-    assertFalse(bufferwrite.isFlush());
     assertEquals(0, bufferwrite.memoryUsage());
     // query result
     Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = bufferwrite