You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/11/24 15:00:12 UTC
[iotdb] 01/01: change reject error log to warn
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch mem_control_11_op
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2f56819f717c141bdc6da48a1797b3ed25801e8b
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Nov 24 22:58:50 2020 +0800
change reject error log to warn
---
.../engine/storagegroup/StorageGroupProcessor.java | 5 +++-
.../db/engine/storagegroup/TsFileProcessor.java | 19 +++++++-----
.../db/exception/WriteProcessRejectException.java | 35 ++++++++++++++++++++++
.../org/apache/iotdb/db/rescon/SystemInfo.java | 19 ++++++------
4 files changed, 60 insertions(+), 18 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 5cd831a..3df849f 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -821,6 +822,9 @@ public class StorageGroupProcessor {
try {
tsFileProcessor.insertTablet(insertTabletPlan, start, end, results);
+ } catch (WriteProcessRejectException e) {
+ logger.warn("insert to TsFileProcessor rejected ", e);
+ return false;
} catch (WriteProcessException e) {
logger.error("insert to TsFileProcessor error ", e);
return false;
@@ -875,7 +879,6 @@ public class StorageGroupProcessor {
return;
}
- // insert TsFileProcessor
tsFileProcessor.insert(insertRowPlan);
// try to update the latest time of the device of this tsRecord
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1007948..3486a1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.UpdateEndTimeCallBack;
import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -171,7 +172,8 @@ public class TsFileProcessor {
*
* @param insertRowPlan physical plan of insertion
*/
- public void insert(InsertRowPlan insertRowPlan) throws WriteProcessException {
+ public void insert(InsertRowPlan insertRowPlan)
+ throws WriteProcessException, WriteProcessRejectException {
if (workMemTable == null) {
workMemTable = new PrimitiveMemTable(enableMemControl);
@@ -213,7 +215,7 @@ public class TsFileProcessor {
* @param results result array
*/
public void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end,
- TSStatus[] results) throws WriteProcessException {
+ TSStatus[] results) throws WriteProcessException, WriteProcessRejectException {
if (workMemTable == null) {
workMemTable = new PrimitiveMemTable(enableMemControl);
@@ -252,7 +254,8 @@ public class TsFileProcessor {
tsFileResource.updatePlanIndexes(insertTabletPlan.getIndex());
}
- private void checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan) throws WriteProcessException {
+ private void checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan)
+ throws WriteProcessException, WriteProcessRejectException {
// memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
long memTableIncrement = 0L;
long textDataIncrement = 0L;
@@ -290,7 +293,7 @@ public class TsFileProcessor {
SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo);
try {
blockInsertionIfReject();
- } catch (WriteProcessException e) {
+ } catch (WriteProcessRejectException e) {
storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, false);
@@ -302,7 +305,7 @@ public class TsFileProcessor {
}
private void checkMemCostAndAddToTspInfo(InsertTabletPlan insertTabletPlan, int start, int end)
- throws WriteProcessException {
+ throws WriteProcessException, WriteProcessRejectException {
if (start >= end) {
return;
}
@@ -354,7 +357,7 @@ public class TsFileProcessor {
SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo);
try {
blockInsertionIfReject();
- } catch (WriteProcessException e) {
+ } catch (WriteProcessRejectException e) {
storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, false);
@@ -365,13 +368,13 @@ public class TsFileProcessor {
workMemTable.addTextDataSize(textDataIncrement);
}
- private void blockInsertionIfReject() throws WriteProcessException {
+ private void blockInsertionIfReject() throws WriteProcessRejectException {
long startTime = System.currentTimeMillis();
while (SystemInfo.getInstance().isRejected()) {
try {
TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
if (System.currentTimeMillis() - startTime > maxWaitingTimeWhenInsertBlocked) {
- throw new WriteProcessException("System rejected over " + maxWaitingTimeWhenInsertBlocked + "ms");
+ throw new WriteProcessRejectException("System rejected over " + maxWaitingTimeWhenInsertBlocked + "ms");
}
} catch (InterruptedException e) {
logger.error("Failed when waiting for getting memory for insertion ", e);
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessRejectException.java b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessRejectException.java
new file mode 100644
index 0000000..2609ab2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessRejectException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.exception;
+
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class WriteProcessRejectException extends WriteProcessException {
+
+ private static final long serialVersionUID = -4217324287547595610L;
+
+ public WriteProcessRejectException(String message) {
+ super(message, TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode());
+ }
+
+ public WriteProcessRejectException(String message, int errorCode) {
+ super(message, errorCode);
+ }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 74c00f0..e4f6c84 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -44,8 +44,10 @@ public class SystemInfo {
private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new ConcurrentHashMap<>();
- private static final double FLUSH_PROPORTION = config.getFlushProportion();
- private static final double REJECT_PROPORTION = config.getRejectProportion();
+ private static final double FLUSH_THERSHOLD =
+ config.getAllocateMemoryForWrite() * config.getFlushProportion();
+ private static final double REJECT_THERSHOLD =
+ config.getAllocateMemoryForWrite() * config.getRejectProportion();
/**
* Report current mem cost of storage group to system. Called when the memory of
@@ -63,12 +65,12 @@ public class SystemInfo {
}
reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
- if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+ if (totalSgMemCost.get() >= FLUSH_THERSHOLD) {
logger.debug("The total storage group mem costs are too large, call for flushing. "
+ "Current sg cost is {}", totalSgMemCost);
chooseTSPToMarkFlush();
}
- if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+ if (totalSgMemCost.get() >= REJECT_THERSHOLD) {
logger.info("Change system to reject status...");
rejected = true;
}
@@ -94,8 +96,8 @@ public class SystemInfo {
}
private void checkSystemToInvokeFlush() {
- if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION &&
- totalSgMemCost.get() < config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+ if (totalSgMemCost.get() >= FLUSH_THERSHOLD &&
+ totalSgMemCost.get() < REJECT_THERSHOLD) {
logger.debug("Some sg memory released but still exceeding flush proportion, call flush.");
if (rejected) {
logger.info("Some sg memory released, set system to normal status.");
@@ -104,7 +106,7 @@ public class SystemInfo {
rejected = false;
forceAsyncFlush();
}
- else if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+ else if (totalSgMemCost.get() >= REJECT_THERSHOLD) {
logger.warn("Some sg memory released, but system is still in reject status.");
logCurrentTotalSGMemory();
rejected = true;
@@ -169,8 +171,7 @@ public class SystemInfo {
}
List<TsFileProcessor> processors = new ArrayList<>();
long memCost = 0;
- while (totalSgMemCost.get() - memCost > config.getAllocateMemoryForWrite() *
- FLUSH_PROPORTION / 2) {
+ while (totalSgMemCost.get() - memCost > FLUSH_THERSHOLD / 2) {
if (tsps.isEmpty() || tsps.peek().getWorkMemTableRamCost() == 0) {
return processors;
}