You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/02/04 08:45:06 UTC

[iotdb] 01/02: s

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch pipeline_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 621856f78c0e7747abe236326b85977ab84b3296
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Thu Feb 4 16:28:34 2021 +0800

    s
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   | 173 +++++++++------------
 1 file changed, 74 insertions(+), 99 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index f67b486..b06e170 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -19,19 +19,16 @@
 package org.apache.iotdb.db.engine.flush;
 
 import java.io.IOException;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import java.util.concurrent.TimeUnit;
 import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
 import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
 import org.apache.iotdb.db.utils.datastructure.TVList;
-import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
@@ -46,23 +43,18 @@ public class MemTableFlushTask {
   private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class);
   private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER = FlushSubTaskPoolManager
       .getInstance();
-  private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private final Future<?> encodingTaskFuture;
   private final Future<?> ioTaskFuture;
   private RestorableTsFileIOWriter writer;
 
-  private final LinkedBlockingQueue<Object> encodingTaskQueue = new LinkedBlockingQueue<>();
-  private final LinkedBlockingQueue<Object> ioTaskQueue = (config.isEnableMemControl()
-      && SystemInfo.getInstance().isEncodingFasterThanIo())
-          ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
-          : new LinkedBlockingQueue<>();
-
+  private final ConcurrentLinkedQueue<Object> ioTaskQueue = new ConcurrentLinkedQueue<>();
+  private final ConcurrentLinkedQueue<Object> encodingTaskQueue = new ConcurrentLinkedQueue<>();
   private String storageGroup;
 
   private IMemTable memTable;
 
-  private volatile long memSerializeTime = 0L;
-  private volatile long ioTime = 0L;
+  private volatile boolean noMoreEncodingTask = false;
+  private volatile boolean noMoreIOTask = false;
 
   /**
    * @param memTable the memTable to flush
@@ -77,7 +69,7 @@ public class MemTableFlushTask {
     this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask);
     this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
     LOGGER.debug("flush task of Storage group {} memtable is created, flushing to file {}.",
-              storageGroup, writer.getFile().getName());
+        storageGroup, writer.getFile().getName());
   }
 
   /**
@@ -89,19 +81,12 @@ public class MemTableFlushTask {
         storageGroup,
         memTable.memSize(),
         memTable.getTotalPointsNum() / memTable.getSeriesNumber());
-
-    long estimatedTemporaryMemSize = 0L;
-    if (config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()) {
-      estimatedTemporaryMemSize = memTable.memSize() / memTable.getSeriesNumber()
-          * config.getIoTaskQueueSizeForFlushing();
-      SystemInfo.getInstance().applyTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
-    }
     long start = System.currentTimeMillis();
     long sortTime = 0;
 
     //for map do not use get(key) to iteratate
     for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : memTable.getMemTableMap().entrySet()) {
-      encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey()));
+      encodingTaskQueue.add(new StartFlushGroupIOTask(memTableEntry.getKey()));
 
       final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
       for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
@@ -110,12 +95,13 @@ public class MemTableFlushTask {
         MeasurementSchema desc = series.getSchema();
         TVList tvList = series.getSortedTVListForFlush();
         sortTime += System.currentTimeMillis() - startTime;
-        encodingTaskQueue.put(new Pair<>(tvList, desc));
+        encodingTaskQueue.add(new Pair<>(tvList, desc));
       }
 
-      encodingTaskQueue.put(new EndChunkGroupIoTask());
+      encodingTaskQueue.add(new EndChunkGroupIoTask());
     }
-    encodingTaskQueue.put(new TaskEnd());
+
+    noMoreEncodingTask = true;
     LOGGER.info(
         "Storage group {} memtable flushing into file {}: data sort time cost {} ms.",
         storageGroup, writer.getFile().getName(), sortTime);
@@ -123,6 +109,8 @@ public class MemTableFlushTask {
     try {
       encodingTaskFuture.get();
     } catch (InterruptedException | ExecutionException e) {
+      // avoid ioTask waiting forever
+      noMoreIOTask = true;
       ioTaskFuture.cancel(true);
       throw e;
     }
@@ -135,13 +123,6 @@ public class MemTableFlushTask {
       throw new ExecutionException(e);
     }
 
-    if (config.isEnableMemControl()) {
-      if (estimatedTemporaryMemSize != 0) {
-        SystemInfo.getInstance().releaseTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
-      }
-      SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >= memSerializeTime);
-    }
-
     LOGGER.info(
         "Storage group {} memtable {} flushing a memtable has finished! Time consumption: {}ms",
         storageGroup, memTable, System.currentTimeMillis() - start);
@@ -188,103 +169,97 @@ public class MemTableFlushTask {
     @SuppressWarnings("squid:S135")
     @Override
     public void run() {
+      long memSerializeTime = 0;
+      boolean noMoreMessages = false;
       LOGGER.debug("Storage group {} memtable flushing to file {} starts to encoding data.",
           storageGroup, writer.getFile().getName());
       while (true) {
-
-        Object task = null;
-        try {
-          task = encodingTaskQueue.take();
-        } catch (InterruptedException e1) {
-          LOGGER.error("Take task into ioTaskQueue Interrupted");
-          Thread.currentThread().interrupt();
-          break;
+        if (noMoreEncodingTask) {
+          noMoreMessages = true;
         }
-        if (task instanceof StartFlushGroupIOTask || task instanceof EndChunkGroupIoTask) {
+        Object task = encodingTaskQueue.poll();
+        if (task == null) {
+          if (noMoreMessages) {
+            break;
+          }
           try {
-            ioTaskQueue.put(task);
+            TimeUnit.MILLISECONDS.sleep(10);
           } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
             LOGGER.error("Storage group {} memtable flushing to file {}, encoding task is interrupted.",
                 storageGroup, writer.getFile().getName(), e);
             // generally it is because the thread pool is shutdown so the task should be aborted
             break;
           }
-        } else if (task instanceof TaskEnd) {
-          break;
         } else {
-          long starTime = System.currentTimeMillis();
-          Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
-          IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right);
-          writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
-          seriesWriter.sealCurrentPage();
-          seriesWriter.clearPageWriter();
-          try {
-            ioTaskQueue.put(seriesWriter);
-          } catch (InterruptedException e) {
-            LOGGER.error("Put task into ioTaskQueue Interrupted");
-            Thread.currentThread().interrupt();
+          if (task instanceof StartFlushGroupIOTask || task instanceof EndChunkGroupIoTask) {
+            ioTaskQueue.add(task);
+          } else {
+            long starTime = System.currentTimeMillis();
+            Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
+            IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right);
+            writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
+            seriesWriter.sealCurrentPage();
+            seriesWriter.clearPageWriter();
+            memSerializeTime += System.currentTimeMillis() - starTime;
+            ioTaskQueue.add(seriesWriter);
           }
-          memSerializeTime += System.currentTimeMillis() - starTime;
         }
       }
-      try {
-        ioTaskQueue.put(new TaskEnd());
-      } catch (InterruptedException e) {
-        LOGGER.error("Put task into ioTaskQueue Interrupted");
-        Thread.currentThread().interrupt();
-      }
-      
-      LOGGER.info("Storage group {}, flushing memtable {} into disk: Encoding data cost "
-          + "{} ms.",
+      noMoreIOTask = true;
+      LOGGER.info("Storage group {}, flushing memtable into file {}: Encoding data cost "
+              + "{} ms.",
           storageGroup, writer.getFile().getName(), memSerializeTime);
     }
   };
 
   @SuppressWarnings("squid:S135")
   private Runnable ioTask = () -> {
+    long ioTime = 0;
+    boolean returnWhenNoTask = false;
     LOGGER.debug("Storage group {} memtable flushing to file {} start io.",
         storageGroup, writer.getFile().getName());
     while (true) {
-      Object ioMessage = null;
-      try {
-        ioMessage = ioTaskQueue.take();
-      } catch (InterruptedException e1) {
-        LOGGER.error("take task from ioTaskQueue Interrupted");
-        Thread.currentThread().interrupt();
-        break;
+      if (noMoreIOTask) {
+        returnWhenNoTask = true;
       }
-      long starTime = System.currentTimeMillis();
-      try {
-        if (ioMessage instanceof StartFlushGroupIOTask) {
-          this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
-        } else if (ioMessage instanceof TaskEnd) {
+      Object ioMessage = ioTaskQueue.poll();
+      if (ioMessage == null) {
+        if (returnWhenNoTask) {
+          break;
+        }
+        try {
+          TimeUnit.MILLISECONDS.sleep(10);
+        } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
+          LOGGER.error("Storage group {} memtable flushing to file {}, io task is interrupted.",
+              storageGroup, writer.getFile().getName());
+          // generally it is because the thread pool is shutdown so the task should be aborted
           break;
-        } else if (ioMessage instanceof IChunkWriter) {
-          ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
-          chunkWriter.writeToFileWriter(this.writer);
-        } else {
-          this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
-          this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
-          this.writer.endChunkGroup();
         }
-      } catch (IOException e) {
-        LOGGER.error("Storage group {} memtable {}, io task meets error.", storageGroup,
-            memTable, e);
-        throw new FlushRunTimeException(e);
+      } else {
+        long starTime = System.currentTimeMillis();
+        try {
+          if (ioMessage instanceof StartFlushGroupIOTask) {
+//            this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
+          } else if (ioMessage instanceof IChunkWriter) {
+            ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+//            chunkWriter.writeToFileWriter(this.writer);
+          } else {
+//            this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
+//            this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
+            this.writer.endChunkGroup();
+          }
+        } catch (IOException e) {
+          LOGGER.error("Storage group {} memtable flushing to file {}, io task meets error.",
+              storageGroup, writer.getFile().getName(), e);
+          throw new FlushRunTimeException(e);
+        }
+        ioTime += System.currentTimeMillis() - starTime;
       }
-      ioTime += System.currentTimeMillis() - starTime;
     }
     LOGGER.info("flushing a memtable to file {} in storage group {}, io cost {}ms",
-            writer.getFile().getName(), storageGroup, ioTime);
+        writer.getFile().getName(), storageGroup, ioTime);
   };
 
-  static class TaskEnd {
-
-    TaskEnd() {
-
-    }
-  }
-
   static class EndChunkGroupIoTask {
 
     EndChunkGroupIoTask() {
@@ -300,4 +275,4 @@ public class MemTableFlushTask {
       this.deviceId = deviceId;
     }
   }
-}
+}
\ No newline at end of file