You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/07/07 12:44:33 UTC

[iotdb] branch IOTDB-3164 updated: temp

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-3164
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-3164 by this push:
     new dd3a0ce957 temp
dd3a0ce957 is described below

commit dd3a0ce9572d232df6dd7fa360b0912874edf615
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Thu Jul 7 20:44:20 2022 +0800

    temp
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  4 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |  4 +-
 .../db/rescon/{ => memory}/MemoryController.java   | 42 +++++++------
 .../db/rescon/memory/MemoryControllerTrigger.java  | 23 +++++++
 .../db/rescon/memory/WriteMemoryController.java    | 73 ++++++++++++++++++++++
 5 files changed, 123 insertions(+), 23 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index e41870df46..701e5acc96 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -59,7 +59,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
-import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.db.rescon.memory.WriteMemoryController;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.ThreadUtils;
 import org.apache.iotdb.db.utils.UpgradeUtils;
@@ -195,7 +195,7 @@ public class StorageEngine implements IService {
   public static void blockInsertionIfReject(TsFileProcessor tsFileProcessor)
       throws WriteProcessRejectException {
     long startTime = System.currentTimeMillis();
-    while (SystemInfo.getInstance().isRejected()) {
+    while (WriteMemoryController.getInstance().isRejected()) {
       if (tsFileProcessor != null && tsFileProcessor.shouldFlush()) {
         break;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 621eba790b..f906c47a4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -56,6 +56,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.rescon.MemTableManager;
 import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
 import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.db.rescon.memory.WriteMemoryController;
 import org.apache.iotdb.db.sync.sender.manager.TsFileSyncManager;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
@@ -785,8 +786,9 @@ public class TsFileProcessor {
     storageGroupInfo.addStorageGroupMemCost(memTableIncrement);
     tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement);
     if (storageGroupInfo.needToReportToSystem()) {
+      WriteMemoryController controller = WriteMemoryController.getInstance();
       try {
-        if (!SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo, this)) {
+        if (!controller.tryAllocateMemory(memTableIncrement, this)) {
           StorageEngine.blockInsertionIfReject(this);
         }
       } catch (WriteProcessRejectException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/MemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
similarity index 76%
rename from server/src/main/java/org/apache/iotdb/db/rescon/MemoryController.java
rename to server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
index 39263ff8e7..044b691ef7 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/MemoryController.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.rescon;
+package org.apache.iotdb.db.rescon.memory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,15 +27,15 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
-public class MemoryController {
+public class MemoryController<T> {
   private static Logger log = LoggerFactory.getLogger(MemoryController.class);
-  private AtomicLong memoryUsage = new AtomicLong(0);
-  private AtomicBoolean triggerRunning = new AtomicBoolean();
-  private long triggerThreshold = -1;
-  private long limitSize = -1;
-  private ReentrantLock lock = new ReentrantLock(false);
-  private Condition condition = lock.newCondition();
-  private Runnable trigger = null;
+  protected AtomicLong memoryUsage = new AtomicLong(0);
+  protected AtomicBoolean triggerRunning = new AtomicBoolean();
+  protected long triggerThreshold = -1;
+  protected long limitSize = -1;
+  protected ReentrantLock lock = new ReentrantLock(false);
+  protected Condition condition = lock.newCondition();
+  protected MemoryControllerTrigger<T> trigger = null;
 
   public MemoryController(long limitSize) {
     this.limitSize = limitSize;
@@ -45,7 +45,8 @@ public class MemoryController {
    * Initialize MemoryController with a trigger. The trigger will run if the memory usage exceeds
    * the trigger threshold.
    */
-  public MemoryController(long limitSize, long triggerThreshold, Runnable trigger) {
+  public MemoryController(
+      long limitSize, long triggerThreshold, MemoryControllerTrigger<T> trigger) {
     this.limitSize = limitSize;
     this.triggerThreshold = triggerThreshold;
     this.trigger = trigger;
@@ -57,7 +58,7 @@ public class MemoryController {
    * @param size
    * @return true if success to allocate else false
    */
-  public boolean tryAllocateMemory(long size) {
+  public boolean tryAllocateMemory(long size, T triggerParam) {
     while (true) {
       long current = memoryUsage.get();
       long newUsage = current + size;
@@ -69,7 +70,7 @@ public class MemoryController {
       }
 
       if (memoryUsage.compareAndSet(current, newUsage)) {
-        checkTrigger(current, newUsage);
+        checkTrigger(current, newUsage, triggerParam);
         return true;
       }
     }
@@ -82,11 +83,11 @@ public class MemoryController {
    * @param size
    * @throws InterruptedException
    */
-  public void allocateMemoryMayBlock(long size) throws InterruptedException {
-    if (!tryAllocateMemory(size)) {
+  public void allocateMemoryMayBlock(long size, T triggerParam) throws InterruptedException {
+    if (!tryAllocateMemory(size, triggerParam)) {
       lock.lock();
       try {
-        while (!tryAllocateMemory(size)) {
+        while (!tryAllocateMemory(size, triggerParam)) {
           condition.await();
         }
       } finally {
@@ -103,12 +104,13 @@ public class MemoryController {
    * @throws InterruptedException
    * @return true if success to allocate else false
    */
-  public boolean allocateMemoryMayBlock(long size, long timeout) throws InterruptedException {
+  public boolean allocateMemoryMayBlock(long size, long timeout, T triggerParam)
+      throws InterruptedException {
     long startTime = System.currentTimeMillis();
-    if (!tryAllocateMemory(size)) {
+    if (!tryAllocateMemory(size, triggerParam)) {
       lock.lock();
       try {
-        while (tryAllocateMemory(size)) {
+        while (tryAllocateMemory(size, triggerParam)) {
           if (System.currentTimeMillis() - startTime >= timeout) {
             return false;
           }
@@ -135,11 +137,11 @@ public class MemoryController {
     }
   }
 
-  private void checkTrigger(long prevUsage, long newUsage) {
+  private void checkTrigger(long prevUsage, long newUsage, T triggerParam) {
     if (newUsage >= triggerThreshold && prevUsage < triggerThreshold && trigger != null) {
       if (triggerRunning.compareAndSet(false, true)) {
         try {
-          trigger.run();
+          trigger.run(triggerParam);
         } finally {
           triggerRunning.set(false);
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryControllerTrigger.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryControllerTrigger.java
new file mode 100644
index 0000000000..1651b52752
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryControllerTrigger.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.rescon.memory;
+
+/**
+ * Callback that a {@link MemoryController} runs when an allocation moves its memory usage from
+ * below the trigger threshold to at or above it.
+ *
+ * @param <T> type of the parameter that the allocating caller hands through to the callback
+ */
+@FunctionalInterface
+public interface MemoryControllerTrigger<T> {
+  /**
+   * Runs the callback.
+   *
+   * @param e trigger parameter supplied to the allocation call that crossed the threshold
+   */
+  void run(T e);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
new file mode 100644
index 0000000000..8dc235fdb5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.rescon.memory;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * {@link MemoryController} for write memory, shared through {@link #getInstance()}.
+ *
+ * <p>The trigger threshold is the configured write memory scaled by the flush proportion. Writes
+ * are reported as rejected while usage is above the write memory scaled by the reject proportion.
+ */
+public class WriteMemoryController extends MemoryController<TsFileProcessor> {
+  private static volatile WriteMemoryController INSTANCE;
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final long memorySizeForWrite = config.getAllocateMemoryForWrite();
+  private static final double FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion();
+  private static final double REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion();
+  // NOTE(review): the three fields below are not read anywhere in this class yet.
+  private volatile long flushingMemory = 0;
+  private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new ConcurrentHashMap<>();
+  private ExecutorService flushTaskSubmitThreadPool =
+      IoTDBThreadPoolFactory.newSingleThreadExecutor("FlushTask-Submit-Pool");
+
+  /**
+   * Creates a controller that uses the flush threshold as its trigger threshold and {@link
+   * #chooseMemtableToFlush(TsFileProcessor)} as its trigger.
+   *
+   * @param limitSize size limit handed to {@link MemoryController}
+   */
+  public WriteMemoryController(long limitSize) {
+    super(limitSize);
+    this.triggerThreshold = (long) FLUSH_THRESHOLD;
+    this.trigger = this::chooseMemtableToFlush;
+  }
+
+  /**
+   * Tries to allocate write memory by delegating to the two-argument {@code tryAllocateMemory}
+   * inherited from {@link MemoryController}.
+   *
+   * @param size amount of memory to allocate
+   * @param info storage group the allocation is made for; not used yet
+   * @param processor trigger parameter forwarded to the inherited allocation call
+   * @return true if the allocation succeeded, false otherwise
+   */
+  public boolean tryAllocateMemory(long size, StorageGroupInfo info, TsFileProcessor processor) {
+    return super.tryAllocateMemory(size, processor);
+  }
+
+  /**
+   * Tells whether writes should currently be rejected.
+   *
+   * <p>The answer is computed from the current usage on every call, so it does not depend on
+   * which allocation method the caller used and it turns false again once usage is no longer
+   * above the reject threshold.
+   *
+   * @return true while memory usage is above the reject threshold
+   */
+  public boolean isRejected() {
+    return memoryUsage.get() > REJECT_THRESHOLD;
+  }
+
+  /**
+   * Returns the shared instance, creating it on first use with the configured write memory as
+   * its size limit.
+   *
+   * @return the shared controller
+   */
+  public static WriteMemoryController getInstance() {
+    if (INSTANCE == null) {
+      synchronized (WriteMemoryController.class) {
+        if (INSTANCE == null) {
+          INSTANCE = new WriteMemoryController(memorySizeForWrite);
+        }
+      }
+    }
+    return INSTANCE;
+  }
+
+  /**
+   * Trigger installed by the constructor. Currently a no-op.
+   *
+   * @param processor trigger parameter of the allocation that fired the trigger
+   */
+  protected void chooseMemtableToFlush(TsFileProcessor processor) {}
+}