You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/12/03 12:10:09 UTC
[iotdb] branch rel/0.11 updated: [To rel/0.11] change mem control
lock usage (#2114)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new 75e4ddf [To rel/0.11] change mem control lock usage (#2114)
75e4ddf is described below
commit 75e4ddfcdb7c1794e2a46e57676a0bf4819c9324
Author: Haonan <hh...@outlook.com>
AuthorDate: Thu Dec 3 20:09:58 2020 +0800
[To rel/0.11] change mem control lock usage (#2114)
---
.../resources/conf/iotdb-engine.properties | 7 ++++
.../org/apache/iotdb/db/engine/StorageEngine.java | 5 +--
.../engine/storagegroup/StorageGroupProcessor.java | 9 ++++--
.../db/engine/storagegroup/TsFileProcessor.java | 13 ++++++--
.../db/exception/WriteProcessRejectException.java | 35 ++++++++++++++++++++
.../org/apache/iotdb/db/rescon/SystemInfo.java | 37 +++++++++++-----------
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
7 files changed, 81 insertions(+), 26 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 9774a3c..0d0fffd 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -174,6 +174,7 @@ timestamp_precision=ms
wal_buffer_size=16777216
# When a TsFile's file size (in byte) exceeds this, the TsFile is forced closed.
+# It may cause memTable size smaller if it is a large value
tsfile_size_threshold=1
# When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. The default threshold is 256 MB.
@@ -224,18 +225,22 @@ enable_mem_control=true
# Memory Allocation Ratio: Write, Read, Schema and Free Memory.
# The parameter form is a:b:c:d, where a, b, c and d are integers. for example: 1:1:1:1 , 6:2:1:1
+# If you have high level of writing pressure and low level of reading pressure, please adjust it to for example 6:1:1:2
write_read_schema_free_memory_proportion=4:3:1:2
# primitive array size (length of each array) in array pool
primitive_array_size=128
# Ratio of write memory for invoking flush disk, 0.4 by default
+# If you have extremely high write load (like batch=1000), it can be set lower than the default value like 0.2
flush_proportion=0.4
# Ratio of write memory allocated for buffered arrays, 0.6 by default
buffered_arrays_memory_proportion=0.6
# Ratio of write memory for rejecting insertion, 0.8 by default
+# If you have extremely high write load (like batch=1000) and the physical memory size is large enough,
+# it can be set higher than the default value like 0.9
reject_proportion=0.8
# If memory (in byte) of storage group increased more than this threshold, report to system. The default value is 16MB
@@ -246,9 +251,11 @@ storage_group_report_threshold=16777216
max_deduplicated_path_num=1000
# When an inserting is rejected, waiting time (in ms) to check the status again, 50 by default.
+# If the insertion has been rejected and the read load is low, it can be set larger
check_period_when_insert_blocked=50
# When the waiting time (in ms) of an inserting exceeds this, throw an exception. 10000 by default.
+# If the insertion has been rejected and the read load is low, it can be set larger
max_waiting_time_when_insert_blocked=10000
# estimated metadata size (in byte) of one timeseries in Mtree
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 7aeedd4..6f796b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -61,6 +61,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
@@ -830,13 +831,13 @@ public class StorageEngine implements IService {
/**
* block insertion if the insertion is rejected by memory control
*/
- public static void blockInsertionIfReject() throws WriteProcessException {
+ public static void blockInsertionIfReject() throws WriteProcessRejectException {
long startTime = System.currentTimeMillis();
while (SystemInfo.getInstance().isRejected()) {
try {
TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
if (System.currentTimeMillis() - startTime > config.getMaxWaitingTimeWhenInsertBlocked()) {
- throw new WriteProcessException("System rejected over " + config.getMaxWaitingTimeWhenInsertBlocked() +
+ throw new WriteProcessRejectException("System rejected over " + config.getMaxWaitingTimeWhenInsertBlocked() +
"ms");
}
} catch (InterruptedException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 8702b3b..bfe409d 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -699,9 +700,9 @@ public class StorageGroupProcessor {
if (enableMemControl) {
try {
StorageEngine.blockInsertionIfReject();
- } catch (WriteProcessException e) {
+ } catch (WriteProcessRejectException e) {
TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
- Arrays.fill(results, RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ Arrays.fill(results, RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT));
throw new BatchInsertionException(results);
}
}
@@ -834,6 +835,9 @@ public class StorageGroupProcessor {
try {
tsFileProcessor.insertTablet(insertTabletPlan, start, end, results);
+ } catch (WriteProcessRejectException e) {
+ logger.warn("insert to TsFileProcessor rejected ", e);
+ return false;
} catch (WriteProcessException e) {
logger.error("insert to TsFileProcessor error ", e);
return false;
@@ -888,7 +892,6 @@ public class StorageGroupProcessor {
return;
}
- // insert TsFileProcessor
tsFileProcessor.insert(insertRowPlan);
// try to update the latest time of the device of this tsRecord
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 392e72d..c858cde 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.UpdateEndTimeCallBack;
import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -220,6 +221,13 @@ public class TsFileProcessor {
if (enableMemControl) {
checkMemCostAndAddToTspInfo(insertTabletPlan, start, end);
}
+ } catch (WriteProcessException e) {
+ for (int i = start; i < end; i++) {
+ results[i] = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, e.getMessage());
+ }
+ throw new WriteProcessException(e);
+ }
+ try {
workMemTable.insertTablet(insertTabletPlan, start, end);
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
insertTabletPlan.setStart(start);
@@ -248,7 +256,8 @@ public class TsFileProcessor {
tsFileResource.updatePlanIndexes(insertTabletPlan.getIndex());
}
- private void checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan) throws WriteProcessException {
+ private void checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan)
+ throws WriteProcessException {
// memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
long memTableIncrement = 0L;
long textDataIncrement = 0L;
@@ -342,7 +351,7 @@ public class TsFileProcessor {
SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo);
try {
StorageEngine.blockInsertionIfReject();
- } catch (WriteProcessException e) {
+ } catch (WriteProcessRejectException e) {
storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, false);
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessRejectException.java b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessRejectException.java
new file mode 100644
index 0000000..90b9e58
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessRejectException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.exception;
+
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class WriteProcessRejectException extends WriteProcessException {
+
+ private static final long serialVersionUID = -4217324287547595610L;
+
+ public WriteProcessRejectException(String message) {
+ super(message, TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode());
+ }
+
+ public WriteProcessRejectException(String message, int errorCode) {
+ super(message, errorCode);
+ }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 74c00f0..593e0ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -20,11 +20,10 @@
package org.apache.iotdb.db.rescon;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -39,13 +38,15 @@ public class SystemInfo {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final Logger logger = LoggerFactory.getLogger(SystemInfo.class);
- private AtomicLong totalSgMemCost = new AtomicLong();
+ private long totalSgMemCost = 0L;
private volatile boolean rejected = false;
- private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new ConcurrentHashMap<>();
+ private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
- private static final double FLUSH_PROPORTION = config.getFlushProportion();
- private static final double REJECT_PROPORTION = config.getRejectProportion();
+ private static final double FLUSH_THERSHOLD =
+ config.getAllocateMemoryForWrite() * config.getFlushProportion();
+ private static final double REJECT_THERSHOLD =
+ config.getAllocateMemoryForWrite() * config.getRejectProportion();
/**
* Report current mem cost of storage group to system. Called when the memory of
@@ -53,22 +54,22 @@ public class SystemInfo {
*
* @param storageGroupInfo storage group
*/
- public void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+ public synchronized void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
long delta = storageGroupInfo.getMemCost() -
reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
- totalSgMemCost.addAndGet(delta);
+ totalSgMemCost += delta;
if (logger.isDebugEnabled()) {
logger.debug("Report Storage Group Status to the system. "
+ "After adding {}, current sg mem cost is {}.", delta, totalSgMemCost);
}
reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
- if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+ if (totalSgMemCost >= FLUSH_THERSHOLD) {
logger.debug("The total storage group mem costs are too large, call for flushing. "
+ "Current sg cost is {}", totalSgMemCost);
chooseTSPToMarkFlush();
}
- if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+ if (totalSgMemCost >= REJECT_THERSHOLD) {
logger.info("Change system to reject status...");
rejected = true;
}
@@ -80,11 +81,11 @@ public class SystemInfo {
*
* @param storageGroupInfo storage group
*/
- public void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo,
+ public synchronized void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo,
boolean shouldInvokeFlush) {
if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
- this.totalSgMemCost.addAndGet(storageGroupInfo.getMemCost() -
- reportedSgMemCostMap.get(storageGroupInfo));
+ this.totalSgMemCost -= (reportedSgMemCostMap.get(storageGroupInfo) -
+ storageGroupInfo.getMemCost());
storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
if (shouldInvokeFlush) {
@@ -94,8 +95,7 @@ public class SystemInfo {
}
private void checkSystemToInvokeFlush() {
- if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION &&
- totalSgMemCost.get() < config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+ if (totalSgMemCost >= FLUSH_THERSHOLD && totalSgMemCost < REJECT_THERSHOLD) {
logger.debug("Some sg memory released but still exceeding flush proportion, call flush.");
if (rejected) {
logger.info("Some sg memory released, set system to normal status.");
@@ -104,7 +104,7 @@ public class SystemInfo {
rejected = false;
forceAsyncFlush();
}
- else if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+ else if (totalSgMemCost >= REJECT_THERSHOLD) {
logger.warn("Some sg memory released, but system is still in reject status.");
logCurrentTotalSGMemory();
rejected = true;
@@ -169,8 +169,7 @@ public class SystemInfo {
}
List<TsFileProcessor> processors = new ArrayList<>();
long memCost = 0;
- while (totalSgMemCost.get() - memCost > config.getAllocateMemoryForWrite() *
- FLUSH_PROPORTION / 2) {
+ while (totalSgMemCost - memCost > FLUSH_THERSHOLD / 2) {
if (tsps.isEmpty() || tsps.peek().getWorkMemTableRamCost() == 0) {
return processors;
}
@@ -187,7 +186,7 @@ public class SystemInfo {
public void close() {
reportedSgMemCostMap.clear();
- totalSgMemCost.set(0);
+ totalSgMemCost = 0;
rejected = false;
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 3490e51..6e57937 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -59,6 +59,7 @@ public enum TSStatusCode {
PATH_ERROR(410),
QUERY_PROCESS_ERROR(411),
WRITE_PROCESS_ERROR(412),
+ WRITE_PROCESS_REJECT(413),
INTERNAL_SERVER_ERROR(500),
CLOSE_OPERATION_ERROR(501),