You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/12/27 06:00:57 UTC
[iotdb] branch rel/0.11 updated: [To rel/0.11] [IOTDB-1069]
restrict the flushing memtable number to avoid OOM when mem_control is
disabled (#2317)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new 5afe292 [To rel/0.11] [IOTDB-1069] restrict the flushing memtable number to avoid OOM when mem_control is disabled (#2317)
5afe292 is described below
commit 5afe2920d32a42b1adfd98161127e495f08d963d
Author: Haonan <hh...@outlook.com>
AuthorDate: Sun Dec 27 14:00:34 2020 +0800
[To rel/0.11] [IOTDB-1069] restrict the flushing memtable number to avoid OOM when mem_control is disabled (#2317)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 +++
.../engine/storagegroup/StorageGroupProcessor.java | 2 +-
.../db/engine/storagegroup/TsFileProcessor.java | 19 +++-
.../org/apache/iotdb/db/metadata/MManager.java | 8 ++
.../apache/iotdb/db/rescon/MemTableManager.java | 110 +++++++++++++++++++++
.../org/apache/iotdb/db/rescon/SystemInfo.java | 1 +
6 files changed, 150 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5582a96..8c5b7e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -252,6 +252,11 @@ public class IoTDBConfig {
private String walDir = DEFAULT_BASE_DIR + File.separator + "wal";
/**
+ * Maximum MemTable number. Has no effect when enableMemControl is true.
+ */
+ private int maxMemtableNumber = 0;
+
+ /**
* The amount of data iterate each time in server
*/
private int batchSize = 100000;
@@ -1053,6 +1058,14 @@ public class IoTDBConfig {
this.batchSize = batchSize;
}
+ public int getMaxMemtableNumber() {
+ return maxMemtableNumber;
+ }
+
+ public void setMaxMemtableNumber(int maxMemtableNumber) {
+ this.maxMemtableNumber = maxMemtableNumber;
+ }
+
public int getConcurrentFlushThread() {
return concurrentFlushThread;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index bfe409d..2bb4cef 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -836,7 +836,7 @@ public class StorageGroupProcessor {
try {
tsFileProcessor.insertTablet(insertTabletPlan, start, end, results);
} catch (WriteProcessRejectException e) {
- logger.warn("insert to TsFileProcessor rejected ", e);
+ logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage());
return false;
} catch (WriteProcessException e) {
logger.error("insert to TsFileProcessor error ", e);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index c858cde..845cf7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.rescon.MemTableManager;
import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.utils.MemUtils;
@@ -173,8 +174,15 @@ public class TsFileProcessor {
public void insert(InsertRowPlan insertRowPlan) throws WriteProcessException {
if (workMemTable == null) {
- workMemTable = new PrimitiveMemTable(enableMemControl);
+ if (enableMemControl) {
+ workMemTable = new PrimitiveMemTable(enableMemControl);
+ MemTableManager.getInstance().addMemtableNumber();
+ }
+ else {
+ workMemTable = MemTableManager.getInstance().getAvailableMemTable(tsFileResource);
+ }
}
+
if (enableMemControl) {
checkMemCostAndAddToTspInfo(insertRowPlan);
}
@@ -214,7 +222,13 @@ public class TsFileProcessor {
TSStatus[] results) throws WriteProcessException {
if (workMemTable == null) {
- workMemTable = new PrimitiveMemTable(enableMemControl);
+ if (enableMemControl) {
+ workMemTable = new PrimitiveMemTable(enableMemControl);
+ MemTableManager.getInstance().addMemtableNumber();
+ }
+ else {
+ workMemTable = MemTableManager.getInstance().getAvailableMemTable(tsFileResource);
+ }
}
try {
@@ -659,6 +673,7 @@ public class TsFileProcessor {
memTable.isSignalMemTable(), flushingMemTables.size());
}
memTable.release();
+ MemTableManager.getInstance().decreaseMemtableNumber();
if (enableMemControl) {
// reset the mem cost in StorageGroupProcessorInfo
storageGroupInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost());
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 44d2bd2..84bc60b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -72,6 +72,7 @@ import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
+import org.apache.iotdb.db.rescon.MemTableManager;
import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
import org.apache.iotdb.db.utils.RandomDeleteCache;
import org.apache.iotdb.db.utils.SchemaUtils;
@@ -587,6 +588,9 @@ public class MManager {
public void setStorageGroup(PartialPath storageGroup) throws MetadataException {
try {
mtree.setStorageGroup(storageGroup);
+ if (!config.isEnableMemControl()) {
+ MemTableManager.getInstance().addOrDeleteStorageGroup(1);
+ }
if (!isRecovering) {
logWriter.setStorageGroup(storageGroup.getFullPath());
}
@@ -620,6 +624,10 @@ public class MManager {
updateSchemaDataTypeNumMap(leafMNode.getSchema().getType(), -1);
}
+ if (!config.isEnableMemControl()) {
+ MemTableManager.getInstance().addOrDeleteStorageGroup(-1);
+ }
+
// if success
if (!isRecovering) {
logWriter.deleteStorageGroup(storageGroup.getFullPath());
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/MemTableManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/MemTableManager.java
new file mode 100644
index 0000000..5244ad8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/MemTableManager.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.rescon;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+public class MemTableManager {
+
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
+  private static final Logger logger = LoggerFactory.getLogger(MemTableManager.class);
+
+  private static final int WAIT_TIME = 100;
+  public static final int MEMTABLE_NUM_FOR_EACH_PARTITION = 4;
+  private int currentMemtableNumber = 0;
+
+  private MemTableManager() {
+  }
+
+  public static MemTableManager getInstance() {
+    return InstanceHolder.INSTANCE;
+  }
+
+  /**
+   * Called when memory control is disabled. Blocks until the memtable count is below the
+   * configured maximum, then counts and returns a new memtable. tsFileResource is only logged.
+   */
+  public synchronized IMemTable getAvailableMemTable(TsFileResource tsFileResource) {
+    int waitCount = 1;
+    boolean interrupted = false;
+    try {
+      while (reachMaxMemtableNumber()) {
+        try {
+          // timedWait releases this monitor while waiting; sleeping here would keep it and
+          // block decreaseMemtableNumber(), so the count could never drop
+          TimeUnit.MILLISECONDS.timedWait(this, WAIT_TIME);
+        } catch (InterruptedException e) {
+          logger.error("{} fails to wait for memtables {}, continue to wait", tsFileResource, e);
+          interrupted = true; // restored on exit; re-interrupting now would fail every wait
+        }
+        logger.info("{} has waited for a memtable for {}ms", tsFileResource, waitCount++ * 100);
+      }
+      currentMemtableNumber++;
+      return new PrimitiveMemTable();
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  public synchronized int getCurrentMemtableNumber() {
+    return currentMemtableNumber;
+  }
+
+  /** Increments the memtable count, for memtables not created by getAvailableMemTable. */
+  public synchronized void addMemtableNumber() {
+    currentMemtableNumber++;
+  }
+
+  /** Decrements the count; a caller blocked in getAvailableMemTable sees it at its next check. */
+  public synchronized void decreaseMemtableNumber() {
+    currentMemtableNumber--;
+  }
+
+  /** Called when memory control is disabled. True when the configured limit is reached. */
+  private boolean reachMaxMemtableNumber() {
+    return currentMemtableNumber >= CONFIG.getMaxMemtableNumber();
+  }
+
+  /** Called when memory control is disabled. Adjusts the limit for diff added storage groups. */
+  public synchronized void addOrDeleteStorageGroup(int diff) {
+    int maxMemTableNum = CONFIG.getMaxMemtableNumber();
+    maxMemTableNum += MEMTABLE_NUM_FOR_EACH_PARTITION
+        * CONFIG.getConcurrentWritingTimePartition() * diff;
+    CONFIG.setMaxMemtableNumber(maxMemTableNum);
+  }
+
+  private static class InstanceHolder {
+
+    private static final MemTableManager INSTANCE = new MemTableManager();
+
+    private InstanceHolder() {
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 593e0ce..cbf76bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -48,6 +48,7 @@ public class SystemInfo {
private static final double REJECT_THERSHOLD =
config.getAllocateMemoryForWrite() * config.getRejectProportion();
+
/**
* Report current mem cost of storage group to system. Called when the memory of
* storage group newly accumulates to IoTDBConfig.getStorageGroupSizeReportThreshold()