You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/12/01 08:47:22 UTC
[iotdb] 01/01: move one insert check out of the sg lock
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch mc_insert_block
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 235aadd76f2e9f7c56dd0ef37d290f7645093c45
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Dec 1 16:46:44 2020 +0800
move one insert check out of the sg lock
---
.../engine/storagegroup/StorageGroupProcessor.java | 33 ++++++++++++++++++++--
.../db/engine/storagegroup/TsFileProcessor.java | 2 --
2 files changed, 31 insertions(+), 4 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 69aec66..2e52b1d 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -40,6 +40,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
@@ -79,6 +80,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryFileManager;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.UpgradeSevice;
import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
@@ -124,6 +126,8 @@ public class StorageGroupProcessor {
private static final String FAIL_TO_UPGRADE_FOLDER = "Failed to move {} to upgrade folder";
private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
+ private final int waitingTimeWhenInsertBlocked = config.getWaitingTimeWhenInsertBlocked();
+ private final int maxWaitingTimeWhenInsertBlocked = config.getMaxWaitingTimeWhenInsertBlocked();
/**
* All newly generated chunks after merge have version number 0, so we set merged Modification
* file version to 1 to take effect
@@ -676,6 +680,9 @@ public class StorageGroupProcessor {
if (!isAlive(insertRowPlan.getTime())) {
throw new OutOfTTLException(insertRowPlan.getTime(), (System.currentTimeMillis() - dataTTL));
}
+ if (enableMemControl) {
+ blockInsertionIfReject();
+ }
writeLock();
try {
// init map
@@ -710,10 +717,17 @@ public class StorageGroupProcessor {
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void insertTablet(InsertTabletPlan insertTabletPlan) throws BatchInsertionException {
+ TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
+ Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
+ if (enableMemControl) {
+ try {
+ blockInsertionIfReject();
+ } catch (WriteProcessException e) {
+ throw new BatchInsertionException(results);
+ }
+ }
writeLock();
try {
- TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
- Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
boolean noFailure = true;
/*
@@ -802,6 +816,21 @@ public class StorageGroupProcessor {
}
}
+ /**
+  * Blocks the calling thread while {@code SystemInfo.getInstance().isRejected()} is true,
+  * sleeping {@code waitingTimeWhenInsertBlocked} milliseconds between checks.
+  *
+  * @throws WriteProcessException if the rejection lasts longer than
+  *     {@code maxWaitingTimeWhenInsertBlocked} milliseconds, or if the waiting thread is
+  *     interrupted
+  */
+ private void blockInsertionIfReject() throws WriteProcessException {
+   long startTime = System.currentTimeMillis();
+   while (SystemInfo.getInstance().isRejected()) {
+     try {
+       TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
+     } catch (InterruptedException e) {
+       logger.error("Failed when waiting for getting memory for insertion ", e);
+       // Restore the flag for callers, then stop waiting: with the flag set every further
+       // sleep() would throw immediately, turning this loop into a busy spin that also
+       // never reaches the timeout check.
+       Thread.currentThread().interrupt();
+       throw new WriteProcessException("Interrupted while waiting for memory for insertion");
+     }
+     // Checked outside the try block so the deadline is enforced on every iteration.
+     if (System.currentTimeMillis() - startTime > maxWaitingTimeWhenInsertBlocked) {
+       throw new WriteProcessException("System rejected over " + maxWaitingTimeWhenInsertBlocked + "ms");
+     }
+   }
+ }
+
/**
* @return whether the given time falls in ttl
*/
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 4ed4d31..f0a6910 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -177,7 +177,6 @@ public class TsFileProcessor {
workMemTable = new PrimitiveMemTable(enableMemControl);
}
if (enableMemControl) {
- blockInsertionIfReject();
checkMemCostAndAddToTspInfo(insertRowPlan);
}
@@ -220,7 +219,6 @@ public class TsFileProcessor {
workMemTable = new PrimitiveMemTable(enableMemControl);
}
if (enableMemControl) {
- blockInsertionIfReject();
checkMemCostAndAddToTspInfo(insertTabletPlan, start, end);
}