You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/04/18 00:10:05 UTC

[incubator-iotdb] branch refactor_bufferwrite updated: handle memory cost when writing data into mem

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch refactor_bufferwrite
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/refactor_bufferwrite by this push:
     new 5e7a284  handle memory cost when writing data into mem
5e7a284 is described below

commit 5e7a28412dee15227a53ec30f610c74a546d7ca8
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Apr 18 08:09:52 2019 +0800

    handle memory cost when writing data into mem
---
 .../db/engine/overflow/OverflowProcessor.java      | 29 +++++++++++
 .../db/engine/tsfiledata/TsFileProcessor.java      | 58 ++++++++++++++++++----
 .../db/engine/tsfiledata/TsFileProcessorTest.java  | 16 +++---
 3 files changed, 86 insertions(+), 17 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/OverflowProcessor.java
new file mode 100644
index 0000000..fdccb80
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/OverflowProcessor.java
@@ -0,0 +1,29 @@
+package org.apache.iotdb.db.engine.overflow;
+
+import java.io.IOException;
+import org.apache.iotdb.db.engine.bufferwrite.Action;
+import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
+import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.exception.BufferWriteProcessorException;
+import org.apache.iotdb.tsfile.write.schema.FileSchema;
+
+public class OverflowProcessor extends TsFileProcessor {
+
+  /**
+   * constructor of OverflowProcessor. data will be stored in baseDir/processorName/ folder.
+   *
+   * @param processorName processor name
+   * @param fileSchemaRef file schema
+   * @throws BufferWriteProcessorException BufferWriteProcessorException
+   */
+  public OverflowProcessor(String processorName,
+      Action beforeFlushAction,
+      Action afterFlushAction,
+      Action afterCloseAction,
+      VersionController versionController,
+      FileSchema fileSchemaRef)
+      throws BufferWriteProcessorException, IOException {
+    super(processorName, beforeFlushAction, afterFlushAction, afterCloseAction, versionController,
+        fileSchemaRef);
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
index 7ef1001..3e7bba4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.db.engine.bufferwrite.RestorableTsFileIOWriter;
 import org.apache.iotdb.db.engine.filenode.FileNodeManager;
 import org.apache.iotdb.db.engine.filenode.TsFileResource;
 import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
+import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
 import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
@@ -90,6 +91,10 @@ import org.slf4j.LoggerFactory;
 public class TsFileProcessor extends Processor {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TsFileProcessor.class);
+  public static final int WRITE_SUCCESS = 1;
+  public static final int WRITE_IN_WARNING_MEM= 0;
+  public static final int WRITE_REJECT_BY_TIME  = -1;
+  public static final int WRITE_REJECT_BY_MEM = -2;
 
   //this is just a part of fileSchemaRef: only the measurements that belong to this TsFileProcessor
   // are in this fileSchemaRef. And, this filed is shared with other classes (i.e., storage group
@@ -259,27 +264,65 @@ public class TsFileProcessor extends Processor {
    * flushing operation will be called.
    *
    * @param plan data to be written
-   * @return true if the tsRecord can be inserted into tsFile. otherwise false (, then you need to
-   * insert it into overflow)
+   * @return 1 (WRITE_SUCCESS) if the plan was inserted while memory usage is UsageLevel.SAFE;
+   * 0 (WRITE_IN_WARNING_MEM) if it was inserted while memory usage is UsageLevel.WARNING;
+   * -1 (WRITE_REJECT_BY_TIME) if it was rejected and you need to insert it into overflow;
+   * -2 (WRITE_REJECT_BY_MEM) if it was rejected because memory usage is UsageLevel.DANGEROUS
    * @throws BufferWriteProcessorException if a flushing operation occurs and failed.
    */
-  public boolean insert(InsertPlan plan) throws BufferWriteProcessorException, IOException {
+  public int insert(InsertPlan plan) throws BufferWriteProcessorException, IOException {
     if (lastFlushedTimeForEachDevice.containsKey(plan.getDeviceId()) && plan.getTime() <= lastFlushedTimeForEachDevice.get(plan.getDeviceId())) {
-      return false;
+      return WRITE_REJECT_BY_TIME;
     }
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
       logNode.write(plan);
     }
+    long memUsage = 0;
+    TSDataType type;
+    String measurement;
+    for (int i=0; i < plan.getMeasurements().length; i++){
+      measurement = plan.getMeasurements()[i];
+      type = fileSchemaRef.getMeasurementDataType(measurement);
+      memUsage += MemUtils.getPointSize(type, measurement);
+    }
+    UsageLevel level = BasicMemController.getInstance().acquireUsage(this, memUsage);
+    switch (level) {
+      case SAFE:
+        doInsert(plan);
+        checkMemThreshold4Flush(memUsage);
+        return WRITE_SUCCESS;
+      case WARNING:
+        if(LOGGER.isWarnEnabled()) {
+          LOGGER.warn("Memory usage will exceed warning threshold, current : {}.",
+              MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage()));
+        }
+        doInsert(plan);
+        try {
+          flush();
+        } catch (IOException e) {
+          throw new BufferWriteProcessorException(e);
+        }
+        return WRITE_IN_WARNING_MEM;
+      case DANGEROUS:
+        if (LOGGER.isWarnEnabled()) {
+          LOGGER.warn("Memory usage will exceed dangerous threshold, current : {}.",
+              MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage()));
+        }
+        return WRITE_REJECT_BY_MEM;
+      default:
+          return WRITE_REJECT_BY_MEM;
+    }
+  }
+
+  private void doInsert(InsertPlan plan) {
     String deviceId = plan.getDeviceId();
     long time = plan.getTime();
-    long memUsage = 0;
     TSDataType type;
     String measurement;
     for (int i=0; i < plan.getMeasurements().length; i++){
       measurement = plan.getMeasurements()[i];
       type = fileSchemaRef.getMeasurementDataType(measurement);
       workMemTable.write(deviceId, measurement, type, time, plan.getValues()[i]);
-      memUsage += MemUtils.getPointSize(type, measurement);
     }
     if (!minWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId)) {
       minWrittenTimeForEachDeviceInCurrentFile.put(deviceId, time);
@@ -289,9 +332,6 @@ public class TsFileProcessor extends Processor {
       maxWrittenTimeForEachDeviceInCurrentFile.put(deviceId, time);
     }
     valueCount++;
-    BasicMemController.getInstance().acquireUsage(this, memUsage);
-    checkMemThreshold4Flush(memUsage);
-    return true;
   }
 
   /**
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java
index 1464bcf..22d815c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java
@@ -116,9 +116,9 @@ public class TsFileProcessorTest {
     String[] s2 = new String[]{"s2"};
     String[] value = new String[]{"5.0"};
     ;
-    Assert.assertTrue(processor.insert(new InsertPlan("root.test.d1",  10, s1, value)));
-    Assert.assertTrue(processor.insert(new InsertPlan("root.test.d1",  10, s2, value)));
-    Assert.assertTrue(processor.insert(new InsertPlan("root.test.d1",  12, s1, value)));
+    Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1",  10, s1, value)));
+    Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1",  10, s2, value)));
+    Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1",  12, s1, value)));
     Future<Boolean> ok = processor.flush();
     ok.get();
     ok = processor.flush();
@@ -129,12 +129,12 @@ public class TsFileProcessorTest {
     ok.get();
 
     //let's rewrite timestamp =12 again..
-    Assert.assertFalse(processor.insert(new InsertPlan("root.test.d1",  12, s1, value)));
+    Assert.assertEquals(TsFileProcessor.WRITE_REJECT_BY_TIME, processor.insert(new InsertPlan("root.test.d1",  12, s1, value)));
     processor.delete("root.test.d1", "s1",12);
-    Assert.assertTrue(processor.insert(new InsertPlan("root.test.d1",  12, s1, value)));
-    Assert.assertTrue(processor.insert(new InsertPlan("root.test.d1",  13, s1, value)));
-    Assert.assertTrue(processor.insert(new InsertPlan("root.test.d2",  10, s1, value)));
-    Assert.assertTrue(processor.insert(new InsertPlan("root.test.d1",  14, s1, value)));
+    Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1",  12, s1, value)));
+    Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1",  13, s1, value)));
+    Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d2",  10, s1, value)));
+    Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1",  14, s1, value)));
     processor.delete("root.test.d1", "s1",12);
     processor.delete("root.test.d3", "s1",12);