You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/12/03 03:24:09 UTC
[iotdb] 01/05: move one insert check out of the sg lock
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch mc_insert_block_11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1ce8b79c8a9bd87911245069f6bf90bcf0851aa4
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Dec 1 16:46:44 2020 +0800
move one insert check out of the sg lock
---
.../engine/storagegroup/StorageGroupProcessor.java | 33 ++++++++++++++++++++--
.../db/engine/storagegroup/TsFileProcessor.java | 2 --
2 files changed, 31 insertions(+), 4 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 5cd831a..2c4e09f 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -40,6 +40,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
@@ -79,6 +80,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryFileManager;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.UpgradeSevice;
import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
@@ -123,6 +125,8 @@ public class StorageGroupProcessor {
private static final String FAIL_TO_UPGRADE_FOLDER = "Failed to move {} to upgrade folder";
private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
+ private final int waitingTimeWhenInsertBlocked = config.getWaitingTimeWhenInsertBlocked();
+ private final int maxWaitingTimeWhenInsertBlocked = config.getMaxWaitingTimeWhenInsertBlocked();
/**
* All newly generated chunks after merge have version number 0, so we set merged Modification
* file version to 1 to take effect
@@ -658,6 +662,9 @@ public class StorageGroupProcessor {
if (!isAlive(insertRowPlan.getTime())) {
throw new OutOfTTLException(insertRowPlan.getTime(), (System.currentTimeMillis() - dataTTL));
}
+ if (enableMemControl) {
+ blockInsertionIfReject();
+ }
writeLock();
try {
// init map
@@ -692,10 +699,17 @@ public class StorageGroupProcessor {
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void insertTablet(InsertTabletPlan insertTabletPlan) throws BatchInsertionException {
+ TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
+ Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
+ if (enableMemControl) {
+ try {
+ blockInsertionIfReject();
+ } catch (WriteProcessException e) {
+ throw new BatchInsertionException(results);
+ }
+ }
writeLock();
try {
- TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
- Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
boolean noFailure = true;
/*
@@ -784,6 +798,21 @@ public class StorageGroupProcessor {
}
}
+ /**
+ * Blocks the calling thread while SystemInfo reports that the system is rejecting insertions.
+ * Sleeps waitingTimeWhenInsertBlocked ms between checks and gives up once the total wait
+ * exceeds maxWaitingTimeWhenInsertBlocked ms.
+ *
+ * @throws WriteProcessException if the system is still rejecting after the maximum waiting
+ * time, or if the waiting thread is interrupted
+ */
+ private void blockInsertionIfReject() throws WriteProcessException {
+ long startTime = System.currentTimeMillis();
+ while (SystemInfo.getInstance().isRejected()) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
+ } catch (InterruptedException e) {
+ logger.error("Failed when waiting for getting memory for insertion ", e);
+ // Restore the flag and stop waiting: with the flag set, every later sleep() would throw
+ // immediately, skipping the timeout check and spinning for as long as the system rejects.
+ Thread.currentThread().interrupt();
+ throw new WriteProcessException("Interrupted while waiting for the system to accept insertions");
+ }
+ // Checked outside the try block so the timeout applies on every iteration.
+ if (System.currentTimeMillis() - startTime > maxWaitingTimeWhenInsertBlocked) {
+ throw new WriteProcessException("System rejected over " + maxWaitingTimeWhenInsertBlocked + "ms");
+ }
+ }
+ }
+
/**
* @return whether the given time falls in ttl
*/
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1007948..dd9b896 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -177,7 +177,6 @@ public class TsFileProcessor {
workMemTable = new PrimitiveMemTable(enableMemControl);
}
if (enableMemControl) {
- blockInsertionIfReject();
checkMemCostAndAddToTspInfo(insertRowPlan);
}
@@ -219,7 +218,6 @@ public class TsFileProcessor {
workMemTable = new PrimitiveMemTable(enableMemControl);
}
if (enableMemControl) {
- blockInsertionIfReject();
checkMemCostAndAddToTspInfo(insertTabletPlan, start, end);
}