You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/12/03 08:44:44 UTC

[iotdb] branch rel/0.11 updated: [To rel/0.11] [Mem control] Move one of the insert check out from the sg lock (#2173)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new b5eb03b  [To rel/0.11] [Mem control] Move one of the insert check out from the sg lock (#2173)
b5eb03b is described below

commit b5eb03b4d75b02873a0a62267b770e99055e514c
Author: Haonan <hh...@outlook.com>
AuthorDate: Thu Dec 3 16:44:33 2020 +0800

    [To rel/0.11] [Mem control] Move one of the insert check out from the sg lock (#2173)
---
 .../resources/conf/iotdb-engine.properties         |  8 ++--
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 14 +++---
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  6 +--
 .../org/apache/iotdb/db/engine/StorageEngine.java  | 20 ++++++++
 .../engine/storagegroup/StorageGroupProcessor.java | 15 +++++-
 .../db/engine/storagegroup/TsFileProcessor.java    | 53 ++++++----------------
 6 files changed, 61 insertions(+), 55 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 2079a57..9774a3c 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -245,11 +245,11 @@ storage_group_report_threshold=16777216
 # it's just an advised value, the real limitation will be the smaller one between this and the one we calculated
 max_deduplicated_path_num=1000
 
-# When an inserting is rejected, waiting time (in ms) to check system again, 0 by default.
-waiting_time_when_insert_blocked=0
+# When an insertion is rejected, the interval (in ms) at which the system status is checked again. 50 by default.
+check_period_when_insert_blocked=50
 
-# When the waiting time (in ms) of an inserting exceeds this, throw an exception. 0 by default.
-max_waiting_time_when_insert_blocked=0
+# When the total waiting time (in ms) of a rejected insertion exceeds this, throw an exception. 10000 by default.
+max_waiting_time_when_insert_blocked=10000
 
 # estimated metadata size (in byte) of one timeseries in Mtree
 estimated_series_size=300
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 59e1ae2..19ccf5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -162,14 +162,14 @@ public class IoTDBConfig {
   private long storageGroupSizeReportThreshold = 16 * 1024 * 1024L;
 
   /**
-   * When inserting rejected, waiting this time to check system again
+   * When an insertion is rejected, the period (in ms) to wait before checking the system status again
    */
-  private int waitingTimeWhenInsertBlockedInMs = 0;
+  private int checkPeriodWhenInsertBlocked = 50;
 
   /**
    * When inserting rejected exceeds this, throw an exception
    */
-  private int maxWaitingTimeWhenInsertBlockedInMs = 0; 
+  private int maxWaitingTimeWhenInsertBlockedInMs = 10000; 
   /**
    * Is the write ahead log enable.
    */
@@ -2006,12 +2006,12 @@ public class IoTDBConfig {
     this.maxQueryDeduplicatedPathNum = maxQueryDeduplicatedPathNum;
   }
 
-  public int getWaitingTimeWhenInsertBlocked() {
-    return waitingTimeWhenInsertBlockedInMs;
+  public int getCheckPeriodWhenInsertBlocked() {
+    return checkPeriodWhenInsertBlocked;
   }
 
-  public void setWaitingTimeWhenInsertBlocked(int waitingTimeWhenInsertBlocked) {
-    this.waitingTimeWhenInsertBlockedInMs = waitingTimeWhenInsertBlocked;
+  public void setCheckPeriodWhenInsertBlocked(int checkPeriodWhenInsertBlocked) {
+    this.checkPeriodWhenInsertBlocked = checkPeriodWhenInsertBlocked;
   }
 
   public int getMaxWaitingTimeWhenInsertBlocked() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index d7a7058..dc95ef1 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -283,9 +283,9 @@ public class IoTDBDescriptor {
           .getProperty("avg_series_point_number_threshold",
               Integer.toString(conf.getAvgSeriesPointNumberThreshold()))));
 
-      conf.setWaitingTimeWhenInsertBlocked(Integer.parseInt(properties
-          .getProperty("waiting_time_when_insert_blocked",
-              Integer.toString(conf.getWaitingTimeWhenInsertBlocked()))));
+      conf.setCheckPeriodWhenInsertBlocked(Integer.parseInt(properties
+          .getProperty("check_period_when_insert_blocked",
+              Integer.toString(conf.getCheckPeriodWhenInsertBlocked()))));
 
       conf.setMaxWaitingTimeWhenInsertBlocked(Integer.parseInt(properties
           .getProperty("max_waiting_time_when_insert_blocked",
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index d7d1df6..7aeedd4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -72,6 +72,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryFileManager;
+import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.ServiceType;
@@ -91,6 +92,7 @@ public class StorageEngine implements IService {
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
 
+
   /**
    * a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor
    * will have a subfolder under the systemDir.
@@ -824,4 +826,22 @@ public class StorageEngine implements IService {
   public void mergeUnLock(List<StorageGroupProcessor> list) {
     list.forEach(storageGroupProcessor -> storageGroupProcessor.getTsFileManagement().readUnLock());
   }
+
+  /**
+   * Block the caller while memory control rejects insertions; throw once the max waiting time is exceeded
+   */
+  public static void blockInsertionIfReject() throws WriteProcessException {
+    long startTime = System.currentTimeMillis();
+    while (SystemInfo.getInstance().isRejected()) {
+      try {
+        TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
+        if (System.currentTimeMillis() - startTime > config.getMaxWaitingTimeWhenInsertBlocked()) {
+          throw new WriteProcessException("System rejected over " + config.getMaxWaitingTimeWhenInsertBlocked() +
+              "ms");
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 5cd831a..8702b3b 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -123,6 +123,8 @@ public class StorageGroupProcessor {
   private static final String FAIL_TO_UPGRADE_FOLDER = "Failed to move {} to upgrade folder";
   private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
 
+  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
   /**
    * All newly generated chunks after merge have version number 0, so we set merged Modification
    * file version to 1 to take effect
@@ -131,7 +133,6 @@ public class StorageGroupProcessor {
 
   private static final Logger logger = LoggerFactory.getLogger(StorageGroupProcessor.class);
 
-  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private final boolean enableMemControl = config.isEnableMemControl();
   /**
    * indicating the file to be loaded already exists locally.
@@ -658,6 +659,9 @@ public class StorageGroupProcessor {
     if (!isAlive(insertRowPlan.getTime())) {
       throw new OutOfTTLException(insertRowPlan.getTime(), (System.currentTimeMillis() - dataTTL));
     }
+    if (enableMemControl) {
+      StorageEngine.blockInsertionIfReject();
+    }
     writeLock();
     try {
       // init map
@@ -692,6 +696,15 @@ public class StorageGroupProcessor {
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void insertTablet(InsertTabletPlan insertTabletPlan) throws BatchInsertionException {
+    if (enableMemControl) {
+      try {
+        StorageEngine.blockInsertionIfReject();
+      } catch (WriteProcessException e) {
+        TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
+        Arrays.fill(results, RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+        throw new BatchInsertionException(results);
+      }
+    }
     writeLock();
     try {
       TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1007948..392e72d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -35,6 +34,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.adapter.CompressionRatio;
+import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.flush.CloseFileListener;
 import org.apache.iotdb.db.engine.flush.FlushListener;
 import org.apache.iotdb.db.engine.flush.FlushManager;
@@ -83,8 +83,6 @@ public class TsFileProcessor {
 
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private final boolean enableMemControl = config.isEnableMemControl();
-  private final int waitingTimeWhenInsertBlocked = config.getWaitingTimeWhenInsertBlocked();
-  private final int maxWaitingTimeWhenInsertBlocked = config.getMaxWaitingTimeWhenInsertBlocked();
   private StorageGroupInfo storageGroupInfo;
   private TsFileProcessorInfo tsFileProcessorInfo;
 
@@ -177,7 +175,6 @@ public class TsFileProcessor {
       workMemTable = new PrimitiveMemTable(enableMemControl);
     }
     if (enableMemControl) {
-      blockInsertionIfReject();
       checkMemCostAndAddToTspInfo(insertRowPlan);
     }
 
@@ -218,12 +215,11 @@ public class TsFileProcessor {
     if (workMemTable == null) {
       workMemTable = new PrimitiveMemTable(enableMemControl);
     }
-    if (enableMemControl) {
-      blockInsertionIfReject();
-      checkMemCostAndAddToTspInfo(insertTabletPlan, start, end);
-    }
 
     try {
+      if (enableMemControl) {
+        checkMemCostAndAddToTspInfo(insertTabletPlan, start, end);
+      }
       workMemTable.insertTablet(insertTabletPlan, start, end);
       if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
         insertTabletPlan.setStart(start);
@@ -283,22 +279,8 @@ public class TsFileProcessor {
         textDataIncrement += MemUtils.getBinarySize((Binary) insertRowPlan.getValues()[i]);
       }
     }
-    memTableIncrement += textDataIncrement;
-    storageGroupInfo.addStorageGroupMemCost(memTableIncrement);
-    tsFileProcessorInfo.addTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
-    if (storageGroupInfo.needToReportToSystem()) {
-      SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo);
-      try {
-        blockInsertionIfReject();
-      } catch (WriteProcessException e) {
-        storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
-        tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
-        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, false);
-        throw e;
-      }
-    }
-    workMemTable.addTVListRamCost(memTableIncrement);
-    workMemTable.addTextDataSize(textDataIncrement);
+    updateMemoryInfo(memTableIncrement, unsealedResourceIncrement, 
+        chunkMetadataIncrement, textDataIncrement);
   }
 
   private void checkMemCostAndAddToTspInfo(InsertTabletPlan insertTabletPlan, int start, int end)
@@ -347,13 +329,19 @@ public class TsFileProcessor {
         textDataIncrement += MemUtils.getBinaryColumnSize(column, start, end);
       }
     }
+    updateMemoryInfo(memTableIncrement, unsealedResourceIncrement, 
+        chunkMetadataIncrement, textDataIncrement);
+  }
+
+  private void updateMemoryInfo(long memTableIncrement, long unsealedResourceIncrement,
+      long chunkMetadataIncrement, long textDataIncrement) throws WriteProcessException {
     memTableIncrement += textDataIncrement;
     storageGroupInfo.addStorageGroupMemCost(memTableIncrement);
     tsFileProcessorInfo.addTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
     if (storageGroupInfo.needToReportToSystem()) {
       SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo);
       try {
-        blockInsertionIfReject();
+        StorageEngine.blockInsertionIfReject();
       } catch (WriteProcessException e) {
         storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
         tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
@@ -365,21 +353,6 @@ public class TsFileProcessor {
     workMemTable.addTextDataSize(textDataIncrement);
   }
 
-  private void blockInsertionIfReject() throws WriteProcessException {
-    long startTime = System.currentTimeMillis();
-    while (SystemInfo.getInstance().isRejected()) {
-      try {
-        TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
-        if (System.currentTimeMillis() - startTime > maxWaitingTimeWhenInsertBlocked) {
-          throw new WriteProcessException("System rejected over " + maxWaitingTimeWhenInsertBlocked + "ms");
-        }
-      } catch (InterruptedException e) {
-        logger.error("Failed when waiting for getting memory for insertion ", e);
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-
   /**
    * Delete data which belongs to the timeseries `deviceId.measurementId` and the timestamp of which
    * <= 'timestamp' in the deletion. <br/>