You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/07/07 12:44:33 UTC
[iotdb] branch IOTDB-3164 updated: temp
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-3164
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-3164 by this push:
new dd3a0ce957 temp
dd3a0ce957 is described below
commit dd3a0ce9572d232df6dd7fa360b0912874edf615
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Thu Jul 7 20:44:20 2022 +0800
temp
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 4 +-
.../db/engine/storagegroup/TsFileProcessor.java | 4 +-
.../db/rescon/{ => memory}/MemoryController.java | 42 +++++++------
.../db/rescon/memory/MemoryControllerTrigger.java | 23 +++++++
.../db/rescon/memory/WriteMemoryController.java | 73 ++++++++++++++++++++++
5 files changed, 123 insertions(+), 23 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index e41870df46..701e5acc96 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -59,7 +59,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
-import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.db.rescon.memory.WriteMemoryController;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.ThreadUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
@@ -195,7 +195,7 @@ public class StorageEngine implements IService {
public static void blockInsertionIfReject(TsFileProcessor tsFileProcessor)
throws WriteProcessRejectException {
long startTime = System.currentTimeMillis();
- while (SystemInfo.getInstance().isRejected()) {
+ while (WriteMemoryController.getInstance().isRejected()) {
if (tsFileProcessor != null && tsFileProcessor.shouldFlush()) {
break;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 621eba790b..f906c47a4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -56,6 +56,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.rescon.MemTableManager;
import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.db.rescon.memory.WriteMemoryController;
import org.apache.iotdb.db.sync.sender.manager.TsFileSyncManager;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
@@ -785,8 +786,9 @@ public class TsFileProcessor {
storageGroupInfo.addStorageGroupMemCost(memTableIncrement);
tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement);
if (storageGroupInfo.needToReportToSystem()) {
+ WriteMemoryController controller = WriteMemoryController.getInstance();
try {
- if (!SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo, this)) {
+ if (!controller.tryAllocateMemory(memTableIncrement, this)) {
StorageEngine.blockInsertionIfReject(this);
}
} catch (WriteProcessRejectException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/MemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
similarity index 76%
rename from server/src/main/java/org/apache/iotdb/db/rescon/MemoryController.java
rename to server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
index 39263ff8e7..044b691ef7 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/MemoryController.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.rescon;
+package org.apache.iotdb.db.rescon.memory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,15 +27,15 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
-public class MemoryController {
+public class MemoryController<T> {
private static Logger log = LoggerFactory.getLogger(MemoryController.class);
- private AtomicLong memoryUsage = new AtomicLong(0);
- private AtomicBoolean triggerRunning = new AtomicBoolean();
- private long triggerThreshold = -1;
- private long limitSize = -1;
- private ReentrantLock lock = new ReentrantLock(false);
- private Condition condition = lock.newCondition();
- private Runnable trigger = null;
+ protected AtomicLong memoryUsage = new AtomicLong(0);
+ protected AtomicBoolean triggerRunning = new AtomicBoolean();
+ protected long triggerThreshold = -1;
+ protected long limitSize = -1;
+ protected ReentrantLock lock = new ReentrantLock(false);
+ protected Condition condition = lock.newCondition();
+ protected MemoryControllerTrigger<T> trigger = null;
public MemoryController(long limitSize) {
this.limitSize = limitSize;
@@ -45,7 +45,8 @@ public class MemoryController {
* Initialize MemoryController with a trigger. The trigger will run if the memory usage exceeds
* the trigger threshold.
*/
- public MemoryController(long limitSize, long triggerThreshold, Runnable trigger) {
+ public MemoryController(
+ long limitSize, long triggerThreshold, MemoryControllerTrigger<T> trigger) {
this.limitSize = limitSize;
this.triggerThreshold = triggerThreshold;
this.trigger = trigger;
@@ -57,7 +58,7 @@ public class MemoryController {
* @param size
* @return true if success to allocate else false
*/
- public boolean tryAllocateMemory(long size) {
+ public boolean tryAllocateMemory(long size, T triggerParam) {
while (true) {
long current = memoryUsage.get();
long newUsage = current + size;
@@ -69,7 +70,7 @@ public class MemoryController {
}
if (memoryUsage.compareAndSet(current, newUsage)) {
- checkTrigger(current, newUsage);
+ checkTrigger(current, newUsage, triggerParam);
return true;
}
}
@@ -82,11 +83,11 @@ public class MemoryController {
* @param size
* @throws InterruptedException
*/
- public void allocateMemoryMayBlock(long size) throws InterruptedException {
- if (!tryAllocateMemory(size)) {
+ public void allocateMemoryMayBlock(long size, T triggerParam) throws InterruptedException {
+ if (!tryAllocateMemory(size, triggerParam)) {
lock.lock();
try {
- while (!tryAllocateMemory(size)) {
+ while (!tryAllocateMemory(size, triggerParam)) {
condition.await();
}
} finally {
@@ -103,12 +104,13 @@ public class MemoryController {
* @throws InterruptedException
* @return true if success to allocate else false
*/
- public boolean allocateMemoryMayBlock(long size, long timeout) throws InterruptedException {
+ public boolean allocateMemoryMayBlock(long size, long timeout, T triggerParam)
+ throws InterruptedException {
long startTime = System.currentTimeMillis();
- if (!tryAllocateMemory(size)) {
+ if (!tryAllocateMemory(size, triggerParam)) {
lock.lock();
try {
- while (tryAllocateMemory(size)) {
+ while (tryAllocateMemory(size, triggerParam)) {
if (System.currentTimeMillis() - startTime >= timeout) {
return false;
}
@@ -135,11 +137,11 @@ public class MemoryController {
}
}
- private void checkTrigger(long prevUsage, long newUsage) {
+ private void checkTrigger(long prevUsage, long newUsage, T triggerParam) {
if (newUsage >= triggerThreshold && prevUsage < triggerThreshold && trigger != null) {
if (triggerRunning.compareAndSet(false, true)) {
try {
- trigger.run();
+ trigger.run(triggerParam);
} finally {
triggerRunning.set(false);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryControllerTrigger.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryControllerTrigger.java
new file mode 100644
index 0000000000..1651b52752
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryControllerTrigger.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.rescon.memory;
+
+public interface MemoryControllerTrigger<T> {
+ void run(T e);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
new file mode 100644
index 0000000000..8dc235fdb5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.rescon.memory;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public class WriteMemoryController extends MemoryController<TsFileProcessor> {
+ private static volatile WriteMemoryController INSTANCE;
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private static final long memorySizeForWrite = config.getAllocateMemoryForWrite();
+ private static final double FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion();
+ private static final double REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion();
+ private volatile boolean rejected = false;
+ private volatile long flushingMemory = 0;
+ private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new ConcurrentHashMap<>();
+ private ExecutorService flushTaskSubmitThreadPool =
+ IoTDBThreadPoolFactory.newSingleThreadExecutor("FlushTask-Submit-Pool");
+
+ public WriteMemoryController(long limitSize) {
+ super(limitSize);
+ this.triggerThreshold = (long) FLUSH_THRESHOLD;
+ this.trigger = this::chooseMemtableToFlush;
+ }
+
+ public boolean tryAllocateMemory(long size, StorageGroupInfo info, TsFileProcessor processor) {
+ boolean success = super.tryAllocateMemory(size, processor);
+ if (memoryUsage.get() > REJECT_THRESHOLD) {
+ rejected = true;
+ }
+ return success;
+ }
+
+ public boolean isRejected() {
+ return rejected;
+ }
+
+ public static WriteMemoryController getInstance() {
+ if (INSTANCE == null) {
+ synchronized (WriteMemoryController.class) {
+ if (INSTANCE == null) {
+ INSTANCE = new WriteMemoryController(memorySizeForWrite);
+ }
+ }
+ }
+ return INSTANCE;
+ }
+
+ protected void chooseMemtableToFlush(TsFileProcessor processor) {}
+}