You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/01/19 02:54:11 UTC

[GitHub] [iotdb] SilverNarcissus commented on a change in pull request #2358: [IOTDB-1084] Fix temporary memory of flushing may cause OOM

SilverNarcissus commented on a change in pull request #2358:
URL: https://github.com/apache/iotdb/pull/2358#discussion_r559882004



##########
File path: server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
##########
@@ -170,94 +189,104 @@ private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl,
     @Override
     public void run() {
       long memSerializeTime = 0;

Review comment:
       You should use class variable `memSerializeTime` rather than this local variable 

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
##########
@@ -170,94 +189,104 @@ private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl,
     @Override
     public void run() {
       long memSerializeTime = 0;
-      boolean noMoreMessages = false;
       LOGGER.debug("Storage group {} memtable flushing to file {} starts to encoding data.",
-              storageGroup, writer.getFile().getName());
+          storageGroup, writer.getFile().getName());
       while (true) {
-        if (noMoreEncodingTask) {
-          noMoreMessages = true;
+
+        Object task = null;
+        try {
+          task = encodingTaskQueue.take();
+        } catch (InterruptedException e1) {
+          LOGGER.error("Take task into ioTaskQueue Interrupted");
+          Thread.currentThread().interrupt();
+          break;
         }
-        Object task = encodingTaskQueue.poll();
-        if (task == null) {
-          if (noMoreMessages) {
-            break;
-          }
+        if (task instanceof StartFlushGroupIOTask || task instanceof EndChunkGroupIoTask) {
           try {
-            TimeUnit.MILLISECONDS.sleep(10);
+            ioTaskQueue.put(task);
           } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
             LOGGER.error("Storage group {} memtable flushing to file {}, encoding task is interrupted.",
                 storageGroup, writer.getFile().getName(), e);
             // generally it is because the thread pool is shutdown so the task should be aborted
             break;
           }
+        } else if (task instanceof TaskEnd) {
+          break;
         } else {
-          if (task instanceof StartFlushGroupIOTask || task instanceof EndChunkGroupIoTask) {
-            ioTaskQueue.add(task);
-          } else {
-            long starTime = System.currentTimeMillis();
-            Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
-            IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right);
-            writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
-            ioTaskQueue.add(seriesWriter);
-            memSerializeTime += System.currentTimeMillis() - starTime;
+          long starTime = System.currentTimeMillis();
+          Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
+          IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right);
+          writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
+          seriesWriter.sealCurrentPage();
+          seriesWriter.clearPageWriter();
+          try {
+            ioTaskQueue.put(seriesWriter);
+          } catch (InterruptedException e) {
+            LOGGER.error("Put task into ioTaskQueue Interrupted");
+            Thread.currentThread().interrupt();
           }
+          memSerializeTime += System.currentTimeMillis() - starTime;
         }
       }
-      noMoreIOTask = true;
-      LOGGER.debug("Storage group {}, flushing memtable into file {}: Encoding data cost "
-              + "{} ms.",
+      try {
+        ioTaskQueue.put(new TaskEnd());
+      } catch (InterruptedException e) {
+        LOGGER.error("Put task into ioTaskQueue Interrupted");
+        Thread.currentThread().interrupt();
+      }
+      
+      LOGGER.debug("Storage group {}, flushing memtable {} into disk: Encoding data cost "
+          + "{} ms.",
           storageGroup, writer.getFile().getName(), memSerializeTime);
     }
   };
 
   @SuppressWarnings("squid:S135")
   private Runnable ioTask = () -> {
     long ioTime = 0;

Review comment:
       You should use class variable ioTime rather than this local variable

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
##########
@@ -43,18 +46,23 @@
   private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class);
   private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER = FlushSubTaskPoolManager
       .getInstance();
+  private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private final Future<?> encodingTaskFuture;
   private final Future<?> ioTaskFuture;
   private RestorableTsFileIOWriter writer;
 
-  private final ConcurrentLinkedQueue<Object> ioTaskQueue = new ConcurrentLinkedQueue<>();
-  private final ConcurrentLinkedQueue<Object> encodingTaskQueue = new ConcurrentLinkedQueue<>();
+  private final LinkedBlockingQueue<Object> encodingTaskQueue = new LinkedBlockingQueue<>();
+  private final LinkedBlockingQueue<Object> ioTaskQueue = (config.isEnableMemControl()
+      && SystemInfo.getInstance().isEncodingFasterThanIo())
+          ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
+          : new LinkedBlockingQueue<>();
+
   private String storageGroup;
 
   private IMemTable memTable;
 
-  private volatile boolean noMoreEncodingTask = false;
-  private volatile boolean noMoreIOTask = false;
+  private long memSerializeTime = 0L;
+  private long ioTime = 0L;

Review comment:
       These two variable will be write and read by different thread without locking. The situation is only one thread to write, so no locking is reasonable, but they should be volatile so they can be seen by another thread. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org