You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/12/03 03:24:08 UTC

[iotdb] branch mc_insert_block_11 created (now 59085e0)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a change to branch mc_insert_block_11
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 59085e0  change a config name

This branch includes the following new commits:

     new 1ce8b79  more one insert check out of the sg lock
     new da3547f  fix review
     new 82313f4  fix status code error
     new 4720785  fix error
     new 59085e0  change a config name

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/05: more one insert check out of the sg lock

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch mc_insert_block_11
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1ce8b79c8a9bd87911245069f6bf90bcf0851aa4
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Dec 1 16:46:44 2020 +0800

    more one insert check out of the sg lock
---
 .../engine/storagegroup/StorageGroupProcessor.java | 33 ++++++++++++++++++++--
 .../db/engine/storagegroup/TsFileProcessor.java    |  2 --
 2 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 5cd831a..2c4e09f 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -40,6 +40,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.io.FileUtils;
@@ -79,6 +80,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryFileManager;
+import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.UpgradeSevice;
 import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
@@ -123,6 +125,8 @@ public class StorageGroupProcessor {
   private static final String FAIL_TO_UPGRADE_FOLDER = "Failed to move {} to upgrade folder";
   private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
 
+  private final int waitingTimeWhenInsertBlocked = config.getWaitingTimeWhenInsertBlocked();
+  private final int maxWaitingTimeWhenInsertBlocked = config.getMaxWaitingTimeWhenInsertBlocked();
   /**
    * All newly generated chunks after merge have version number 0, so we set merged Modification
    * file version to 1 to take effect
@@ -658,6 +662,9 @@ public class StorageGroupProcessor {
     if (!isAlive(insertRowPlan.getTime())) {
       throw new OutOfTTLException(insertRowPlan.getTime(), (System.currentTimeMillis() - dataTTL));
     }
+    if (enableMemControl) {
+      blockInsertionIfReject();
+    }
     writeLock();
     try {
       // init map
@@ -692,10 +699,17 @@ public class StorageGroupProcessor {
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void insertTablet(InsertTabletPlan insertTabletPlan) throws BatchInsertionException {
+    TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
+    Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
+    if (enableMemControl) {
+      try {
+        blockInsertionIfReject();
+      } catch (WriteProcessException e) {
+        throw new BatchInsertionException(results);
+      }
+    }
     writeLock();
     try {
-      TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
-      Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
       boolean noFailure = true;
 
       /*
@@ -784,6 +798,21 @@ public class StorageGroupProcessor {
     }
   }
 
+  private void blockInsertionIfReject() throws WriteProcessException {
+    long startTime = System.currentTimeMillis();
+    while (SystemInfo.getInstance().isRejected()) { // poll until insertions are accepted again
+      try {
+        TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // keep the flag; sleeping again would throw at once
+        throw new WriteProcessException("Interrupted when waiting for getting memory for insertion");
+      }
+      if (System.currentTimeMillis() - startTime > maxWaitingTimeWhenInsertBlocked) {
+        throw new WriteProcessException("System rejected over " + maxWaitingTimeWhenInsertBlocked + "ms");
+      }
+    }
+  }
+
   /**
    * @return whether the given time falls in ttl
    */
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1007948..dd9b896 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -177,7 +177,6 @@ public class TsFileProcessor {
       workMemTable = new PrimitiveMemTable(enableMemControl);
     }
     if (enableMemControl) {
-      blockInsertionIfReject();
       checkMemCostAndAddToTspInfo(insertRowPlan);
     }
 
@@ -219,7 +218,6 @@ public class TsFileProcessor {
       workMemTable = new PrimitiveMemTable(enableMemControl);
     }
     if (enableMemControl) {
-      blockInsertionIfReject();
       checkMemCostAndAddToTspInfo(insertTabletPlan, start, end);
     }
 


[iotdb] 05/05: change a config name

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch mc_insert_block_11
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 59085e0b9f97f75d2f0a21dd26be4cee0f5e8266
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Dec 3 11:23:33 2020 +0800

    change a config name
---
 server/src/assembly/resources/conf/iotdb-engine.properties |  8 ++++----
 .../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java    | 14 +++++++-------
 .../java/org/apache/iotdb/db/conf/IoTDBDescriptor.java     |  6 +++---
 .../db/engine/storagegroup/StorageGroupProcessor.java      |  2 +-
 .../iotdb/db/engine/storagegroup/TsFileProcessor.java      |  2 +-
 5 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 2079a57..20b9e6a 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -245,11 +245,11 @@ storage_group_report_threshold=16777216
 # it's just an advised value, the real limitation will be the smaller one between this and the one we calculated
 max_deduplicated_path_num=1000
 
-# When an inserting is rejected, waiting time (in ms) to check system again, 0 by default.
-waiting_time_when_insert_blocked=0
+# When an insertion is rejected, the waiting period (in ms) before checking the system again. 50 by default.
+waiting_period_when_insert_blocked=50
 
-# When the waiting time (in ms) of an inserting exceeds this, throw an exception. 0 by default.
-max_waiting_time_when_insert_blocked=0
+# When the waiting time (in ms) of an insertion exceeds this, throw an exception. 10000 by default.
+max_waiting_time_when_insert_blocked=10000
 
 # estimated metadata size (in byte) of one timeseries in Mtree
 estimated_series_size=300
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 59e1ae2..0fafb1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -162,14 +162,14 @@ public class IoTDBConfig {
   private long storageGroupSizeReportThreshold = 16 * 1024 * 1024L;
 
   /**
-   * When inserting rejected, waiting this time to check system again
+   * When an insertion is rejected, the waiting period before checking the system again
    */
-  private int waitingTimeWhenInsertBlockedInMs = 0;
+  private int waitingPeriodWhenInsertBlockedInMs = 50;
 
   /**
    * When inserting rejected exceeds this, throw an exception
    */
-  private int maxWaitingTimeWhenInsertBlockedInMs = 0; 
+  private int maxWaitingTimeWhenInsertBlockedInMs = 10000; 
   /**
    * Is the write ahead log enable.
    */
@@ -2006,12 +2006,12 @@ public class IoTDBConfig {
     this.maxQueryDeduplicatedPathNum = maxQueryDeduplicatedPathNum;
   }
 
-  public int getWaitingTimeWhenInsertBlocked() {
-    return waitingTimeWhenInsertBlockedInMs;
+  public int getWaitingPeriodWhenInsertBlocked() {
+    return waitingPeriodWhenInsertBlockedInMs;
   }
 
-  public void setWaitingTimeWhenInsertBlocked(int waitingTimeWhenInsertBlocked) {
-    this.waitingTimeWhenInsertBlockedInMs = waitingTimeWhenInsertBlocked;
+  public void setWaitingPeriodWhenInsertBlocked(int waitingPeriodWhenInsertBlocked) {
+    this.waitingPeriodWhenInsertBlockedInMs = waitingPeriodWhenInsertBlocked;
   }
 
   public int getMaxWaitingTimeWhenInsertBlocked() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index d7a7058..a5b91dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -283,9 +283,9 @@ public class IoTDBDescriptor {
           .getProperty("avg_series_point_number_threshold",
               Integer.toString(conf.getAvgSeriesPointNumberThreshold()))));
 
-      conf.setWaitingTimeWhenInsertBlocked(Integer.parseInt(properties
-          .getProperty("waiting_time_when_insert_blocked",
-              Integer.toString(conf.getWaitingTimeWhenInsertBlocked()))));
+      conf.setWaitingPeriodWhenInsertBlocked(Integer.parseInt(properties
+          .getProperty("waiting_period_when_insert_blocked",
+              Integer.toString(conf.getWaitingPeriodWhenInsertBlocked()))));
 
       conf.setMaxWaitingTimeWhenInsertBlocked(Integer.parseInt(properties
           .getProperty("max_waiting_time_when_insert_blocked",
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 9c7d2b5..becd9fd 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -126,7 +126,7 @@ public class StorageGroupProcessor {
   private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
 
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-  private final int waitingTimeWhenInsertBlocked = config.getWaitingTimeWhenInsertBlocked();
+  private final int waitingTimeWhenInsertBlocked = config.getWaitingPeriodWhenInsertBlocked();
   private final int maxWaitingTimeWhenInsertBlocked = config.getMaxWaitingTimeWhenInsertBlocked();
   /**
    * All newly generated chunks after merge have version number 0, so we set merged Modification
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 108628d..65cf441 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -83,7 +83,7 @@ public class TsFileProcessor {
 
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private final boolean enableMemControl = config.isEnableMemControl();
-  private final int waitingTimeWhenInsertBlocked = config.getWaitingTimeWhenInsertBlocked();
+  private final int waitingTimeWhenInsertBlocked = config.getWaitingPeriodWhenInsertBlocked();
   private final int maxWaitingTimeWhenInsertBlocked = config.getMaxWaitingTimeWhenInsertBlocked();
   private StorageGroupInfo storageGroupInfo;
   private TsFileProcessorInfo tsFileProcessorInfo;


[iotdb] 03/05: fix status code error

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch mc_insert_block_11
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 82313f4de915a338a5fecd03a2721f9c8b867d49
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Dec 3 10:11:42 2020 +0800

    fix status code error
---
 .../apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java  | 2 +-
 .../org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java    | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 609f625..4a995fa 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -704,7 +704,7 @@ public class StorageGroupProcessor {
         blockInsertionIfReject();
       } catch (WriteProcessException e) {
         TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
-        Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
+        Arrays.fill(results, RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR));
         throw new BatchInsertionException(results);
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index dd9b896..108628d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -217,11 +217,11 @@ public class TsFileProcessor {
     if (workMemTable == null) {
       workMemTable = new PrimitiveMemTable(enableMemControl);
     }
-    if (enableMemControl) {
-      checkMemCostAndAddToTspInfo(insertTabletPlan, start, end);
-    }
 
     try {
+      if (enableMemControl) {
+        checkMemCostAndAddToTspInfo(insertTabletPlan, start, end);
+      }
       workMemTable.insertTablet(insertTabletPlan, start, end);
       if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
         insertTabletPlan.setStart(start);


[iotdb] 02/05: fix review

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch mc_insert_block_11
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit da3547f03a9265a439cfd13dc55e9d680d2be98d
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Dec 1 17:49:38 2020 +0800

    fix review
---
 .../apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java  | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 2c4e09f..609f625 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -699,17 +699,19 @@ public class StorageGroupProcessor {
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void insertTablet(InsertTabletPlan insertTabletPlan) throws BatchInsertionException {
-    TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
-    Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
     if (enableMemControl) {
       try {
         blockInsertionIfReject();
       } catch (WriteProcessException e) {
+        TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
+        Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
         throw new BatchInsertionException(results);
       }
     }
     writeLock();
     try {
+      TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
+      Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
       boolean noFailure = true;
 
       /*


[iotdb] 04/05: fix error

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch mc_insert_block_11
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 47207858eed34692af39257c5f5000b83fa52252
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Dec 3 11:17:07 2020 +0800

    fix error
---
 .../org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 4a995fa..9c7d2b5 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -125,6 +125,7 @@ public class StorageGroupProcessor {
   private static final String FAIL_TO_UPGRADE_FOLDER = "Failed to move {} to upgrade folder";
   private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
 
+  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private final int waitingTimeWhenInsertBlocked = config.getWaitingTimeWhenInsertBlocked();
   private final int maxWaitingTimeWhenInsertBlocked = config.getMaxWaitingTimeWhenInsertBlocked();
   /**
@@ -135,7 +136,6 @@ public class StorageGroupProcessor {
 
   private static final Logger logger = LoggerFactory.getLogger(StorageGroupProcessor.class);
 
-  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private final boolean enableMemControl = config.isEnableMemControl();
   /**
    * indicating the file to be loaded already exists locally.