You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2020/11/03 13:42:41 UTC

[GitHub] [iotdb] qiaojialin commented on a change in pull request #1524: [IOTDB-776] Control the memory usage of flushing the memtable

qiaojialin commented on a change in pull request #1524:
URL: https://github.com/apache/iotdb/pull/1524#discussion_r515754171



##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.storagegroup;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.rescon.SystemInfo;
+
+/**
+ * The storageGroupInfo records the total memory cost of the Storage Group.
+ */
+public class StorageGroupInfo {
+
+  private StorageGroupProcessor storageGroupProcessor;
+
+  /**
+   * The total Storage group memory cost
+   */
+  private AtomicLong memTableCost;

Review comment:
       Suggest renaming this field to `memoryCost`, since it holds the total memory cost of the storage group.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.storagegroup;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.rescon.SystemInfo;
+
+/**
+ * The storageGroupInfo records the total memory cost of the Storage Group.
+ */
+public class StorageGroupInfo {
+
+  private StorageGroupProcessor storageGroupProcessor;
+
+  /**
+   * The total Storage group memory cost
+   */
+  private AtomicLong memTableCost;
+
+  /**
+   * The threshold of reporting it's size to SystemInfo
+   */
+  private long storageGroupSizeReportThreshold = 
+      IoTDBDescriptor.getInstance().getConfig().getStorageGroupSizeReportThreshold();
+
+  private long lastReportedSize = 0L;
+
+  /**
+   * A set of all unclosed TsFileProcessors in this SG
+   */
+  private Set<TsFileProcessor> reportedTsps = new HashSet<>();
+
+  public StorageGroupInfo(StorageGroupProcessor storageGroupProcessor) {
+    this.storageGroupProcessor = storageGroupProcessor;
+    memTableCost = new AtomicLong();
+  }
+
+  public StorageGroupProcessor getStorageGroupProcessor() {
+    return storageGroupProcessor;
+  }
+
+  /**
+   * When create a new TsFileProcessor, call this method to report it
+   */
+  public void reportTsFileProcessorInfo(TsFileProcessor tsFileProcessor) {

Review comment:
       ```suggestion
     public void initTsFileProcessorInfo(TsFileProcessor tsFileProcessor) {
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
##########
@@ -159,6 +171,21 @@ public void write(InsertTabletPlan insertTabletPlan, int start, int end) {
     }
   }
 
+  @Override
+  public boolean checkIfNeedStartNewChunk(String deviceId, String measurement) {

Review comment:
       ```suggestion
     public boolean checkIfChunkExist(String deviceId, String measurement) {
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
##########
@@ -159,6 +171,21 @@ public void write(InsertTabletPlan insertTabletPlan, int start, int end) {
     }
   }
 
+  @Override
+  public boolean checkIfNeedStartNewChunk(String deviceId, String measurement) {

Review comment:
       ```suggestion
     public boolean checkIfChunkDoesNotExist(String deviceId, String measurement) {
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.rescon;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SystemInfo {
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final Logger logger = LoggerFactory.getLogger(SystemInfo.class);
+
+  private long totalSgMemCost;
+  private volatile boolean rejected = false;
+
+  private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
+
+  private static final double FLUSH_PROPORTION = config.getFlushProportion();
+  private static final double REJECT_PROPORTION = config.getRejectProportion();
+
+  /**
+   * Report current mem cost of storage group to system.
+   *
+   * @param storageGroupInfo storage group
+   */
+  public synchronized void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+    long delta = storageGroupInfo.getSgMemCost() -
+        reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
+    totalSgMemCost += delta;
+    logger.debug("Report Storage Group Status to system. "
+          + "Current sg mem cost is {}, delta is {}.", totalSgMemCost, delta);

Review comment:
       ```suggestion
       logger.debug("Report Storage Group Status to the system. "
             + "After adding {}, current sg mem cost is {}.", delta, totalSgMemCost);
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.rescon;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SystemInfo {
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final Logger logger = LoggerFactory.getLogger(SystemInfo.class);
+
+  private long totalSgMemCost;
+  private volatile boolean rejected = false;
+
+  private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
+
+  private static final double FLUSH_PROPORTION = config.getFlushProportion();
+  private static final double REJECT_PROPORTION = config.getRejectProportion();
+
+  /**
+   * Report current mem cost of storage group to system.
+   *
+   * @param storageGroupInfo storage group
+   */
+  public synchronized void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+    long delta = storageGroupInfo.getSgMemCost() -
+        reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
+    totalSgMemCost += delta;
+    logger.debug("Report Storage Group Status to system. "
+          + "Current sg mem cost is {}, delta is {}.", totalSgMemCost, delta);
+    reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
+    storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
+    if (getTotalMemCost() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {

Review comment:
       ```suggestion
       if (totalSgMemCost >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.rescon;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SystemInfo {
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final Logger logger = LoggerFactory.getLogger(SystemInfo.class);
+
+  private long totalSgMemCost;
+  private volatile boolean rejected = false;
+
+  private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
+
+  private static final double FLUSH_PROPORTION = config.getFlushProportion();
+  private static final double REJECT_PROPORTION = config.getRejectProportion();
+
+  /**
+   * Report current mem cost of storage group to system.
+   *
+   * @param storageGroupInfo storage group
+   */
+  public synchronized void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+    long delta = storageGroupInfo.getSgMemCost() -
+        reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
+    totalSgMemCost += delta;
+    logger.debug("Report Storage Group Status to system. "
+          + "Current sg mem cost is {}, delta is {}.", totalSgMemCost, delta);
+    reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
+    storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
+    if (getTotalMemCost() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+      logger.debug("The total storage group mem costs are too large, call for flushing. "
+          + "Current sg cost is {}", totalSgMemCost);
+      flush();
+    }
+    if (getTotalMemCost() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+      logger.debug("Change system to reject status...");

Review comment:
       ```suggestion
         logger.info("Change system to reject status...");
   ```
   Please also include the current memory usage in this log message.

##########
File path: server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.rescon;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SystemInfo {
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final Logger logger = LoggerFactory.getLogger(SystemInfo.class);
+
+  private long totalSgMemCost;
+  private volatile boolean rejected = false;
+
+  private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
+
+  private static final double FLUSH_PROPORTION = config.getFlushProportion();
+  private static final double REJECT_PROPORTION = config.getRejectProportion();
+
+  /**
+   * Report current mem cost of storage group to system.
+   *
+   * @param storageGroupInfo storage group
+   */
+  public synchronized void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+    long delta = storageGroupInfo.getSgMemCost() -
+        reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
+    totalSgMemCost += delta;
+    logger.debug("Report Storage Group Status to system. "
+          + "Current sg mem cost is {}, delta is {}.", totalSgMemCost, delta);
+    reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
+    storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
+    if (getTotalMemCost() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+      logger.debug("The total storage group mem costs are too large, call for flushing. "
+          + "Current sg cost is {}", totalSgMemCost);
+      flush();
+    }
+    if (getTotalMemCost() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {

Review comment:
       ```suggestion
       if (totalSgMemCost >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.rescon;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SystemInfo {
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final Logger logger = LoggerFactory.getLogger(SystemInfo.class);
+
+  private long totalSgMemCost;
+  private volatile boolean rejected = false;
+
+  private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
+
+  private static final double FLUSH_PROPORTION = config.getFlushProportion();
+  private static final double REJECT_PROPORTION = config.getRejectProportion();
+
+  /**
+   * Report current mem cost of storage group to system.
+   *
+   * @param storageGroupInfo storage group
+   */
+  public synchronized void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+    long delta = storageGroupInfo.getSgMemCost() -
+        reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
+    totalSgMemCost += delta;
+    logger.debug("Report Storage Group Status to system. "
+          + "Current sg mem cost is {}, delta is {}.", totalSgMemCost, delta);
+    reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
+    storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
+    if (getTotalMemCost() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+      logger.debug("The total storage group mem costs are too large, call for flushing. "
+          + "Current sg cost is {}", totalSgMemCost);
+      flush();
+    }
+    if (getTotalMemCost() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+      logger.debug("Change system to reject status...");
+      rejected = true;
+    }
+  }
+
+  /**
+   * Report resetting the mem cost of sg to system. It will be invoked after closing file.
+   *
+   * @param storageGroupInfo storage group
+   */
+  public synchronized void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+    if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
+      this.totalSgMemCost -= reportedSgMemCostMap.get(storageGroupInfo)
+          - storageGroupInfo.getSgMemCost();
+      storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
+      if (getTotalMemCost() > config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+        logger.debug("Some sg memery released, call flush.");
+        logCost();
+        forceFlush();
+      }
+      if (getTotalMemCost() < config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {

Review comment:
       ```suggestion
         if (totalSgMemCost < config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.rescon;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SystemInfo {
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final Logger logger = LoggerFactory.getLogger(SystemInfo.class);
+
+  private long totalSgMemCost;
+  private volatile boolean rejected = false;
+
+  private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
+
+  private static final double FLUSH_PROPORTION = config.getFlushProportion();
+  private static final double REJECT_PROPORTION = config.getRejectProportion();
+
+  /**
+   * Report current mem cost of storage group to system.
+   *
+   * @param storageGroupInfo storage group
+   */
+  public synchronized void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+    long delta = storageGroupInfo.getSgMemCost() -
+        reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
+    totalSgMemCost += delta;
+    logger.debug("Report Storage Group Status to system. "
+          + "Current sg mem cost is {}, delta is {}.", totalSgMemCost, delta);
+    reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
+    storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
+    if (getTotalMemCost() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+      logger.debug("The total storage group mem costs are too large, call for flushing. "
+          + "Current sg cost is {}", totalSgMemCost);
+      flush();
+    }
+    if (getTotalMemCost() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+      logger.debug("Change system to reject status...");
+      rejected = true;
+    }
+  }
+
+  /**
+   * Report resetting the mem cost of sg to system. It will be invoked after closing file.
+   *
+   * @param storageGroupInfo storage group
+   */
+  public synchronized void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+    if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
+      this.totalSgMemCost -= reportedSgMemCostMap.get(storageGroupInfo)
+          - storageGroupInfo.getSgMemCost();
+      storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
+      if (getTotalMemCost() > config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {

Review comment:
       ```suggestion
         if (totalSgMemCost > config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.rescon;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SystemInfo {
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final Logger logger = LoggerFactory.getLogger(SystemInfo.class);
+
+  private long totalSgMemCost;
+  private volatile boolean rejected = false;
+
+  private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
+
+  private static final double FLUSH_PROPORTION = config.getFlushProportion();
+  private static final double REJECT_PROPORTION = config.getRejectProportion();
+
+  /**
+   * Report current mem cost of storage group to system.
+   *
+   * @param storageGroupInfo storage group
+   */
+  public synchronized void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+    long delta = storageGroupInfo.getSgMemCost() -
+        reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
+    totalSgMemCost += delta;
+    logger.debug("Report Storage Group Status to system. "
+          + "Current sg mem cost is {}, delta is {}.", totalSgMemCost, delta);
+    reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
+    storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
+    if (getTotalMemCost() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+      logger.debug("The total storage group mem costs are too large, call for flushing. "
+          + "Current sg cost is {}", totalSgMemCost);
+      flush();
+    }
+    if (getTotalMemCost() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+      logger.debug("Change system to reject status...");
+      rejected = true;
+    }
+  }
+
+  /**
+   * Report resetting the mem cost of sg to system. It will be invoked after closing file.
+   *
+   * @param storageGroupInfo storage group
+   */
+  public synchronized void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+    if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
+      this.totalSgMemCost -= reportedSgMemCostMap.get(storageGroupInfo)
+          - storageGroupInfo.getSgMemCost();
+      storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
+      if (getTotalMemCost() > config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+        logger.debug("Some sg memery released, call flush.");
+        logCost();
+        forceFlush();
+      }
+      if (getTotalMemCost() < config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+        logger.debug("Some sg memery released, set system to normal status.");

Review comment:
       ```suggestion
           logger.debug("Some sg memory released, set system to normal status.");
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.rescon;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SystemInfo {
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final Logger logger = LoggerFactory.getLogger(SystemInfo.class);
+
+  private long totalSgMemCost;
+  private volatile boolean rejected = false;
+
+  private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
+
+  private static final double FLUSH_PROPORTION = config.getFlushProportion();
+  private static final double REJECT_PROPORTION = config.getRejectProportion();
+
+  /**
+   * Report current mem cost of storage group to system.
+   *
+   * @param storageGroupInfo storage group
+   */
+  public synchronized void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+    long delta = storageGroupInfo.getSgMemCost() -
+        reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
+    totalSgMemCost += delta;
+    logger.debug("Report Storage Group Status to system. "
+          + "Current sg mem cost is {}, delta is {}.", totalSgMemCost, delta);
+    reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
+    storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
+    if (getTotalMemCost() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+      logger.debug("The total storage group mem costs are too large, call for flushing. "
+          + "Current sg cost is {}", totalSgMemCost);
+      flush();
+    }
+    if (getTotalMemCost() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+      logger.debug("Change system to reject status...");
+      rejected = true;
+    }
+  }
+
+  /**
+   * Report resetting the mem cost of sg to system. It will be invoked after closing file.
+   *
+   * @param storageGroupInfo storage group
+   */
+  public synchronized void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+    if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
+      this.totalSgMemCost -= reportedSgMemCostMap.get(storageGroupInfo)
+          - storageGroupInfo.getSgMemCost();
+      storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
+      if (getTotalMemCost() > config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+        logger.debug("Some sg memery released, call flush.");
+        logCost();
+        forceFlush();
+      }
+      if (getTotalMemCost() < config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+        logger.debug("Some sg memery released, set system to normal status.");
+        logCost();
+        rejected = false;
+      } else {
+        logger.warn("Some sg memery released, but system is still in reject status.");

Review comment:
       ```suggestion
           logger.warn("Some sg memory released, but system is still in reject status.");
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.rescon;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SystemInfo {
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final Logger logger = LoggerFactory.getLogger(SystemInfo.class);
+
+  private long totalSgMemCost;
+  private volatile boolean rejected = false;
+
+  private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
+
+  private static final double FLUSH_PROPORTION = config.getFlushProportion();
+  private static final double REJECT_PROPORTION = config.getRejectProportion();
+
+  /**
+   * Report current mem cost of storage group to system.
+   *
+   * @param storageGroupInfo storage group
+   */
+  public synchronized void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+    long delta = storageGroupInfo.getSgMemCost() -
+        reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
+    totalSgMemCost += delta;
+    logger.debug("Report Storage Group Status to system. "
+          + "Current sg mem cost is {}, delta is {}.", totalSgMemCost, delta);
+    reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
+    storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
+    if (getTotalMemCost() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+      logger.debug("The total storage group mem costs are too large, call for flushing. "
+          + "Current sg cost is {}", totalSgMemCost);
+      flush();
+    }
+    if (getTotalMemCost() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+      logger.debug("Change system to reject status...");
+      rejected = true;
+    }
+  }
+
+  /**
+   * Report resetting the mem cost of sg to system. It will be invoked after closing file.
+   *
+   * @param storageGroupInfo storage group
+   */
+  public synchronized void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+    if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
+      this.totalSgMemCost -= reportedSgMemCostMap.get(storageGroupInfo)
+          - storageGroupInfo.getSgMemCost();
+      storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
+      if (getTotalMemCost() > config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+        logger.debug("Some sg memery released, call flush.");

Review comment:
   ```suggestion
           logger.debug("Some sg memory released, call flush.");
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.rescon;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SystemInfo {
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final Logger logger = LoggerFactory.getLogger(SystemInfo.class);
+
+  private long totalSgMemCost;
+  private volatile boolean rejected = false;
+
+  private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
+
+  private static final double FLUSH_PROPORTION = config.getFlushProportion();
+  private static final double REJECT_PROPORTION = config.getRejectProportion();
+
+  /**
+   * Report current mem cost of storage group to system.
+   *
+   * @param storageGroupInfo storage group
+   */
+  public synchronized void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+    long delta = storageGroupInfo.getSgMemCost() -
+        reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
+    totalSgMemCost += delta;
+    logger.debug("Report Storage Group Status to system. "
+          + "Current sg mem cost is {}, delta is {}.", totalSgMemCost, delta);
+    reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
+    storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
+    if (getTotalMemCost() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+      logger.debug("The total storage group mem costs are too large, call for flushing. "
+          + "Current sg cost is {}", totalSgMemCost);
+      flush();
+    }
+    if (getTotalMemCost() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+      logger.debug("Change system to reject status...");
+      rejected = true;
+    }
+  }
+
+  /**
+   * Report resetting the mem cost of sg to system. It will be invoked after closing file.
+   *
+   * @param storageGroupInfo storage group
+   */
+  public synchronized void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+    if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
+      this.totalSgMemCost -= reportedSgMemCostMap.get(storageGroupInfo)
+          - storageGroupInfo.getSgMemCost();
+      storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
+      if (getTotalMemCost() > config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+        logger.debug("Some sg memery released, call flush.");
+        logCost();
+        forceFlush();
+      }
+      if (getTotalMemCost() < config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+        logger.debug("Some sg memery released, set system to normal status.");
+        logCost();
+        rejected = false;
+      } else {
+        logger.warn("Some sg memery released, but system is still in reject status.");
+        logCost();
+        rejected = true;
+      }
+      reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
+    }
+  }
+
+  private void logCost() {

Review comment:
   ```suggestion
     private void logCurrentTotalSGMemory() {
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org