You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/07/18 03:22:59 UTC
[iotdb] 02/02: temp
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-3164
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 43b8012725c4b75891c7a452a9f198350f236c12
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Mon Jul 18 11:22:43 2022 +0800
temp
---
.../iotdb/db/engine/flush/MemTableFlushTask.java | 7 ++++---
.../db/engine/storagegroup/StorageGroupInfo.java | 10 +++++++---
.../java/org/apache/iotdb/db/rescon/SystemInfo.java | 4 ++--
.../db/rescon/memory/WriteMemoryController.java | 21 ++++++++++++++++-----
4 files changed, 29 insertions(+), 13 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index eeea38fcd0..f91c346fb6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.db.rescon.memory.WriteMemoryController;
import org.apache.iotdb.db.service.metrics.MetricsService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
import org.apache.iotdb.db.service.metrics.enums.Tag;
@@ -103,8 +104,7 @@ public class MemTableFlushTask {
if (config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()) {
estimatedTemporaryMemSize =
memTable.memSize() / memTable.getSeriesNumber() * config.getIoTaskQueueSizeForFlushing();
- SystemInfo.getInstance().applyTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
- // TODO: ALLOCATE IN WRITE MEMORY CONTROLLER
+ WriteMemoryController.getInstance().applyExternalMemoryForFlushing(estimatedTemporaryMemSize);
}
long start = System.currentTimeMillis();
long sortTime = 0;
@@ -152,7 +152,8 @@ public class MemTableFlushTask {
if (config.isEnableMemControl()) {
if (estimatedTemporaryMemSize != 0) {
- SystemInfo.getInstance().releaseTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
+ WriteMemoryController.getInstance()
+ .releaseExternalMemoryForFlushing(estimatedTemporaryMemSize);
}
SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >= memSerializeTime);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index e3f3769dfa..996bc0c7e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -39,7 +39,7 @@ public class StorageGroupInfo {
private long storageGroupSizeReportThreshold =
IoTDBDescriptor.getInstance().getConfig().getStorageGroupSizeReportThreshold();
- private AtomicLong lastReportedSize = new AtomicLong();
+ private AtomicLong lastAllocateSize = new AtomicLong();
/** A set of all unclosed TsFileProcessors in this SG */
private List<TsFileProcessor> reportedTsps = new CopyOnWriteArrayList<>();
@@ -76,8 +76,12 @@ public class StorageGroupInfo {
return reportedTsps;
}
- public void setLastReportedSize(long size) {
- lastReportedSize.set(size);
+ public void setLastAllocateSize(long size) {
+ lastAllocateSize.set(size);
+ }
+
+ public boolean needToAllocate(long newSize) {
+ return true;
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index cb65841996..7e9961141f 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -77,7 +77,7 @@ public class SystemInfo {
totalStorageGroupMemCost);
}
reportedStorageGroupMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
- storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
+ storageGroupInfo.setLastAllocateSize(storageGroupInfo.getMemCost());
if (totalStorageGroupMemCost < FLUSH_THERSHOLD) {
return true;
} else if (totalStorageGroupMemCost >= FLUSH_THERSHOLD
@@ -123,7 +123,7 @@ public class SystemInfo {
if (reportedStorageGroupMemCostMap.containsKey(storageGroupInfo)) {
delta = reportedStorageGroupMemCostMap.get(storageGroupInfo) - storageGroupInfo.getMemCost();
this.totalStorageGroupMemCost -= delta;
- storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
+ storageGroupInfo.setLastAllocateSize(storageGroupInfo.getMemCost());
reportedStorageGroupMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
index a758cb6c17..9d1af8425f 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
@@ -37,15 +37,14 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
private static final Logger logger = LoggerFactory.getLogger(WriteMemoryController.class);
private static volatile WriteMemoryController INSTANCE;
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private static final long memorySizeForWrite = config.getAllocateMemoryForWrite();
- private static final double FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion();
- private static final double END_FLUSH_THRESHOLD = 0.7 * FLUSH_THRESHOLD;
- private static final double REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion();
+ private static long memorySizeForWrite = config.getAllocateMemoryForWrite();
+ private static double FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion();
+ private static double REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion();
private volatile boolean rejected = false;
private AtomicLong flushingMemory = new AtomicLong(0);
private Set<StorageGroupInfo> infoSet = new CopyOnWriteArraySet<>();
private ExecutorService flushTaskSubmitThreadPool =
- IoTDBThreadPoolFactory.newFixedThreadPool(2, "FlushTask-Submit-Pool");
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "FlushTask-Submit-Pool");
public WriteMemoryController(long limitSize) {
super(limitSize);
@@ -97,6 +96,18 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
return INSTANCE;
}
+ public void applyExternalMemoryForFlushing(long size) {
+ memorySizeForWrite -= size;
+ FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion();
+ REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion();
+ }
+
+ public void releaseExternalMemoryForFlushing(long size) {
+ memorySizeForWrite -= size;
+ FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion();
+ REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion();
+ }
+
protected void chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor) {
// If invoke flush by replaying logs, do not flush now!
if (infoSet.size() == 0) {