You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/12/27 06:00:57 UTC

[iotdb] branch rel/0.11 updated: [To rel/0.11] [IOTDB-1069] restrict the flushing memtable number to avoid OOM when mem_control is disabled (#2317)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new 5afe292  [To rel/0.11] [IOTDB-1069] restrict the flushing memtable number to avoid OOM when mem_control is disabled  (#2317)
5afe292 is described below

commit 5afe2920d32a42b1adfd98161127e495f08d963d
Author: Haonan <hh...@outlook.com>
AuthorDate: Sun Dec 27 14:00:34 2020 +0800

    [To rel/0.11] [IOTDB-1069] restrict the flushing memtable number to avoid OOM when mem_control is disabled  (#2317)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  13 +++
 .../engine/storagegroup/StorageGroupProcessor.java |   2 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |  19 +++-
 .../org/apache/iotdb/db/metadata/MManager.java     |   8 ++
 .../apache/iotdb/db/rescon/MemTableManager.java    | 110 +++++++++++++++++++++
 .../org/apache/iotdb/db/rescon/SystemInfo.java     |   1 +
 6 files changed, 150 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5582a96..8c5b7e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -252,6 +252,11 @@ public class IoTDBConfig {
   private String walDir = DEFAULT_BASE_DIR + File.separator + "wal";
 
   /**
+   * Maximum MemTable number. Ignored when enableMemControl is true.
+   */
+  private int maxMemtableNumber = 0;
+
+  /**
    * The amount of data iterate each time in server
    */
   private int batchSize = 100000;
@@ -1053,6 +1058,14 @@ public class IoTDBConfig {
     this.batchSize = batchSize;
   }
 
+  /**
+   * Returns the configured maximum MemTable number.
+   *
+   * @return the current value of {@code maxMemtableNumber}; per the field's documentation this
+   *     limit does not apply when enableMemControl is true
+   */
+  public int getMaxMemtableNumber() {
+    return maxMemtableNumber;
+  }
+
+  /**
+   * Sets the maximum MemTable number.
+   *
+   * <p>Besides configuration code, {@code MemTableManager.addOrDeleteStorageGroup} calls this at
+   * runtime to grow or shrink the limit when a storage group is added or removed.
+   *
+   * @param maxMemtableNumber the new maximum MemTable number
+   */
+  public void setMaxMemtableNumber(int maxMemtableNumber) {
+    this.maxMemtableNumber = maxMemtableNumber;
+  }
+
   public int getConcurrentFlushThread() {
     return concurrentFlushThread;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index bfe409d..2bb4cef 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -836,7 +836,7 @@ public class StorageGroupProcessor {
     try {
       tsFileProcessor.insertTablet(insertTabletPlan, start, end, results);
     } catch (WriteProcessRejectException e) {
-      logger.warn("insert to TsFileProcessor rejected ", e);
+      logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage());
       return false;
     } catch (WriteProcessException e) {
       logger.error("insert to TsFileProcessor error ", e);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index c858cde..845cf7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.rescon.MemTableManager;
 import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
 import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.utils.MemUtils;
@@ -173,8 +174,15 @@ public class TsFileProcessor {
   public void insert(InsertRowPlan insertRowPlan) throws WriteProcessException {
 
     if (workMemTable == null) {
-      workMemTable = new PrimitiveMemTable(enableMemControl);
+      if (enableMemControl) {
+        workMemTable = new PrimitiveMemTable(enableMemControl);
+        MemTableManager.getInstance().addMemtableNumber();
+      }
+      else {
+        workMemTable = MemTableManager.getInstance().getAvailableMemTable(tsFileResource);
+      }
     }
+
     if (enableMemControl) {
       checkMemCostAndAddToTspInfo(insertRowPlan);
     }
@@ -214,7 +222,13 @@ public class TsFileProcessor {
       TSStatus[] results) throws WriteProcessException {
 
     if (workMemTable == null) {
-      workMemTable = new PrimitiveMemTable(enableMemControl);
+      if (enableMemControl) {
+        workMemTable = new PrimitiveMemTable(enableMemControl);
+        MemTableManager.getInstance().addMemtableNumber();
+      }
+      else {
+        workMemTable = MemTableManager.getInstance().getAvailableMemTable(tsFileResource);
+      }
     }
 
     try {
@@ -659,6 +673,7 @@ public class TsFileProcessor {
             memTable.isSignalMemTable(), flushingMemTables.size());
       }
       memTable.release();
+      MemTableManager.getInstance().decreaseMemtableNumber();
       if (enableMemControl) {
         // reset the mem cost in StorageGroupProcessorInfo
         storageGroupInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost());
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 44d2bd2..84bc60b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -72,6 +72,7 @@ import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
+import org.apache.iotdb.db.rescon.MemTableManager;
 import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
 import org.apache.iotdb.db.utils.RandomDeleteCache;
 import org.apache.iotdb.db.utils.SchemaUtils;
@@ -587,6 +588,9 @@ public class MManager {
   public void setStorageGroup(PartialPath storageGroup) throws MetadataException {
     try {
       mtree.setStorageGroup(storageGroup);
+      if (!config.isEnableMemControl()) {
+        MemTableManager.getInstance().addOrDeleteStorageGroup(1);
+      }
       if (!isRecovering) {
         logWriter.setStorageGroup(storageGroup.getFullPath());
       }
@@ -620,6 +624,10 @@ public class MManager {
           updateSchemaDataTypeNumMap(leafMNode.getSchema().getType(), -1);
         }
 
+        if (!config.isEnableMemControl()) {
+          MemTableManager.getInstance().addOrDeleteStorageGroup(-1);
+        }
+
         // if success
         if (!isRecovering) {
           logWriter.deleteStorageGroup(storageGroup.getFullPath());
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/MemTableManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/MemTableManager.java
new file mode 100644
index 0000000..5244ad8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/MemTableManager.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.rescon;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+public class MemTableManager {
+
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
+  private static final Logger logger = LoggerFactory.getLogger(MemTableManager.class);
+
+  private static final int WAIT_TIME = 100;
+  public static final int MEMTABLE_NUM_FOR_EACH_PARTITION = 4;
+  private int currentMemtableNumber = 0;
+
+  private MemTableManager() {
+  }
+
+  public static MemTableManager getInstance() {
+    return InstanceHolder.INSTANCE;
+  }
+
+  /**
+   * Called when memory control is disabled
+   */
+  public synchronized IMemTable getAvailableMemTable(TsFileResource tsFileResource) {
+    if (!reachMaxMemtableNumber()) {
+      currentMemtableNumber++;
+      return new PrimitiveMemTable();
+    } else {
+      // wait until the total memtable number is less than the capacity of the system
+      int waitCount = 1;
+      while (true) {
+        if (!reachMaxMemtableNumber()) {
+          currentMemtableNumber++;
+          return new PrimitiveMemTable();
+        }
+        try {
+          TimeUnit.MILLISECONDS.sleep(WAIT_TIME);
+        } catch (InterruptedException e) {
+          logger.error("{} fails to wait for memtables {}, continue to wait", tsFileResource, e);
+          Thread.currentThread().interrupt();
+        }
+        logger.info("{} has waited for a memtable for {}ms", tsFileResource, waitCount++ * 100);
+      }
+    }
+  }
+
+  public synchronized int getCurrentMemtableNumber() {
+    return currentMemtableNumber;
+  }
+
+  public synchronized void addMemtableNumber() {
+    currentMemtableNumber++;
+  }
+
+  public synchronized void decreaseMemtableNumber() {
+    currentMemtableNumber--;
+  }
+
+  /**
+   * Called when memory control is disabled
+   */
+  private boolean reachMaxMemtableNumber() {
+    return currentMemtableNumber >= CONFIG.getMaxMemtableNumber();
+  }
+
+  /**
+   * Called when memory control is disabled
+   */
+  public synchronized void addOrDeleteStorageGroup(int diff) {
+    int maxMemTableNum = CONFIG.getMaxMemtableNumber();
+    maxMemTableNum += MEMTABLE_NUM_FOR_EACH_PARTITION 
+        * CONFIG.getConcurrentWritingTimePartition() * diff;
+    CONFIG.setMaxMemtableNumber(maxMemTableNum);
+  }
+
+  private static class InstanceHolder {
+
+    private static final MemTableManager INSTANCE = new MemTableManager();
+
+    private InstanceHolder() {
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 593e0ce..cbf76bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -48,6 +48,7 @@ public class SystemInfo {
   private static final double REJECT_THERSHOLD = 
       config.getAllocateMemoryForWrite() * config.getRejectProportion();
 
+
   /**
    * Report current mem cost of storage group to system. Called when the memory of
    * storage group newly accumulates to IoTDBConfig.getStorageGroupSizeReportThreshold()