You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/01/27 04:59:02 UTC

[iotdb] branch two_stage_pipeline updated: change to one stage

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch two_stage_pipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/two_stage_pipeline by this push:
     new 0b91782  change to one stage
0b91782 is described below

commit 0b9178239bd5a91e599af38caf20bbea2168a4ec
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Wed Jan 27 12:58:20 2021 +0800

    change to one stage
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   | 112 ++++-----------------
 1 file changed, 19 insertions(+), 93 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index cb72070..eaf2fc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -20,14 +20,8 @@ package org.apache.iotdb.db.engine.flush;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
-import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
@@ -40,17 +34,12 @@ import org.slf4j.LoggerFactory;
 public class MemTableFlushTask {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class);
-  private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER = FlushSubTaskPoolManager
-      .getInstance();
-  private final Future<?> ioTaskFuture;
-  private RestorableTsFileIOWriter writer;
+  private final RestorableTsFileIOWriter writer;
 
-  private final ConcurrentLinkedQueue<Object> ioTaskQueue = new ConcurrentLinkedQueue<>();
-  private String storageGroup;
+  private final String storageGroup;
 
-  private IMemTable memTable;
+  private final IMemTable memTable;
 
-  private volatile boolean noMoreIOTask = false;
 
   /**
    * @param memTable the memTable to flush
@@ -62,7 +51,6 @@ public class MemTableFlushTask {
     this.memTable = memTable;
     this.writer = writer;
     this.storageGroup = storageGroup;
-    this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
     LOGGER.debug("flush task of Storage group {} memtable {} is created ",
         storageGroup, memTable.getVersion());
   }
@@ -71,7 +59,7 @@ public class MemTableFlushTask {
    * the function for flushing memtable.
    */
   public void syncFlushMemTable()
-      throws ExecutionException, InterruptedException {
+      throws InterruptedException, IOException {
     LOGGER.info("The memTable size of SG {} is {}, the avg series points num in chunk is {} ",
         storageGroup,
         memTable.memSize(),
@@ -79,11 +67,11 @@ public class MemTableFlushTask {
     long start = System.currentTimeMillis();
     long sortTime = 0;
     long encodingTime = 0;
+    long ioTime = 0;
 
     //for map do not use get(key) to iteratate
     for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : memTable.getMemTableMap().entrySet()) {
-      ioTaskQueue.add(new StartFlushGroupIOTask(memTableEntry.getKey()));
-
+      writer.startChunkGroup(memTableEntry.getKey());
       final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
       for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
         long startTime = System.currentTimeMillis();
@@ -96,29 +84,29 @@ public class MemTableFlushTask {
         writeOneSeries(tvList, seriesWriter, desc.getType());
         seriesWriter.sealCurrentPage();
         seriesWriter.clearPageWriter();
-        encodingTime += (System.currentTimeMillis() - encodingStartTime);
-        ioTaskQueue.add(seriesWriter);
+        long ioStartTime = System.currentTimeMillis();
+        encodingTime += (ioStartTime - encodingStartTime);
+        seriesWriter.writeToFileWriter(this.writer);
+        ioTime += (System.currentTimeMillis() - ioStartTime);
       }
-
-      ioTaskQueue.add(new EndChunkGroupIoTask());
+      long ioStartTime = System.currentTimeMillis();
+      writer.setMinPlanIndex(memTable.getMinPlanIndex());
+      writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
+      writer.endChunkGroup();
+      ioTime += (System.currentTimeMillis() - ioStartTime);
     }
 
-    noMoreIOTask = true;
     LOGGER.info(
         "Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
         storageGroup, memTable.getVersion(), sortTime);
     LOGGER.info("Storage group {}, flushing memtable {} into disk: Encoding data cost "
             + "{} ms.",
         storageGroup, memTable.getVersion(), encodingTime);
+    LOGGER.info("flushing a memtable {} in storage group {}, io cost {}ms", memTable.getVersion(),
+        storageGroup, ioTime);
 
-    ioTaskFuture.get();
-
-    try {
-      writer.writeVersion(memTable.getVersion());
-      writer.writePlanIndices();
-    } catch (IOException e) {
-      throw new ExecutionException(e);
-    }
+    writer.writeVersion(memTable.getVersion());
+    writer.writePlanIndices();
 
     LOGGER.info(
         "Storage group {} memtable {} flushing a memtable has finished! Time consumption: {}ms",
@@ -164,66 +152,4 @@ public class MemTableFlushTask {
     }
   }
 
-  @SuppressWarnings("squid:S135")
-  private Runnable ioTask = () -> {
-    long ioTime = 0;
-    boolean returnWhenNoTask = false;
-    LOGGER.debug("Storage group {} memtable {}, start io.", storageGroup, memTable.getVersion());
-    while (true) {
-      if (noMoreIOTask) {
-        returnWhenNoTask = true;
-      }
-      Object ioMessage = ioTaskQueue.poll();
-      if (ioMessage == null) {
-        if (returnWhenNoTask) {
-          break;
-        }
-        try {
-          TimeUnit.MILLISECONDS.sleep(10);
-        } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
-          LOGGER.error("Storage group {} memtable {}, io task is interrupted.", storageGroup
-              , memTable.getVersion());
-          // generally it is because the thread pool is shutdown so the task should be aborted
-          break;
-        }
-      } else {
-        long starTime = System.currentTimeMillis();
-        try {
-          if (ioMessage instanceof StartFlushGroupIOTask) {
-            this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
-          } else if (ioMessage instanceof IChunkWriter) {
-            ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
-            chunkWriter.writeToFileWriter(this.writer);
-          } else {
-            this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
-            this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
-            this.writer.endChunkGroup();
-          }
-        } catch (IOException e) {
-          LOGGER.error("Storage group {} memtable {}, io task meets error.", storageGroup,
-              memTable.getVersion(), e);
-          throw new FlushRunTimeException(e);
-        }
-        ioTime += System.currentTimeMillis() - starTime;
-      }
-    }
-    LOGGER.info("flushing a memtable {} in storage group {}, io cost {}ms", memTable.getVersion(),
-        storageGroup, ioTime);
-  };
-
-  static class EndChunkGroupIoTask {
-
-    EndChunkGroupIoTask() {
-
-    }
-  }
-
-  static class StartFlushGroupIOTask {
-
-    private final String deviceId;
-
-    StartFlushGroupIOTask(String deviceId) {
-      this.deviceId = deviceId;
-    }
-  }
 }