You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/12/01 02:54:25 UTC

[iotdb] 01/04: change reject error log to warn

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch mem_control_op
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5b7ea3d370cc3500446baabc55b94f7851aa4527
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Nov 24 22:58:50 2020 +0800

    change reject error log to warn
---
 .../engine/storagegroup/StorageGroupProcessor.java |  5 +++-
 .../db/engine/storagegroup/TsFileProcessor.java    | 19 +++++++-----
 .../db/exception/WriteProcessRejectException.java  | 35 ++++++++++++++++++++++
 .../org/apache/iotdb/db/rescon/SystemInfo.java     | 19 ++++++------
 4 files changed, 60 insertions(+), 18 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 69aec66..ffa4002 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -839,6 +840,9 @@ public class StorageGroupProcessor {
 
     try {
       tsFileProcessor.insertTablet(insertTabletPlan, start, end, results);
+    } catch (WriteProcessRejectException e) {
+      logger.warn("insert to TsFileProcessor rejected ", e);
+      return false;
     } catch (WriteProcessException e) {
       logger.error("insert to TsFileProcessor error ", e);
       return false;
@@ -891,7 +895,6 @@ public class StorageGroupProcessor {
       return;
     }
 
-    // insert TsFileProcessor
     tsFileProcessor.insert(insertRowPlan);
 
     // try to update the latest time of the device of this tsRecord
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 4ed4d31..b6e217b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.UpdateEndTimeCallBack;
 import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -171,7 +172,8 @@ public class TsFileProcessor {
    *
    * @param insertRowPlan physical plan of insertion
    */
-  public void insert(InsertRowPlan insertRowPlan) throws WriteProcessException {
+  public void insert(InsertRowPlan insertRowPlan) 
+      throws WriteProcessException, WriteProcessRejectException {
 
     if (workMemTable == null) {
       workMemTable = new PrimitiveMemTable(enableMemControl);
@@ -214,7 +216,7 @@ public class TsFileProcessor {
    * @param results result array
    */
   public void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end,
-      TSStatus[] results) throws WriteProcessException {
+      TSStatus[] results) throws WriteProcessException, WriteProcessRejectException {
 
     if (workMemTable == null) {
       workMemTable = new PrimitiveMemTable(enableMemControl);
@@ -254,7 +256,8 @@ public class TsFileProcessor {
     tsFileResource.updatePlanIndexes(insertTabletPlan.getIndex());
   }
 
-  private void checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan) throws WriteProcessException {
+  private void checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan) 
+      throws WriteProcessException, WriteProcessRejectException {
     // memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
     long memTableIncrement = 0L;
     long textDataIncrement = 0L;
@@ -292,7 +295,7 @@ public class TsFileProcessor {
       SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo);
       try {
         blockInsertionIfReject();
-      } catch (WriteProcessException e) {
+      } catch (WriteProcessRejectException e) {
         storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
         tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
         SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, false);
@@ -304,7 +307,7 @@ public class TsFileProcessor {
   }
 
   private void checkMemCostAndAddToTspInfo(InsertTabletPlan insertTabletPlan, int start, int end)
-      throws WriteProcessException {
+      throws WriteProcessException, WriteProcessRejectException {
     if (start >= end) {
       return;
     }
@@ -356,7 +359,7 @@ public class TsFileProcessor {
       SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo);
       try {
         blockInsertionIfReject();
-      } catch (WriteProcessException e) {
+      } catch (WriteProcessRejectException e) {
         storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
         tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
         SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, false);
@@ -367,13 +370,13 @@ public class TsFileProcessor {
     workMemTable.addTextDataSize(textDataIncrement);
   }
 
-  private void blockInsertionIfReject() throws WriteProcessException {
+  private void blockInsertionIfReject() throws WriteProcessRejectException {
     long startTime = System.currentTimeMillis();
     while (SystemInfo.getInstance().isRejected()) {
       try {
         TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
         if (System.currentTimeMillis() - startTime > maxWaitingTimeWhenInsertBlocked) {
-          throw new WriteProcessException("System rejected over " + maxWaitingTimeWhenInsertBlocked + "ms");
+          throw new WriteProcessRejectException("System rejected over " + maxWaitingTimeWhenInsertBlocked + "ms");
         }
       } catch (InterruptedException e) {
         logger.error("Failed when waiting for getting memory for insertion ", e);
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessRejectException.java b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessRejectException.java
new file mode 100644
index 0000000..2609ab2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessRejectException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.exception;
+
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class WriteProcessRejectException extends WriteProcessException {
+
+  private static final long serialVersionUID = -4217324287547595610L;
+
+  public WriteProcessRejectException(String message) {
+    super(message, TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode());
+  }
+
+  public WriteProcessRejectException(String message, int errorCode) {
+    super(message, errorCode);
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 74c00f0..e4f6c84 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -44,8 +44,10 @@ public class SystemInfo {
 
   private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new ConcurrentHashMap<>();
 
-  private static final double FLUSH_PROPORTION = config.getFlushProportion();
-  private static final double REJECT_PROPORTION = config.getRejectProportion();
+  private static final double FLUSH_THERSHOLD =
+      config.getAllocateMemoryForWrite() * config.getFlushProportion();
+  private static final double REJECT_THERSHOLD = 
+      config.getAllocateMemoryForWrite() * config.getRejectProportion();
 
   /**
    * Report current mem cost of storage group to system. Called when the memory of
@@ -63,12 +65,12 @@ public class SystemInfo {
     }
     reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
     storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
-    if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+    if (totalSgMemCost.get() >= FLUSH_THERSHOLD) {
       logger.debug("The total storage group mem costs are too large, call for flushing. "
           + "Current sg cost is {}", totalSgMemCost);
       chooseTSPToMarkFlush();
     }
-    if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+    if (totalSgMemCost.get() >= REJECT_THERSHOLD) {
       logger.info("Change system to reject status...");
       rejected = true;
     }
@@ -94,8 +96,8 @@ public class SystemInfo {
   }
 
   private void checkSystemToInvokeFlush() {
-    if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION &&
-        totalSgMemCost.get() < config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+    if (totalSgMemCost.get() >= FLUSH_THERSHOLD &&
+        totalSgMemCost.get() < REJECT_THERSHOLD) {
       logger.debug("Some sg memory released but still exceeding flush proportion, call flush.");
       if (rejected) {
         logger.info("Some sg memory released, set system to normal status.");
@@ -104,7 +106,7 @@ public class SystemInfo {
       rejected = false;
       forceAsyncFlush();
     }
-    else if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+    else if (totalSgMemCost.get() >= REJECT_THERSHOLD) {
       logger.warn("Some sg memory released, but system is still in reject status.");
       logCurrentTotalSGMemory();
       rejected = true;
@@ -169,8 +171,7 @@ public class SystemInfo {
     }
     List<TsFileProcessor> processors = new ArrayList<>();
     long memCost = 0;
-    while (totalSgMemCost.get() - memCost > config.getAllocateMemoryForWrite() *
-        FLUSH_PROPORTION / 2) {
+    while (totalSgMemCost.get() - memCost > FLUSH_THERSHOLD / 2) {
       if (tsps.isEmpty() || tsps.peek().getWorkMemTableRamCost() == 0) {
         return processors;
       }