You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/12/03 08:44:44 UTC
[iotdb] branch rel/0.11 updated: [To rel/0.11] [Mem control] Move one of the insert check out from the sg lock (#2173)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new b5eb03b [To rel/0.11] [Mem control] Move one of the insert check out from the sg lock (#2173)
b5eb03b is described below
commit b5eb03b4d75b02873a0a62267b770e99055e514c
Author: Haonan <hh...@outlook.com>
AuthorDate: Thu Dec 3 16:44:33 2020 +0800
[To rel/0.11] [Mem control] Move one of the insert check out from the sg lock (#2173)
---
.../resources/conf/iotdb-engine.properties | 8 ++--
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 14 +++---
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 +--
.../org/apache/iotdb/db/engine/StorageEngine.java | 20 ++++++++
.../engine/storagegroup/StorageGroupProcessor.java | 15 +++++-
.../db/engine/storagegroup/TsFileProcessor.java | 53 ++++++----------------
6 files changed, 61 insertions(+), 55 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 2079a57..9774a3c 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -245,11 +245,11 @@ storage_group_report_threshold=16777216
# it's just an advised value, the real limitation will be the smaller one between this and the one we calculated
max_deduplicated_path_num=1000
-# When an inserting is rejected, waiting time (in ms) to check system again, 0 by default.
-waiting_time_when_insert_blocked=0
+# When an insertion is rejected, the period (in ms) to wait before checking the status again. 50 by default.
+check_period_when_insert_blocked=50
-# When the waiting time (in ms) of an inserting exceeds this, throw an exception. 0 by default.
-max_waiting_time_when_insert_blocked=0
+# When the total waiting time (in ms) of a blocked insertion exceeds this value, an exception is thrown. 10000 by default.
+max_waiting_time_when_insert_blocked=10000
# estimated metadata size (in byte) of one timeseries in Mtree
estimated_series_size=300
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 59e1ae2..19ccf5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -162,14 +162,14 @@ public class IoTDBConfig {
private long storageGroupSizeReportThreshold = 16 * 1024 * 1024L;
/**
- * When inserting rejected, waiting this time to check system again
+ * When an insertion is rejected, the period (in ms) to wait before checking the system again
*/
- private int waitingTimeWhenInsertBlockedInMs = 0;
+ private int checkPeriodWhenInsertBlocked = 50;
/**
* When inserting rejected exceeds this, throw an exception
*/
- private int maxWaitingTimeWhenInsertBlockedInMs = 0;
+ private int maxWaitingTimeWhenInsertBlockedInMs = 10000;
/**
* Is the write ahead log enable.
*/
@@ -2006,12 +2006,12 @@ public class IoTDBConfig {
this.maxQueryDeduplicatedPathNum = maxQueryDeduplicatedPathNum;
}
- public int getWaitingTimeWhenInsertBlocked() {
- return waitingTimeWhenInsertBlockedInMs;
+ public int getCheckPeriodWhenInsertBlocked() {
+ return checkPeriodWhenInsertBlocked;
}
- public void setWaitingTimeWhenInsertBlocked(int waitingTimeWhenInsertBlocked) {
- this.waitingTimeWhenInsertBlockedInMs = waitingTimeWhenInsertBlocked;
+ public void setCheckPeriodWhenInsertBlocked(int checkPeriodWhenInsertBlocked) {
+ this.checkPeriodWhenInsertBlocked = checkPeriodWhenInsertBlocked;
}
public int getMaxWaitingTimeWhenInsertBlocked() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index d7a7058..dc95ef1 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -283,9 +283,9 @@ public class IoTDBDescriptor {
.getProperty("avg_series_point_number_threshold",
Integer.toString(conf.getAvgSeriesPointNumberThreshold()))));
- conf.setWaitingTimeWhenInsertBlocked(Integer.parseInt(properties
- .getProperty("waiting_time_when_insert_blocked",
- Integer.toString(conf.getWaitingTimeWhenInsertBlocked()))));
+ conf.setCheckPeriodWhenInsertBlocked(Integer.parseInt(properties
+ .getProperty("check_period_when_insert_blocked",
+ Integer.toString(conf.getCheckPeriodWhenInsertBlocked()))));
conf.setMaxWaitingTimeWhenInsertBlocked(Integer.parseInt(properties
.getProperty("max_waiting_time_when_insert_blocked",
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index d7d1df6..7aeedd4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -72,6 +72,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryFileManager;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.ServiceType;
@@ -91,6 +92,7 @@ public class StorageEngine implements IService {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
+
/**
* a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor
* will have a subfolder under the systemDir.
@@ -824,4 +826,22 @@ public class StorageEngine implements IService {
public void mergeUnLock(List<StorageGroupProcessor> list) {
list.forEach(storageGroupProcessor -> storageGroupProcessor.getTsFileManagement().readUnLock());
}
+
+ /**
+  * Blocks the calling insertion while memory control is rejecting writes.
+  *
+  * <p>Polls {@code SystemInfo.getInstance().isRejected()}, sleeping
+  * {@code config.getCheckPeriodWhenInsertBlocked()} ms between checks, and returns as soon as
+  * the system is no longer rejecting.
+  *
+  * @throws WriteProcessException if the system is still rejecting after more than
+  *     {@code config.getMaxWaitingTimeWhenInsertBlocked()} ms, or if the waiting thread is
+  *     interrupted (the interrupt flag is restored before throwing)
+  */
+ public static void blockInsertionIfReject() throws WriteProcessException {
+   long startTime = System.currentTimeMillis();
+   while (SystemInfo.getInstance().isRejected()) {
+     try {
+       TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
+     } catch (InterruptedException e) {
+       // Restore the flag and give up: with the flag set, every further sleep() would throw
+       // immediately, turning this loop into a busy spin that never reaches the timeout check.
+       Thread.currentThread().interrupt();
+       throw new WriteProcessException("Interrupted while insertion was blocked by memory control");
+     }
+     // Checked outside the try block so the timeout is evaluated on every iteration.
+     if (System.currentTimeMillis() - startTime > config.getMaxWaitingTimeWhenInsertBlocked()) {
+       throw new WriteProcessException("System rejected over " + config.getMaxWaitingTimeWhenInsertBlocked() +
+           "ms");
+     }
+   }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 5cd831a..8702b3b 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -123,6 +123,8 @@ public class StorageGroupProcessor {
private static final String FAIL_TO_UPGRADE_FOLDER = "Failed to move {} to upgrade folder";
private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
+ private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
/**
* All newly generated chunks after merge have version number 0, so we set merged Modification
* file version to 1 to take effect
@@ -131,7 +133,6 @@ public class StorageGroupProcessor {
private static final Logger logger = LoggerFactory.getLogger(StorageGroupProcessor.class);
- private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final boolean enableMemControl = config.isEnableMemControl();
/**
* indicating the file to be loaded already exists locally.
@@ -658,6 +659,9 @@ public class StorageGroupProcessor {
if (!isAlive(insertRowPlan.getTime())) {
throw new OutOfTTLException(insertRowPlan.getTime(), (System.currentTimeMillis() - dataTTL));
}
+ if (enableMemControl) {
+ StorageEngine.blockInsertionIfReject();
+ }
writeLock();
try {
// init map
@@ -692,6 +696,15 @@ public class StorageGroupProcessor {
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void insertTablet(InsertTabletPlan insertTabletPlan) throws BatchInsertionException {
+ if (enableMemControl) {
+ try {
+ StorageEngine.blockInsertionIfReject();
+ } catch (WriteProcessException e) {
+ TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
+ Arrays.fill(results, RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ throw new BatchInsertionException(results);
+ }
+ }
writeLock();
try {
TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1007948..392e72d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -35,6 +34,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.CompressionRatio;
+import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.flush.CloseFileListener;
import org.apache.iotdb.db.engine.flush.FlushListener;
import org.apache.iotdb.db.engine.flush.FlushManager;
@@ -83,8 +83,6 @@ public class TsFileProcessor {
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final boolean enableMemControl = config.isEnableMemControl();
- private final int waitingTimeWhenInsertBlocked = config.getWaitingTimeWhenInsertBlocked();
- private final int maxWaitingTimeWhenInsertBlocked = config.getMaxWaitingTimeWhenInsertBlocked();
private StorageGroupInfo storageGroupInfo;
private TsFileProcessorInfo tsFileProcessorInfo;
@@ -177,7 +175,6 @@ public class TsFileProcessor {
workMemTable = new PrimitiveMemTable(enableMemControl);
}
if (enableMemControl) {
- blockInsertionIfReject();
checkMemCostAndAddToTspInfo(insertRowPlan);
}
@@ -218,12 +215,11 @@ public class TsFileProcessor {
if (workMemTable == null) {
workMemTable = new PrimitiveMemTable(enableMemControl);
}
- if (enableMemControl) {
- blockInsertionIfReject();
- checkMemCostAndAddToTspInfo(insertTabletPlan, start, end);
- }
try {
+ if (enableMemControl) {
+ checkMemCostAndAddToTspInfo(insertTabletPlan, start, end);
+ }
workMemTable.insertTablet(insertTabletPlan, start, end);
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
insertTabletPlan.setStart(start);
@@ -283,22 +279,8 @@ public class TsFileProcessor {
textDataIncrement += MemUtils.getBinarySize((Binary) insertRowPlan.getValues()[i]);
}
}
- memTableIncrement += textDataIncrement;
- storageGroupInfo.addStorageGroupMemCost(memTableIncrement);
- tsFileProcessorInfo.addTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
- if (storageGroupInfo.needToReportToSystem()) {
- SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo);
- try {
- blockInsertionIfReject();
- } catch (WriteProcessException e) {
- storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
- tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
- SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, false);
- throw e;
- }
- }
- workMemTable.addTVListRamCost(memTableIncrement);
- workMemTable.addTextDataSize(textDataIncrement);
+ updateMemoryInfo(memTableIncrement, unsealedResourceIncrement,
+ chunkMetadataIncrement, textDataIncrement);
}
private void checkMemCostAndAddToTspInfo(InsertTabletPlan insertTabletPlan, int start, int end)
@@ -347,13 +329,19 @@ public class TsFileProcessor {
textDataIncrement += MemUtils.getBinaryColumnSize(column, start, end);
}
}
+ updateMemoryInfo(memTableIncrement, unsealedResourceIncrement,
+ chunkMetadataIncrement, textDataIncrement);
+ }
+
+ private void updateMemoryInfo(long memTableIncrement, long unsealedResourceIncrement,
+ long chunkMetadataIncrement, long textDataIncrement) throws WriteProcessException {
memTableIncrement += textDataIncrement;
storageGroupInfo.addStorageGroupMemCost(memTableIncrement);
tsFileProcessorInfo.addTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
if (storageGroupInfo.needToReportToSystem()) {
SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo);
try {
- blockInsertionIfReject();
+ StorageEngine.blockInsertionIfReject();
} catch (WriteProcessException e) {
storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
@@ -365,21 +353,6 @@ public class TsFileProcessor {
workMemTable.addTextDataSize(textDataIncrement);
}
- private void blockInsertionIfReject() throws WriteProcessException {
- long startTime = System.currentTimeMillis();
- while (SystemInfo.getInstance().isRejected()) {
- try {
- TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
- if (System.currentTimeMillis() - startTime > maxWaitingTimeWhenInsertBlocked) {
- throw new WriteProcessException("System rejected over " + maxWaitingTimeWhenInsertBlocked + "ms");
- }
- } catch (InterruptedException e) {
- logger.error("Failed when waiting for getting memory for insertion ", e);
- Thread.currentThread().interrupt();
- }
- }
- }
-
/**
* Delete data which belongs to the timeseries `deviceId.measurementId` and the timestamp of which
* <= 'timestamp' in the deletion. <br/>