You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/07/21 12:37:05 UTC
[iotdb] 03/03: finish for test
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-3164-Allocate-By-Tablets
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2ad223946bb48fcad5589234591870e1aeb221a1
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Thu Jul 21 20:36:43 2022 +0800
finish for test
---
.../db/engine/storagegroup/TsFileProcessor.java | 25 ++++++++++++++++----
.../iotdb/db/rescon/memory/MemoryController.java | 14 ++++++-----
.../db/rescon/memory/WriteMemoryController.java | 27 ++++++++++++++--------
3 files changed, 45 insertions(+), 21 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 4d1e2891d1..c2b7ff861e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.CompressionRatio;
-import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.flush.CloseFileListener;
import org.apache.iotdb.db.engine.flush.FlushListener;
import org.apache.iotdb.db.engine.flush.FlushManager;
@@ -85,6 +84,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -786,11 +786,26 @@ public class TsFileProcessor {
tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement);
WriteMemoryController controller = WriteMemoryController.getInstance();
boolean allocateMemory = false;
+ long startTime = System.currentTimeMillis();
try {
- allocateMemory = controller.tryAllocateMemory(memTableIncrement, storageGroupInfo, this);
- if (!allocateMemory) {
- StorageEngine.blockInsertionIfReject(this);
- }
+ do {
+ if (this.shouldFlush()) {
+ break;
+ }
+ allocateMemory = controller.tryAllocateMemory(memTableIncrement, storageGroupInfo, this);
+ try {
+ if (!allocateMemory) {
+ TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
+ if (System.currentTimeMillis() - startTime
+ > config.getMaxWaitingTimeWhenInsertBlocked()) {
+ throw new WriteProcessRejectException(
+ "System rejected over " + (System.currentTimeMillis() - startTime) + "ms");
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ } while (!allocateMemory);
} catch (WriteProcessRejectException e) {
storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
index 80fe1063a7..cffe20e9f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
@@ -58,7 +58,7 @@ public class MemoryController<T> {
* @param size
* @return true if success to allocate else false
*/
- public boolean tryAllocateMemory(long size, T triggerParam) {
+ public boolean tryAllocateMemory(long size, T triggerParam, boolean runTrigger) {
while (true) {
long current = memoryUsage.get();
long newUsage = current + size;
@@ -70,7 +70,9 @@ public class MemoryController<T> {
}
if (memoryUsage.compareAndSet(current, newUsage)) {
- checkTrigger(newUsage, triggerParam);
+ if (runTrigger) {
+ checkTrigger(newUsage, triggerParam);
+ }
return true;
}
}
@@ -84,10 +86,10 @@ public class MemoryController<T> {
* @throws InterruptedException
*/
public void allocateMemoryMayBlock(long size, T triggerParam) throws InterruptedException {
- if (!tryAllocateMemory(size, triggerParam)) {
+ if (!tryAllocateMemory(size, triggerParam, true)) {
lock.lock();
try {
- while (!tryAllocateMemory(size, triggerParam)) {
+ while (!tryAllocateMemory(size, triggerParam, true)) {
condition.await();
}
} finally {
@@ -107,10 +109,10 @@ public class MemoryController<T> {
public boolean allocateMemoryMayBlock(long size, long timeout, T triggerParam)
throws InterruptedException {
long startTime = System.currentTimeMillis();
- if (!tryAllocateMemory(size, triggerParam)) {
+ if (!tryAllocateMemory(size, triggerParam, true)) {
lock.lock();
try {
- while (tryAllocateMemory(size, triggerParam)) {
+ while (tryAllocateMemory(size, triggerParam, true)) {
if (System.currentTimeMillis() - startTime >= timeout) {
return false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
index 6fe2feb6e2..c2538bf59d 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
@@ -53,7 +53,14 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
}
public boolean tryAllocateMemory(long size, StorageGroupInfo info, TsFileProcessor processor) {
- boolean success = super.tryAllocateMemory(size, processor);
+ boolean success = false;
+ if (size < REJECT_THRESHOLD) {
+ success = super.tryAllocateMemory(size, processor, true);
+ } else {
+ if (chooseMemtableToFlush(processor)) {
+ success = super.tryAllocateMemory(size, processor, false);
+ }
+ }
if (memoryUsage.get() > REJECT_THRESHOLD && !rejected) {
logger.info(
"Change system to reject status. Triggered by: logical SG ({}), mem cost delta ({}), totalSgMemCost ({}).",
@@ -100,15 +107,15 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
flushingMemory.addAndGet(size);
}
- protected void chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor) {
+ protected boolean chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor) {
// If invoke flush by replaying logs, do not flush now!
if (infoSet.size() == 0) {
- return;
+ return false;
}
long memCost = 0;
long activeMemSize = memoryUsage.get() - flushingMemory.get();
if (activeMemSize - memCost < FLUSH_THRESHOLD) {
- return;
+ return false;
}
PriorityQueue<TsFileProcessor> allTsFileProcessors =
new PriorityQueue<>(
@@ -117,10 +124,11 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
allTsFileProcessors.addAll(storageGroupInfo.getAllReportedTsp());
}
long selectedCount = 0;
+ boolean currentTsFileProcessorSelected = false;
while (activeMemSize - memCost > FLUSH_THRESHOLD) {
if (allTsFileProcessors.isEmpty()
|| allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) {
- return;
+ return false;
}
TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.poll();
if (selectedTsFileProcessor == null) {
@@ -130,15 +138,14 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
|| selectedTsFileProcessor.getWorkMemTable().shouldFlush()) {
continue;
}
+ if (selectedTsFileProcessor == currentTsFileProcessor) {
+ currentTsFileProcessorSelected = true;
+ }
memCost += selectedTsFileProcessor.getWorkMemTableRamCost();
selectedTsFileProcessor.setWorkMemTableShouldFlush();
flushTaskSubmitThreadPool.submit(selectedTsFileProcessor::submitAFlushTask);
selectedCount++;
}
- logger.info(
- "Select {} memtable to flush, flushing memory is {}, remaining memory is {}",
- selectedCount,
- flushingMemory.get(),
- memoryUsage.get() - flushingMemory.get());
+ return currentTsFileProcessorSelected;
}
}