You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/12/28 08:12:59 UTC

[iotdb] 01/01: fix flush OOM

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch flush_OOM_11
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c38974b1ba877c9cfc742ff17c3eae949cfecf10
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Dec 28 16:12:23 2020 +0800

    fix flush OOM
---
 .../resources/conf/iotdb-engine.properties         |  3 ++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 ++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  4 ++
 .../iotdb/db/engine/flush/MemTableFlushTask.java   | 49 +++++++++++-----------
 4 files changed, 44 insertions(+), 25 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 3cf1302..7c08de1 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -267,6 +267,9 @@ max_waiting_time_when_insert_blocked=10000
 # estimated metadata size (in byte) of one timeseries in Mtree
 estimated_series_size=300
 
+# Capacity of the ioTaskQueue and encodingTaskQueue used while flushing a memtable. The default value is 2147483647 (Integer.MAX_VALUE)
+queue_size_for_flushing=2147483647
+
 ####################
 ### Upgrade Configurations
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5582a96..ac90660 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -759,6 +759,11 @@ public class IoTDBConfig {
    */
   private boolean debugState = false;
 
+  /**
+   * The capacity of the ioTaskQueue and encodingTaskQueue used when flushing a memtable.
+   */
+  private int queueSizeForFlushing = Integer.MAX_VALUE;
+
   public IoTDBConfig() {
     // empty constructor
   }
@@ -2045,4 +2050,12 @@ public class IoTDBConfig {
   public void setDebugState(boolean debugState) {
     this.debugState = debugState;
   }
+
+  public int getQueueSizeForFlushing() {
+    return queueSizeForFlushing;
+  }
+
+  public void setQueueSizeForFlushing(int queueSizeForFlushing) {
+    this.queueSizeForFlushing = queueSizeForFlushing;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index dc95ef1..15520ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -295,6 +295,10 @@ public class IoTDBDescriptor {
           .getProperty("estimated_series_size",
               Integer.toString(conf.getEstimatedSeriesSize()))));
 
+      conf.setQueueSizeForFlushing(Integer.parseInt(properties
+          .getProperty("queue_size_for_flushing",
+              Integer.toString(conf.getQueueSizeForFlushing()))));
+
       conf.setMergeChunkPointNumberThreshold(Integer.parseInt(properties
           .getProperty("merge_chunk_point_number",
               Integer.toString(conf.getMergeChunkPointNumberThreshold()))));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index e9f4c92..550c35f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -19,10 +19,12 @@
 package org.apache.iotdb.db.engine.flush;
 
 import java.io.IOException;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
@@ -42,12 +44,15 @@ public class MemTableFlushTask {
   private static final Logger logger = LoggerFactory.getLogger(MemTableFlushTask.class);
   private static final FlushSubTaskPoolManager subTaskPoolManager = FlushSubTaskPoolManager
       .getInstance();
+  private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private final Future<?> encodingTaskFuture;
   private final Future<?> ioTaskFuture;
   private RestorableTsFileIOWriter writer;
 
-  private final ConcurrentLinkedQueue<Object> ioTaskQueue = new ConcurrentLinkedQueue<>();
-  private final ConcurrentLinkedQueue<Object> encodingTaskQueue = new ConcurrentLinkedQueue<>();
+  private final LinkedBlockingQueue<Object> ioTaskQueue =
+      new LinkedBlockingQueue<>(config.getQueueSizeForFlushing());
+  private final LinkedBlockingQueue<Object> encodingTaskQueue = 
+      new LinkedBlockingQueue<>(config.getQueueSizeForFlushing());
   private String storageGroup;
 
   private IMemTable memTable;
@@ -84,16 +89,16 @@ public class MemTableFlushTask {
     long sortTime = 0;
 
     for (String deviceId : memTable.getMemTableMap().keySet()) {
-      encodingTaskQueue.add(new StartFlushGroupIOTask(deviceId));
+      encodingTaskQueue.put(new StartFlushGroupIOTask(deviceId));
       for (String measurementId : memTable.getMemTableMap().get(deviceId).keySet()) {
         long startTime = System.currentTimeMillis();
         IWritableMemChunk series = memTable.getMemTableMap().get(deviceId).get(measurementId);
         MeasurementSchema desc = series.getSchema();
         TVList tvList = series.getSortedTVList();
         sortTime += System.currentTimeMillis() - startTime;
-        encodingTaskQueue.add(new Pair<>(tvList, desc));
+        encodingTaskQueue.put(new Pair<>(tvList, desc));
       }
-      encodingTaskQueue.add(new EndChunkGroupIoTask());
+      encodingTaskQueue.put(new EndChunkGroupIoTask());
     }
     noMoreEncodingTask = true;
     logger.debug(
@@ -176,23 +181,25 @@ public class MemTableFlushTask {
           if (noMoreMessages) {
             break;
           }
-          try {
-            TimeUnit.MILLISECONDS.sleep(10);
-          } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
-            logger.error("Storage group {} memtable {}, encoding task is interrupted.",
-                storageGroup, memTable.getVersion(), e);
-            // generally it is because the thread pool is shutdown so the task should be aborted
-            break;
-          }
         } else {
           if (task instanceof StartFlushGroupIOTask || task instanceof EndChunkGroupIoTask) {
-            ioTaskQueue.add(task);
+            try {
+              ioTaskQueue.put(task);
+            } catch (InterruptedException e) {
+              // TODO: handle the interruption properly (log it and restore the interrupt flag) instead of printing the stack trace
+              e.printStackTrace();
+            }
           } else {
             long starTime = System.currentTimeMillis();
             Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
             IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right);
             writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
-            ioTaskQueue.add(seriesWriter);
+            try {
+              ioTaskQueue.put(seriesWriter);
+            } catch (InterruptedException e) {
+              // TODO: handle the interruption properly (log it and restore the interrupt flag) instead of printing the stack trace
+              e.printStackTrace();
+            }
             memSerializeTime += System.currentTimeMillis() - starTime;
           }
         }
@@ -218,14 +225,6 @@ public class MemTableFlushTask {
         if (returnWhenNoTask) {
           break;
         }
-        try {
-          TimeUnit.MILLISECONDS.sleep(10);
-        } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
-          logger.error("Storage group {} memtable {}, io task is interrupted.", storageGroup
-              , memTable.getVersion(), e);
-          // generally it is because the thread pool is shutdown so the task should be aborted
-          break;
-        }
       } else {
         long starTime = System.currentTimeMillis();
         try {