You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/25 07:01:58 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: fix memtable flush task null pointer bug

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new b029d1f  fix memtable flush task null pointer bug
b029d1f is described below

commit b029d1ffb787854586239aa14ff95e40bbf9a94f
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Jun 25 15:01:44 2019 +0800

    fix memtable flush task null pointer bug
---
 .../filenodeV2/UnsealedTsFileProcessorV2.java      |   4 +-
 .../db/engine/memtable/MemTableFlushTaskV2.java    | 202 +++++++++++----------
 .../engine/memtable/MemTableFlushTaskV2Test.java   |   4 +-
 3 files changed, 110 insertions(+), 100 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index f16e0ab..b3074a9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -296,9 +296,9 @@ public class UnsealedTsFileProcessorV2 {
 
     // null memtable only appears when calling asyncClose()
     if (memTableToFlush.isManagedByMemPool()) {
-      MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(writer, storageGroupName,
+      MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(memTableToFlush, fileSchema, writer, storageGroupName,
           this::releaseFlushedMemTableCallback);
-      flushTask.flushMemTable(fileSchema, memTableToFlush);
+      flushTask.flushMemTable();
       MemTablePool.getInstance().putBack(memTableToFlush, storageGroupName);
       logNode.notifyEndFlush();
       LOGGER.info("flush a memtable has finished");
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index a66809d..cf700fb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -49,12 +49,15 @@ public class MemTableFlushTaskV2 {
 
   private Consumer<IMemTable> flushCallBack;
   private IMemTable memTable;
+  private FileSchema fileSchema;
 
   private boolean memoryFlushNoMoreTask = false;
   private boolean ioFlushTaskCanStop = false;
 
-  public MemTableFlushTaskV2(NativeRestorableIOWriter writer, String storageGroup,
+  public MemTableFlushTaskV2(IMemTable memTable, FileSchema fileSchema, NativeRestorableIOWriter writer, String storageGroup,
       Consumer<IMemTable> callBack) {
+    this.memTable = memTable;
+    this.fileSchema = fileSchema;
     this.tsFileIoWriter = writer;
     this.storageGroup = storageGroup;
     this.flushCallBack = callBack;
@@ -68,59 +71,63 @@ public class MemTableFlushTaskV2 {
   private Runnable memoryFlushTask = new Runnable() {
     @Override
     public void run() {
-      long memSerializeTime = 0;
-      boolean returnWhenNoTask = false;
-      LOGGER.info("Storage group {} memtable {}, starts to serialize data into mem.", storageGroup,
-          memTable.getVersion());
-      while (true) {
-        if (memoryFlushNoMoreTask) {
-          returnWhenNoTask = true;
-        }
-        Object task = memoryTaskQueue.poll();
-        if (task == null) {
-          if (returnWhenNoTask) {
-            break;
-          }
-          try {
-            Thread.sleep(10);
-          } catch (InterruptedException e) {
-            LOGGER.error("Storage group {} memtable {}, io flush task is interrupted.",
-                storageGroup, memTable.getVersion(), e);
+      try {
+        long memSerializeTime = 0;
+        boolean returnWhenNoTask = false;
+        LOGGER.info("Storage group {} memtable {}, starts to serialize data into mem.", storageGroup,
+            memTable.getVersion());
+        while (true) {
+          if (memoryFlushNoMoreTask) {
+            returnWhenNoTask = true;
           }
-        } else {
-          if (task instanceof String) {
-            LOGGER.info("Storage group {} memtable {}, issues a start flush chunk group task.",
-                storageGroup, memTable.getVersion());
-            ioTaskQueue.add(task);
-          } else if (task instanceof ChunkGroupIoTask) {
-            LOGGER.info("Storage group {} memtable {}, issues a end flush chunk group task.",
-                storageGroup, memTable.getVersion());
-            ioTaskQueue.add(task);
-          } else {
-            long starTime = System.currentTimeMillis();
-            Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
-            ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
-            IChunkWriter seriesWriter = new ChunkWriterImpl(memorySerializeTask.right, chunkBuffer,
-                PAGE_SIZE_THRESHOLD);
+          Object task = memoryTaskQueue.poll();
+          if (task == null) {
+            if (returnWhenNoTask) {
+              break;
+            }
             try {
-              writeOneSeries(memorySerializeTask.left, seriesWriter,
-                  memorySerializeTask.right.getType());
-              ioTaskQueue.add(seriesWriter);
-            } catch (IOException e) {
-              LOGGER.error("Storage group {} memtable {}, io error.", storageGroup,
-                  memTable.getVersion(), e);
-              throw new RuntimeException(e);
+              Thread.sleep(10);
+            } catch (InterruptedException e) {
+              LOGGER.error("Storage group {} memtable {}, io flush task is interrupted.",
+                  storageGroup, memTable.getVersion(), e);
+            }
+          } else {
+            if (task instanceof String) {
+              LOGGER.info("Storage group {} memtable {}, issues a start flush chunk group task.",
+                  storageGroup, memTable.getVersion());
+              ioTaskQueue.add(task);
+            } else if (task instanceof ChunkGroupIoTask) {
+              LOGGER.info("Storage group {} memtable {}, issues a end flush chunk group task.",
+                  storageGroup, memTable.getVersion());
+              ioTaskQueue.add(task);
+            } else {
+              long starTime = System.currentTimeMillis();
+              Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
+              ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
+              IChunkWriter seriesWriter = new ChunkWriterImpl(memorySerializeTask.right, chunkBuffer,
+                  PAGE_SIZE_THRESHOLD);
+              try {
+                writeOneSeries(memorySerializeTask.left, seriesWriter,
+                    memorySerializeTask.right.getType());
+                ioTaskQueue.add(seriesWriter);
+              } catch (IOException e) {
+                LOGGER.error("Storage group {} memtable {}, io error.", storageGroup,
+                    memTable.getVersion(), e);
+                throw new RuntimeException(e);
+              }
+              LOGGER.info("Storage group {} memtable {}, issues a write chunk task.",
+                  storageGroup, memTable.getVersion());
+              memSerializeTime += System.currentTimeMillis() - starTime;
             }
-            LOGGER.info("Storage group {} memtable {}, issues a write chunk task.",
-                storageGroup, memTable.getVersion());
-            memSerializeTime += System.currentTimeMillis() - starTime;
           }
         }
+        ioFlushTaskCanStop = true;
+        LOGGER.info("Storage group {}, flushing memtable {} into disk: serialize data into mem cost "
+                + "{} ms.",
+            storageGroup, memTable.getVersion(), memSerializeTime);
+      } catch (RuntimeException e) {
+        LOGGER.error("memoryFlush thread is dead", e);
       }
-      ioFlushTaskCanStop = true;
-      LOGGER.info("Storage group {}, flushing memtable {} into disk: serialize data into mem cost "
-              + "{} ms.",
-          storageGroup, memTable.getVersion(), memSerializeTime);
     }
   };
 
@@ -130,52 +137,56 @@ public class MemTableFlushTaskV2 {
   private Runnable ioFlushTask = new Runnable() {
     @Override
     public void run() {
-      long ioTime = 0;
-      boolean returnWhenNoTask = false;
-      LOGGER.info("Storage group {} memtable {}, start io.", storageGroup, memTable.getVersion());
-      while (true) {
-        if (ioFlushTaskCanStop) {
-          returnWhenNoTask = true;
-        }
-        Object seriesWriterOrEndChunkGroupTask = ioTaskQueue.poll();
-        if (seriesWriterOrEndChunkGroupTask == null) {
-          if (returnWhenNoTask) {
-            break;
+      try {
+        long ioTime = 0;
+        boolean returnWhenNoTask = false;
+        LOGGER.info("Storage group {} memtable {}, start io.", storageGroup, memTable.getVersion());
+        while (true) {
+          if (ioFlushTaskCanStop) {
+            returnWhenNoTask = true;
           }
-          try {
-            Thread.sleep(10);
-          } catch (InterruptedException e) {
-            LOGGER.error("Storage group {} memtable, io flush task is interrupted.", storageGroup
-                , memTable.getVersion(), e);
-          }
-        } else {
-          long starTime = System.currentTimeMillis();
-          try {
-            if (seriesWriterOrEndChunkGroupTask instanceof IChunkWriter) {
-              LOGGER.info("Storage group {} memtable {}, writing a series to file.", storageGroup,
-                  memTable.getVersion());
-              ((IChunkWriter) seriesWriterOrEndChunkGroupTask).writeToFileWriter(tsFileIoWriter);
-            } else if (seriesWriterOrEndChunkGroupTask instanceof String) {
-              LOGGER.info("Storage group {} memtable {}, start a chunk group.", storageGroup,
-                  memTable.getVersion());
-              tsFileIoWriter.startChunkGroup((String) seriesWriterOrEndChunkGroupTask);
-            } else {
-              LOGGER.info("Storage group {} memtable {}, end a chunk group.", storageGroup,
-                  memTable.getVersion());
-              ChunkGroupIoTask task = (ChunkGroupIoTask) seriesWriterOrEndChunkGroupTask;
-              tsFileIoWriter.endChunkGroup(task.version);
-              task.finished = true;
+          Object seriesWriterOrEndChunkGroupTask = ioTaskQueue.poll();
+          if (seriesWriterOrEndChunkGroupTask == null) {
+            if (returnWhenNoTask) {
+              break;
+            }
+            try {
+              Thread.sleep(10);
+            } catch (InterruptedException e) {
+              LOGGER.error("Storage group {} memtable, io flush task is interrupted.", storageGroup
+                  , memTable.getVersion(), e);
+            }
+          } else {
+            long starTime = System.currentTimeMillis();
+            try {
+              if (seriesWriterOrEndChunkGroupTask instanceof IChunkWriter) {
+                LOGGER.info("Storage group {} memtable {}, writing a series to file.", storageGroup,
+                    memTable.getVersion());
+                ((IChunkWriter) seriesWriterOrEndChunkGroupTask).writeToFileWriter(tsFileIoWriter);
+              } else if (seriesWriterOrEndChunkGroupTask instanceof String) {
+                LOGGER.info("Storage group {} memtable {}, start a chunk group.", storageGroup,
+                    memTable.getVersion());
+                tsFileIoWriter.startChunkGroup((String) seriesWriterOrEndChunkGroupTask);
+              } else {
+                LOGGER.info("Storage group {} memtable {}, end a chunk group.", storageGroup,
+                    memTable.getVersion());
+                ChunkGroupIoTask task = (ChunkGroupIoTask) seriesWriterOrEndChunkGroupTask;
+                tsFileIoWriter.endChunkGroup(task.version);
+                task.finished = true;
+              }
+            } catch (IOException e) {
+              LOGGER.error("Storage group {} memtable {}, io error.", storageGroup,
+                  memTable.getVersion(), e);
+              throw new RuntimeException(e);
             }
-          } catch (IOException e) {
-            LOGGER.error("Storage group {} memtable {}, io error.", storageGroup,
-                memTable.getVersion(), e);
-            throw new RuntimeException(e);
+            ioTime += System.currentTimeMillis() - starTime;
           }
-          ioTime += System.currentTimeMillis() - starTime;
         }
+        LOGGER.info("flushing a memtable {} in storage group {}, cost {}ms", memTable.getVersion(),
+            storageGroup, ioTime);
+      } catch (RuntimeException e) {
+        LOGGER.error("ioflush thread is dead", e);
       }
-      LOGGER.info("flushing a memtable {} in storage group {}, cost {}ms", memTable.getVersion(),
-          storageGroup, ioTime);
     }
   };
 
@@ -221,23 +232,22 @@ public class MemTableFlushTaskV2 {
   /**
    * the function for flushing memtable.
    */
-  public void flushMemTable(FileSchema fileSchema, IMemTable imemTable) {
+  public void flushMemTable() {
     long sortTime = 0;
     ChunkGroupIoTask theLastTask = EMPTY_TASK;
-    this.memTable = imemTable;
-    for (String deviceId : imemTable.getMemTableMap().keySet()) {
+    for (String deviceId : memTable.getMemTableMap().keySet()) {
       memoryTaskQueue.add(deviceId);
-      int seriesNumber = imemTable.getMemTableMap().get(deviceId).size();
-      for (String measurementId : imemTable.getMemTableMap().get(deviceId).keySet()) {
+      int seriesNumber = memTable.getMemTableMap().get(deviceId).size();
+      for (String measurementId : memTable.getMemTableMap().get(deviceId).keySet()) {
         long startTime = System.currentTimeMillis();
         // TODO if we can not use TSFileIO writer, then we have to redesign the class of TSFileIO.
-        IWritableMemChunk series = imemTable.getMemTableMap().get(deviceId).get(measurementId);
+        IWritableMemChunk series = memTable.getMemTableMap().get(deviceId).get(measurementId);
         MeasurementSchema desc = fileSchema.getMeasurementSchema(measurementId);
         List<TimeValuePair> sortedTimeValuePairs = series.getSortedTimeValuePairList();
         sortTime += System.currentTimeMillis() - startTime;
         memoryTaskQueue.add(new Pair<>(sortedTimeValuePairs, desc));
       }
-      theLastTask = new ChunkGroupIoTask(seriesNumber, deviceId, imemTable.getVersion());
+      theLastTask = new ChunkGroupIoTask(seriesNumber, deviceId, memTable.getVersion());
       memoryTaskQueue.add(theLastTask);
     }
     memoryFlushNoMoreTask = true;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java
index 074317d..2566fa3 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java
@@ -59,7 +59,7 @@ public class MemTableFlushTaskV2Test {
     MemTableTestUtils.produceData(memTable, startTime, endTime, MemTableTestUtils.deviceId0,
         MemTableTestUtils.measurementId0,
         MemTableTestUtils.dataType0);
-    MemTableFlushTaskV2 memTableFlushTask = new MemTableFlushTaskV2(writer, storageGroup,
+    MemTableFlushTaskV2 memTableFlushTask = new MemTableFlushTaskV2(memTable, MemTableTestUtils.getFileSchema(), writer, storageGroup,
         memtable -> {
           writer.makeMetadataVisible();
           MemTablePool.getInstance().putBack(memtable, storageGroup);
@@ -67,7 +67,7 @@ public class MemTableFlushTaskV2Test {
     assertTrue(writer
         .getVisibleMetadatas(MemTableTestUtils.deviceId0, MemTableTestUtils.measurementId0,
             MemTableTestUtils.dataType0).isEmpty());
-    memTableFlushTask.flushMemTable(MemTableTestUtils.getFileSchema(), memTable);
+    memTableFlushTask.flushMemTable();
     assertEquals(1, writer
         .getVisibleMetadatas(MemTableTestUtils.deviceId0, MemTableTestUtils.measurementId0,
             MemTableTestUtils.dataType0).size());