You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/12/01 08:47:22 UTC

[iotdb] 01/01: more one insert check out of the sg lock

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch mc_insert_block
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 235aadd76f2e9f7c56dd0ef37d290f7645093c45
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Dec 1 16:46:44 2020 +0800

    more one insert check out of the sg lock
---
 .../engine/storagegroup/StorageGroupProcessor.java | 33 ++++++++++++++++++++--
 .../db/engine/storagegroup/TsFileProcessor.java    |  2 --
 2 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 69aec66..2e52b1d 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -40,6 +40,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.io.FileUtils;
@@ -79,6 +80,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryFileManager;
+import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.UpgradeSevice;
 import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
@@ -124,6 +126,8 @@ public class StorageGroupProcessor {
   private static final String FAIL_TO_UPGRADE_FOLDER = "Failed to move {} to upgrade folder";
   private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
 
+  private final int waitingTimeWhenInsertBlocked = config.getWaitingTimeWhenInsertBlocked();
+  private final int maxWaitingTimeWhenInsertBlocked = config.getMaxWaitingTimeWhenInsertBlocked();
   /**
    * All newly generated chunks after merge have version number 0, so we set merged Modification
    * file version to 1 to take effect
@@ -676,6 +680,9 @@ public class StorageGroupProcessor {
     if (!isAlive(insertRowPlan.getTime())) {
       throw new OutOfTTLException(insertRowPlan.getTime(), (System.currentTimeMillis() - dataTTL));
     }
+    if (enableMemControl) {
+      blockInsertionIfReject();
+    }
     writeLock();
     try {
       // init map
@@ -710,10 +717,17 @@ public class StorageGroupProcessor {
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void insertTablet(InsertTabletPlan insertTabletPlan) throws BatchInsertionException {
+    TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
+    Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
+    if (enableMemControl) {
+      try {
+        blockInsertionIfReject();
+      } catch (WriteProcessException e) {
+        throw new BatchInsertionException(results);
+      }
+    }
     writeLock();
     try {
-      TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
-      Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
       boolean noFailure = true;
 
       /*
@@ -802,6 +816,21 @@ public class StorageGroupProcessor {
     }
   }
 
+  private void blockInsertionIfReject() throws WriteProcessException {
+    long startTime = System.currentTimeMillis();
+    while (SystemInfo.getInstance().isRejected()) {
+      try {
+        TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
+        if (System.currentTimeMillis() - startTime > maxWaitingTimeWhenInsertBlocked) {
+          throw new WriteProcessException("System rejected over " + maxWaitingTimeWhenInsertBlocked + "ms");
+        }
+      } catch (InterruptedException e) {
+        logger.error("Failed when waiting for getting memory for insertion ", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
   /**
    * @return whether the given time falls in ttl
    */
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 4ed4d31..f0a6910 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -177,7 +177,6 @@ public class TsFileProcessor {
       workMemTable = new PrimitiveMemTable(enableMemControl);
     }
     if (enableMemControl) {
-      blockInsertionIfReject();
       checkMemCostAndAddToTspInfo(insertRowPlan);
     }
 
@@ -220,7 +219,6 @@ public class TsFileProcessor {
       workMemTable = new PrimitiveMemTable(enableMemControl);
     }
     if (enableMemControl) {
-      blockInsertionIfReject();
       checkMemCostAndAddToTspInfo(insertTabletPlan, start, end);
     }