You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/25 01:36:13 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: modify mem table flush task

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 199c075  modify mem table flush task
199c075 is described below

commit 199c07595ea89fb21389361d6e2bd25a10899d5b
Author: lta <li...@163.com>
AuthorDate: Tue Jun 25 09:35:57 2019 +0800

    modify mem table flush task
---
 .../db/engine/memtable/MemTableFlushTaskV2.java    | 64 +++++++++++++---------
 .../db/engine/memtable/MemTableFlushTaskV3.java    | 64 +++++++++-------------
 .../engine/filenodeV2/FileNodeProcessorV2Test.java |  2 +-
 3 files changed, 65 insertions(+), 65 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index 724b021..43e6a78 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -17,7 +17,6 @@ package org.apache.iotdb.db.engine.memtable;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.function.Consumer;
 import org.apache.iotdb.db.engine.pool.FlushSubTaskPoolManager;
@@ -40,8 +39,8 @@ public class MemTableFlushTaskV2 {
   private static final int PAGE_SIZE_THRESHOLD = TSFileConfig.pageSizeInByte;
   private static final FlushSubTaskPoolManager subTaskPoolManager = FlushSubTaskPoolManager
       .getInstance();
-  private Future memoryFlushTaskFuture;
-  private Future ioFlushTaskFuture;
+  private Future memoryFlushTask;
+  private Future ioFlushTask;
   private NativeRestorableIOWriter tsFileIoWriter;
 
   private ConcurrentLinkedQueue ioTaskQueue = new ConcurrentLinkedQueue();
@@ -57,17 +56,21 @@ public class MemTableFlushTaskV2 {
     this.tsFileIoWriter = writer;
     this.storageGroup = storageGroup;
     this.flushCallBack = callBack;
-    this.memoryFlushTaskFuture = subTaskPoolManager.submit(memoryFlushTask);
-    this.ioFlushTaskFuture = subTaskPoolManager.submit(ioFlushTask);
+//    this.memoryFlushTask = subTaskPoolManager.submit(memoryFlushThread);
+
+    memoryFlushThread.start();
+    ioFlushThread.start();
     LOGGER.info("flush task created in Storage group {} ", storageGroup);
   }
 
-  private Runnable memoryFlushTask = new Runnable() {
-    @Override
-    public void run() {
+
+  private Thread memoryFlushThread = new Thread(() -> {
       long memSerializeTime = 0;
       LOGGER.info("Storage group {},start serialize data into mem.", storageGroup);
       while (!stop) {
+        if (!memoryTaskQueue.isEmpty()) {
+          LOGGER.info("memory task queue is {}", memoryTaskQueue);
+        }
         Object task = memoryTaskQueue.poll();
         if (task == null) {
           try {
@@ -77,10 +80,10 @@ public class MemTableFlushTaskV2 {
           }
         } else {
           if (task instanceof String) {
+            LOGGER.info("add String {} to io queue", task);
             ioTaskQueue.add(task);
-          } else if (task instanceof ChunkGroupIoTask) {
-            ioTaskQueue.add(task);
-          } else {
+          } else if (task instanceof Pair) {
+            LOGGER.info("add chunk writer {}", task);
             long starTime = System.currentTimeMillis();
             Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
             ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
@@ -95,20 +98,20 @@ public class MemTableFlushTaskV2 {
               throw new RuntimeException(e);
             }
             memSerializeTime += System.currentTimeMillis() - starTime;
+          } else {
+            LOGGER.info("end chunk group {} io task to io task queue", task.toString());
+            ioTaskQueue.add(task);
           }
         }
       }
       LOGGER.info("Storage group {}, flushing a memtable into disk: serialize data into mem cost {} ms.",
           storageGroup, memSerializeTime);
-    }
-  };
+  }, Thread.currentThread().getId() + "-1");
 
 
   //TODO a better way is: for each TsFile, assign it a Executors.singleThreadPool,
   // rather than per each memtable.
-  private Runnable ioFlushTask = new Runnable() {
-    @Override
-    public void run() {
+  private Thread ioFlushThread = new Thread(() -> {
       long ioTime = 0;
       LOGGER.info("Storage group {}, start io cost.", storageGroup);
       while (!stop) {
@@ -123,15 +126,17 @@ public class MemTableFlushTaskV2 {
           long starTime = System.currentTimeMillis();
           try {
             if (seriesWriterOrEndChunkGroupTask instanceof IChunkWriter) {
+              LOGGER.info("write series to disk");
               ((IChunkWriter) seriesWriterOrEndChunkGroupTask).writeToFileWriter(tsFileIoWriter);
             } else if (seriesWriterOrEndChunkGroupTask instanceof String) {
+              LOGGER.info("start chunk group");
               tsFileIoWriter.startChunkGroup((String) seriesWriterOrEndChunkGroupTask);
             } else {
+              LOGGER.info("end chunk group {} io task from task queue", seriesWriterOrEndChunkGroupTask.toString());
+              LOGGER.info("end chunk group");
               ChunkGroupIoTask task = (ChunkGroupIoTask) seriesWriterOrEndChunkGroupTask;
               tsFileIoWriter.endChunkGroup(task.version);
-              synchronized (task){
-                task.notify();
-              }
+              task.finished = true;
             }
           } catch (IOException e) {
             LOGGER.error("Storage group {}, io error.", storageGroup, e);
@@ -141,8 +146,7 @@ public class MemTableFlushTaskV2 {
         }
       }
       LOGGER.info("flushing a memtable in storage group {}, cost {}ms", storageGroup, ioTime);
-    }
-  };
+  }, Thread.currentThread().getId() + "-2");
 
 
   private void writeOneSeries(List<TimeValuePair> tvPairs, IChunkWriter seriesWriterImpl,
@@ -190,9 +194,12 @@ public class MemTableFlushTaskV2 {
     long sortTime = 0;
     ChunkGroupIoTask theLastTask = EMPTY_TASK;
     this.memTable = imemTable;
+    LOGGER.info("Current thread id is {}" , Thread.currentThread().getId());
     for (String deviceId : imemTable.getMemTableMap().keySet()) {
       memoryTaskQueue.add(deviceId);
       int seriesNumber = imemTable.getMemTableMap().get(deviceId).size();
+      LOGGER.info("series number: {}", seriesNumber);
+      LOGGER.info("add device, memory queue {}", memoryTaskQueue);
       for (String measurementId : imemTable.getMemTableMap().get(deviceId).keySet()) {
         long startTime = System.currentTimeMillis();
         // TODO if we can not use TSFileIO writer, then we have to redesign the class of TSFileIO.
@@ -200,17 +207,21 @@ public class MemTableFlushTaskV2 {
         MeasurementSchema desc = fileSchema.getMeasurementSchema(measurementId);
         List<TimeValuePair> sortedTimeValuePairs = series.getSortedTimeValuePairList();
         sortTime += System.currentTimeMillis() - startTime;
+        LOGGER.info("add seies writer in flush thread {}", sortedTimeValuePairs);
         memoryTaskQueue.add(new Pair<>(sortedTimeValuePairs, desc));
+        LOGGER.info("add series writer, memory queue {}", memoryTaskQueue);
       }
       theLastTask = new ChunkGroupIoTask(seriesNumber, deviceId, imemTable.getVersion());
+      LOGGER.info("ChunkGroupIoTask task {}", theLastTask.toString());
       memoryTaskQueue.add(theLastTask);
+      LOGGER.info("add chunk group to task, memory queue {}", memoryTaskQueue);
     }
     LOGGER.info(
         "{}, flushing a memtable into disk: data sort time cost {} ms.",
         storageGroup, sortTime);
-    synchronized (theLastTask){
+    while (!theLastTask.finished) {
       try {
-        theLastTask.wait();
+        Thread.sleep(10);
       } catch (InterruptedException e) {
         LOGGER.error("Storage group {}, flush memtable table thread is interrupted.",
             storageGroup, e);
@@ -219,10 +230,7 @@ public class MemTableFlushTaskV2 {
     }
     stop = true;
 
-    try {
-      ioFlushTaskFuture.get();
-    } catch (InterruptedException | ExecutionException e) {
-      LOGGER.error("Waiting for IO flush task end meets error", e);
+    while (ioFlushThread.isAlive()) {
     }
 
     LOGGER.info("flushing a memtable finished!");
@@ -235,6 +243,7 @@ public class MemTableFlushTaskV2 {
     int seriesNumber;
     String deviceId;
     long version;
+    volatile boolean finished;
 
     public ChunkGroupIoTask(int seriesNumber, String deviceId, long version) {
       this(seriesNumber, deviceId, version, false);
@@ -244,6 +253,7 @@ public class MemTableFlushTaskV2 {
       this.seriesNumber = seriesNumber;
       this.deviceId = deviceId;
       this.version = version;
+      this.finished = finished;
     }
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV3.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV3.java
index 12e00d2..71f7613 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV3.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV3.java
@@ -17,6 +17,7 @@ package org.apache.iotdb.db.engine.memtable;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.function.Consumer;
 import org.apache.iotdb.db.engine.pool.FlushSubTaskPoolManager;
@@ -39,8 +40,8 @@ public class MemTableFlushTaskV3 {
   private static final int PAGE_SIZE_THRESHOLD = TSFileConfig.pageSizeInByte;
   private static final FlushSubTaskPoolManager subTaskPoolManager = FlushSubTaskPoolManager
       .getInstance();
-  private Future memoryFlushTask;
-  private Future ioFlushTask;
+  private Future memoryFlushTaskFuture;
+  private Future ioFlushTaskFuture;
   private NativeRestorableIOWriter tsFileIoWriter;
 
   private ConcurrentLinkedQueue ioTaskQueue = new ConcurrentLinkedQueue();
@@ -56,21 +57,17 @@ public class MemTableFlushTaskV3 {
     this.tsFileIoWriter = writer;
     this.storageGroup = storageGroup;
     this.flushCallBack = callBack;
-//    this.memoryFlushTask = subTaskPoolManager.submit(memoryFlushThread);
-
-    memoryFlushThread.start();
-    ioFlushThread.start();
+    this.memoryFlushTaskFuture = subTaskPoolManager.submit(memoryFlushTask);
+    this.ioFlushTaskFuture = subTaskPoolManager.submit(ioFlushTask);
     LOGGER.info("flush task created in Storage group {} ", storageGroup);
   }
 
-
-  private Thread memoryFlushThread = new Thread(() -> {
+  private Runnable memoryFlushTask = new Runnable() {
+    @Override
+    public void run() {
       long memSerializeTime = 0;
       LOGGER.info("Storage group {},start serialize data into mem.", storageGroup);
       while (!stop) {
-        if (!memoryTaskQueue.isEmpty()) {
-          LOGGER.info("memory task queue is {}", memoryTaskQueue);
-        }
         Object task = memoryTaskQueue.poll();
         if (task == null) {
           try {
@@ -80,10 +77,10 @@ public class MemTableFlushTaskV3 {
           }
         } else {
           if (task instanceof String) {
-            LOGGER.info("add String {} to io queue", task);
             ioTaskQueue.add(task);
-          } else if (task instanceof Pair) {
-            LOGGER.info("add chunk writer {}", task);
+          } else if (task instanceof ChunkGroupIoTask) {
+            ioTaskQueue.add(task);
+          } else {
             long starTime = System.currentTimeMillis();
             Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
             ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
@@ -98,20 +95,20 @@ public class MemTableFlushTaskV3 {
               throw new RuntimeException(e);
             }
             memSerializeTime += System.currentTimeMillis() - starTime;
-          } else {
-            LOGGER.info("end chunk group {} io task to io task queue", task.toString());
-            ioTaskQueue.add(task);
           }
         }
       }
       LOGGER.info("Storage group {}, flushing a memtable into disk: serialize data into mem cost {} ms.",
           storageGroup, memSerializeTime);
-  }, Thread.currentThread().getId() + "-1");
+    }
+  };
 
 
   //TODO a better way is: for each TsFile, assign it a Executors.singleThreadPool,
   // rather than per each memtable.
-  private Thread ioFlushThread = new Thread(() -> {
+  private Runnable ioFlushTask = new Runnable() {
+    @Override
+    public void run() {
       long ioTime = 0;
       LOGGER.info("Storage group {}, start io cost.", storageGroup);
       while (!stop) {
@@ -126,17 +123,15 @@ public class MemTableFlushTaskV3 {
           long starTime = System.currentTimeMillis();
           try {
             if (seriesWriterOrEndChunkGroupTask instanceof IChunkWriter) {
-              LOGGER.info("write series to disk");
               ((IChunkWriter) seriesWriterOrEndChunkGroupTask).writeToFileWriter(tsFileIoWriter);
             } else if (seriesWriterOrEndChunkGroupTask instanceof String) {
-              LOGGER.info("start chunk group");
               tsFileIoWriter.startChunkGroup((String) seriesWriterOrEndChunkGroupTask);
             } else {
-              LOGGER.info("end chunk group {} io task from task queue", seriesWriterOrEndChunkGroupTask.toString());
-              LOGGER.info("end chunk group");
               ChunkGroupIoTask task = (ChunkGroupIoTask) seriesWriterOrEndChunkGroupTask;
               tsFileIoWriter.endChunkGroup(task.version);
-              task.finished = true;
+              synchronized (task){
+                task.notify();
+              }
             }
           } catch (IOException e) {
             LOGGER.error("Storage group {}, io error.", storageGroup, e);
@@ -146,7 +141,8 @@ public class MemTableFlushTaskV3 {
         }
       }
       LOGGER.info("flushing a memtable in storage group {}, cost {}ms", storageGroup, ioTime);
-  }, Thread.currentThread().getId() + "-2");
+    }
+  };
 
 
   private void writeOneSeries(List<TimeValuePair> tvPairs, IChunkWriter seriesWriterImpl,
@@ -194,12 +190,9 @@ public class MemTableFlushTaskV3 {
     long sortTime = 0;
     ChunkGroupIoTask theLastTask = EMPTY_TASK;
     this.memTable = imemTable;
-    LOGGER.info("Current thread id is {}" , Thread.currentThread().getId());
     for (String deviceId : imemTable.getMemTableMap().keySet()) {
       memoryTaskQueue.add(deviceId);
       int seriesNumber = imemTable.getMemTableMap().get(deviceId).size();
-      LOGGER.info("series number: {}", seriesNumber);
-      LOGGER.info("add device, memory queue {}", memoryTaskQueue);
       for (String measurementId : imemTable.getMemTableMap().get(deviceId).keySet()) {
         long startTime = System.currentTimeMillis();
         // TODO if we can not use TSFileIO writer, then we have to redesign the class of TSFileIO.
@@ -207,21 +200,17 @@ public class MemTableFlushTaskV3 {
         MeasurementSchema desc = fileSchema.getMeasurementSchema(measurementId);
         List<TimeValuePair> sortedTimeValuePairs = series.getSortedTimeValuePairList();
         sortTime += System.currentTimeMillis() - startTime;
-        LOGGER.info("add seies writer in flush thread {}", sortedTimeValuePairs);
         memoryTaskQueue.add(new Pair<>(sortedTimeValuePairs, desc));
-        LOGGER.info("add series writer, memory queue {}", memoryTaskQueue);
       }
       theLastTask = new ChunkGroupIoTask(seriesNumber, deviceId, imemTable.getVersion());
-      LOGGER.info("ChunkGroupIoTask task {}", theLastTask.toString());
       memoryTaskQueue.add(theLastTask);
-      LOGGER.info("add chunk group to task, memory queue {}", memoryTaskQueue);
     }
     LOGGER.info(
         "{}, flushing a memtable into disk: data sort time cost {} ms.",
         storageGroup, sortTime);
-    while (!theLastTask.finished) {
+    synchronized (theLastTask){
       try {
-        Thread.sleep(10);
+        theLastTask.wait();
       } catch (InterruptedException e) {
         LOGGER.error("Storage group {}, flush memtable table thread is interrupted.",
             storageGroup, e);
@@ -230,7 +219,10 @@ public class MemTableFlushTaskV3 {
     }
     stop = true;
 
-    while (ioFlushThread.isAlive()) {
+    try {
+      ioFlushTaskFuture.get();
+    } catch (InterruptedException | ExecutionException e) {
+      LOGGER.error("Waiting for IO flush task end meets error", e);
     }
 
     LOGGER.info("flushing a memtable finished!");
@@ -243,7 +235,6 @@ public class MemTableFlushTaskV3 {
     int seriesNumber;
     String deviceId;
     long version;
-    volatile boolean finished;
 
     public ChunkGroupIoTask(int seriesNumber, String deviceId, long version) {
       this(seriesNumber, deviceId, version, false);
@@ -253,7 +244,6 @@ public class MemTableFlushTaskV3 {
       this.seriesNumber = seriesNumber;
       this.deviceId = deviceId;
       this.version = version;
-      this.finished = finished;
     }
   }
 
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
index a3915aa..7f59f19 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
@@ -61,9 +61,9 @@ public class FileNodeProcessorV2Test {
       processor.asyncForceClose();
     }
 
-    System.out.println("reach");
 
     processor.syncCloseFileNode();
+    System.out.println("reach");
     QueryDataSourceV2 queryDataSource = null;
     try {
       queryDataSource = processor.query(deviceId, measurementId);