You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/12/03 12:10:09 UTC

[iotdb] branch rel/0.11 updated: [To rel/0.11] change mem control lock usage (#2114)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new 75e4ddf  [To rel/0.11] change mem control lock usage (#2114)
75e4ddf is described below

commit 75e4ddfcdb7c1794e2a46e57676a0bf4819c9324
Author: Haonan <hh...@outlook.com>
AuthorDate: Thu Dec 3 20:09:58 2020 +0800

    [To rel/0.11] change mem control lock usage (#2114)
---
 .../resources/conf/iotdb-engine.properties         |  7 ++++
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  5 +--
 .../engine/storagegroup/StorageGroupProcessor.java |  9 ++++--
 .../db/engine/storagegroup/TsFileProcessor.java    | 13 ++++++--
 .../db/exception/WriteProcessRejectException.java  | 35 ++++++++++++++++++++
 .../org/apache/iotdb/db/rescon/SystemInfo.java     | 37 +++++++++++-----------
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  1 +
 7 files changed, 81 insertions(+), 26 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 9774a3c..0d0fffd 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -174,6 +174,7 @@ timestamp_precision=ms
 wal_buffer_size=16777216
 
 # When a TsFile's file size (in byte) exceeds this, the TsFile is forced closed.
+# Setting this to a large value may result in a smaller memTable size.
 tsfile_size_threshold=1
 
 # When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. The default threshold is 256 MB.
@@ -224,18 +225,22 @@ enable_mem_control=true
 
 # Memory Allocation Ratio: Write, Read, Schema and Free Memory.
 # The parameter form is a:b:c:d, where a, b, c and d are integers. for example: 1:1:1:1 , 6:2:1:1
+# If the write pressure is high and the read pressure is low, adjust it to, for example, 6:1:1:2
 write_read_schema_free_memory_proportion=4:3:1:2
 
 # primitive array size (length of each array) in array pool
 primitive_array_size=128
 
 # Ratio of write memory for invoking flush disk, 0.4 by default
+# If you have an extremely high write load (e.g., batch=1000), it can be set lower than the default value, e.g., 0.2
 flush_proportion=0.4
 
 # Ratio of write memory allocated for buffered arrays, 0.6 by default
 buffered_arrays_memory_proportion=0.6
 
 # Ratio of write memory for rejecting insertion, 0.8 by default
+# If you have an extremely high write load (e.g., batch=1000) and the physical memory is large enough,
+# it can be set higher than the default value, e.g., 0.9
 reject_proportion=0.8
 
 # If memory (in byte) of storage group increased more than this threshold, report to system. The default value is 16MB
@@ -246,9 +251,11 @@ storage_group_report_threshold=16777216
 max_deduplicated_path_num=1000
 
 # When an inserting is rejected, waiting time (in ms) to check the status again, 50 by default.
+# If insertions are being rejected and the read load is low, it can be set to a larger value
 check_period_when_insert_blocked=50
 
 # When the waiting time (in ms) of an inserting exceeds this, throw an exception. 10000 by default.
+# If insertions are being rejected and the read load is low, it can be set to a larger value
 max_waiting_time_when_insert_blocked=10000
 
 # estimated metadata size (in byte) of one timeseries in Mtree
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 7aeedd4..6f796b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -61,6 +61,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
@@ -830,13 +831,13 @@ public class StorageEngine implements IService {
   /**
    * block insertion if the insertion is rejected by memory control
    */
-  public static void blockInsertionIfReject() throws WriteProcessException {
+  public static void blockInsertionIfReject() throws WriteProcessRejectException {
     long startTime = System.currentTimeMillis();
     while (SystemInfo.getInstance().isRejected()) {
       try {
         TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
         if (System.currentTimeMillis() - startTime > config.getMaxWaitingTimeWhenInsertBlocked()) {
-          throw new WriteProcessException("System rejected over " + config.getMaxWaitingTimeWhenInsertBlocked() +
+          throw new WriteProcessRejectException("System rejected over " + config.getMaxWaitingTimeWhenInsertBlocked() +
               "ms");
         }
       } catch (InterruptedException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 8702b3b..bfe409d 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -699,9 +700,9 @@ public class StorageGroupProcessor {
     if (enableMemControl) {
       try {
         StorageEngine.blockInsertionIfReject();
-      } catch (WriteProcessException e) {
+      } catch (WriteProcessRejectException e) {
         TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
-        Arrays.fill(results, RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+        Arrays.fill(results, RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT));
         throw new BatchInsertionException(results);
       }
     }
@@ -834,6 +835,9 @@ public class StorageGroupProcessor {
 
     try {
       tsFileProcessor.insertTablet(insertTabletPlan, start, end, results);
+    } catch (WriteProcessRejectException e) {
+      logger.warn("insert to TsFileProcessor rejected ", e);
+      return false;
     } catch (WriteProcessException e) {
       logger.error("insert to TsFileProcessor error ", e);
       return false;
@@ -888,7 +892,6 @@ public class StorageGroupProcessor {
       return;
     }
 
-    // insert TsFileProcessor
     tsFileProcessor.insert(insertRowPlan);
 
     // try to update the latest time of the device of this tsRecord
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 392e72d..c858cde 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.UpdateEndTimeCallBack;
 import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -220,6 +221,13 @@ public class TsFileProcessor {
       if (enableMemControl) {
         checkMemCostAndAddToTspInfo(insertTabletPlan, start, end);
       }
+    } catch (WriteProcessException e) {
+      for (int i = start; i < end; i++) {
+        results[i] = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, e.getMessage());
+      }
+      throw new WriteProcessException(e);
+    }
+    try {
       workMemTable.insertTablet(insertTabletPlan, start, end);
       if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
         insertTabletPlan.setStart(start);
@@ -248,7 +256,8 @@ public class TsFileProcessor {
     tsFileResource.updatePlanIndexes(insertTabletPlan.getIndex());
   }
 
-  private void checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan) throws WriteProcessException {
+  private void checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan) 
+      throws WriteProcessException {
     // memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
     long memTableIncrement = 0L;
     long textDataIncrement = 0L;
@@ -342,7 +351,7 @@ public class TsFileProcessor {
       SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo);
       try {
         StorageEngine.blockInsertionIfReject();
-      } catch (WriteProcessException e) {
+      } catch (WriteProcessRejectException e) {
         storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
         tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
         SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, false);
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessRejectException.java b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessRejectException.java
new file mode 100644
index 0000000..90b9e58
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessRejectException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.exception;
+
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class WriteProcessRejectException extends WriteProcessException {
+
+  private static final long serialVersionUID = -4217324287547595610L;
+
+  public WriteProcessRejectException(String message) {
+    super(message, TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode());
+  }
+
+  public WriteProcessRejectException(String message, int errorCode) {
+    super(message, errorCode);
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 74c00f0..593e0ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -20,11 +20,10 @@
 package org.apache.iotdb.db.rescon;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -39,13 +38,15 @@ public class SystemInfo {
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final Logger logger = LoggerFactory.getLogger(SystemInfo.class);
 
-  private AtomicLong totalSgMemCost = new AtomicLong();
+  private long totalSgMemCost = 0L;
   private volatile boolean rejected = false;
 
-  private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new ConcurrentHashMap<>();
+  private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
 
-  private static final double FLUSH_PROPORTION = config.getFlushProportion();
-  private static final double REJECT_PROPORTION = config.getRejectProportion();
+  private static final double FLUSH_THERSHOLD =
+      config.getAllocateMemoryForWrite() * config.getFlushProportion();
+  private static final double REJECT_THERSHOLD = 
+      config.getAllocateMemoryForWrite() * config.getRejectProportion();
 
   /**
    * Report current mem cost of storage group to system. Called when the memory of
@@ -53,22 +54,22 @@ public class SystemInfo {
    *
    * @param storageGroupInfo storage group
    */
-  public void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+  public synchronized void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
     long delta = storageGroupInfo.getMemCost() -
         reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
-    totalSgMemCost.addAndGet(delta);
+    totalSgMemCost += delta;
     if (logger.isDebugEnabled()) {
       logger.debug("Report Storage Group Status to the system. "
           + "After adding {}, current sg mem cost is {}.", delta, totalSgMemCost);
     }
     reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
     storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
-    if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+    if (totalSgMemCost >= FLUSH_THERSHOLD) {
       logger.debug("The total storage group mem costs are too large, call for flushing. "
           + "Current sg cost is {}", totalSgMemCost);
       chooseTSPToMarkFlush();
     }
-    if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+    if (totalSgMemCost >= REJECT_THERSHOLD) {
       logger.info("Change system to reject status...");
       rejected = true;
     }
@@ -80,11 +81,11 @@ public class SystemInfo {
    *
    * @param storageGroupInfo storage group
    */
-  public void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo,
+  public synchronized void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo,
       boolean shouldInvokeFlush) {
     if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
-      this.totalSgMemCost.addAndGet(storageGroupInfo.getMemCost() -
-          reportedSgMemCostMap.get(storageGroupInfo));
+      this.totalSgMemCost -= (reportedSgMemCostMap.get(storageGroupInfo) -
+          storageGroupInfo.getMemCost());
       storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
       reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
       if (shouldInvokeFlush) {
@@ -94,8 +95,7 @@ public class SystemInfo {
   }
 
   private void checkSystemToInvokeFlush() {
-    if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION &&
-        totalSgMemCost.get() < config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+    if (totalSgMemCost >= FLUSH_THERSHOLD && totalSgMemCost < REJECT_THERSHOLD) {
       logger.debug("Some sg memory released but still exceeding flush proportion, call flush.");
       if (rejected) {
         logger.info("Some sg memory released, set system to normal status.");
@@ -104,7 +104,7 @@ public class SystemInfo {
       rejected = false;
       forceAsyncFlush();
     }
-    else if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+    else if (totalSgMemCost >= REJECT_THERSHOLD) {
       logger.warn("Some sg memory released, but system is still in reject status.");
       logCurrentTotalSGMemory();
       rejected = true;
@@ -169,8 +169,7 @@ public class SystemInfo {
     }
     List<TsFileProcessor> processors = new ArrayList<>();
     long memCost = 0;
-    while (totalSgMemCost.get() - memCost > config.getAllocateMemoryForWrite() *
-        FLUSH_PROPORTION / 2) {
+    while (totalSgMemCost - memCost > FLUSH_THERSHOLD / 2) {
       if (tsps.isEmpty() || tsps.peek().getWorkMemTableRamCost() == 0) {
         return processors;
       }
@@ -187,7 +186,7 @@ public class SystemInfo {
 
   public void close() {
     reportedSgMemCostMap.clear();
-    totalSgMemCost.set(0);
+    totalSgMemCost = 0;
     rejected = false;
   }
 
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 3490e51..6e57937 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -59,6 +59,7 @@ public enum TSStatusCode {
   PATH_ERROR(410),
   QUERY_PROCESS_ERROR(411),
   WRITE_PROCESS_ERROR(412),
+  WRITE_PROCESS_REJECT(413),
 
   INTERNAL_SERVER_ERROR(500),
   CLOSE_OPERATION_ERROR(501),