You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/04/18 00:10:05 UTC
[incubator-iotdb] branch refactor_bufferwrite updated: handle
memory cost when writing data into mem
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch refactor_bufferwrite
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/refactor_bufferwrite by this push:
new 5e7a284 handle memory cost when writing data into mem
5e7a284 is described below
commit 5e7a28412dee15227a53ec30f610c74a546d7ca8
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Apr 18 08:09:52 2019 +0800
handle memory cost when writing data into mem
---
.../db/engine/overflow/OverflowProcessor.java | 29 +++++++++++
.../db/engine/tsfiledata/TsFileProcessor.java | 58 ++++++++++++++++++----
.../db/engine/tsfiledata/TsFileProcessorTest.java | 16 +++---
3 files changed, 86 insertions(+), 17 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/OverflowProcessor.java
new file mode 100644
index 0000000..fdccb80
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/OverflowProcessor.java
@@ -0,0 +1,29 @@
+package org.apache.iotdb.db.engine.overflow;
+
+import java.io.IOException;
+import org.apache.iotdb.db.engine.bufferwrite.Action;
+import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
+import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.exception.BufferWriteProcessorException;
+import org.apache.iotdb.tsfile.write.schema.FileSchema;
+
+public class OverflowProcessor extends TsFileProcessor {
+
+ /**
+ * Constructor of OverflowProcessor; all arguments are forwarded to the TsFileProcessor constructor.
+ * Data is presumably stored under baseDir/processorName/ (inherited wording; TODO confirm).
+ * @param processorName processor name
+ * @param fileSchemaRef file schema
+ * @throws BufferWriteProcessorException propagated from the TsFileProcessor constructor
+ */
+ public OverflowProcessor(String processorName,
+ Action beforeFlushAction,
+ Action afterFlushAction,
+ Action afterCloseAction,
+ VersionController versionController,
+ FileSchema fileSchemaRef)
+ throws BufferWriteProcessorException, IOException {
+ super(processorName, beforeFlushAction, afterFlushAction, afterCloseAction, versionController,
+ fileSchemaRef);
+ }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
index 7ef1001..3e7bba4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.db.engine.bufferwrite.RestorableTsFileIOWriter;
import org.apache.iotdb.db.engine.filenode.FileNodeManager;
import org.apache.iotdb.db.engine.filenode.TsFileResource;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
+import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
@@ -90,6 +91,10 @@ import org.slf4j.LoggerFactory;
public class TsFileProcessor extends Processor {
private static final Logger LOGGER = LoggerFactory.getLogger(TsFileProcessor.class);
+ public static final int WRITE_SUCCESS = 1;
+ public static final int WRITE_IN_WARNING_MEM= 0;
+ public static final int WRITE_REJECT_BY_TIME = -1;
+ public static final int WRITE_REJECT_BY_MEM = -2;
//this is just a part of fileSchemaRef: only the measurements that belong to this TsFileProcessor
// are in this fileSchemaRef. And, this filed is shared with other classes (i.e., storage group
@@ -259,27 +264,65 @@ public class TsFileProcessor extends Processor {
* flushing operation will be called.
*
* @param plan data to be written
- * @return true if the tsRecord can be inserted into tsFile. otherwise false (, then you need to
- * insert it into overflow)
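+ * @return - 1 (WRITE_SUCCESS) if the memory is UsageLevel.SAFE and the record was inserted.
+ * - 0 (WRITE_IN_WARNING_MEM) if the memory is UsageLevel.WARNING (the record is still inserted, then flush() is called)
+ * - -1 (WRITE_REJECT_BY_TIME) if the time is not later than the device's last flushed time, so you need to insert it into overflow
+ * - -2 (WRITE_REJECT_BY_MEM) if the memory is UsageLevel.DANGEROUS (or any other level); the record is not inserted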
* @throws BufferWriteProcessorException if a flushing operation occurs and failed.
*/
- public boolean insert(InsertPlan plan) throws BufferWriteProcessorException, IOException {
+ public int insert(InsertPlan plan) throws BufferWriteProcessorException, IOException {
if (lastFlushedTimeForEachDevice.containsKey(plan.getDeviceId()) && plan.getTime() <= lastFlushedTimeForEachDevice.get(plan.getDeviceId())) {
- return false;
+ return WRITE_REJECT_BY_TIME;
}
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
logNode.write(plan);
}
+ long memUsage = 0;
+ TSDataType type;
+ String measurement;
+ for (int i=0; i < plan.getMeasurements().length; i++){
+ measurement = plan.getMeasurements()[i];
+ type = fileSchemaRef.getMeasurementDataType(measurement);
+ memUsage += MemUtils.getPointSize(type, measurement);
+ }
+ UsageLevel level = BasicMemController.getInstance().acquireUsage(this, memUsage);
+ switch (level) {
+ case SAFE:
+ doInsert(plan);
+ checkMemThreshold4Flush(memUsage);
+ return WRITE_SUCCESS;
+ case WARNING:
+ if(LOGGER.isWarnEnabled()) {
+ LOGGER.warn("Memory usage will exceed warning threshold, current : {}.",
+ MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage()));
+ }
+ doInsert(plan);
+ try {
+ flush();
+ } catch (IOException e) {
+ throw new BufferWriteProcessorException(e);
+ }
+ return WRITE_IN_WARNING_MEM;
+ case DANGEROUS:
+ if (LOGGER.isWarnEnabled()) {
+ LOGGER.warn("Memory usage will exceed dangerous threshold, current : {}.",
+ MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage()));
+ }
+ return WRITE_REJECT_BY_MEM;
+ default:
+ return WRITE_REJECT_BY_MEM;
+ }
+ }
+
+ private void doInsert(InsertPlan plan) {
String deviceId = plan.getDeviceId();
long time = plan.getTime();
- long memUsage = 0;
TSDataType type;
String measurement;
for (int i=0; i < plan.getMeasurements().length; i++){
measurement = plan.getMeasurements()[i];
type = fileSchemaRef.getMeasurementDataType(measurement);
workMemTable.write(deviceId, measurement, type, time, plan.getValues()[i]);
- memUsage += MemUtils.getPointSize(type, measurement);
}
if (!minWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId)) {
minWrittenTimeForEachDeviceInCurrentFile.put(deviceId, time);
@@ -289,9 +332,6 @@ public class TsFileProcessor extends Processor {
maxWrittenTimeForEachDeviceInCurrentFile.put(deviceId, time);
}
valueCount++;
- BasicMemController.getInstance().acquireUsage(this, memUsage);
- checkMemThreshold4Flush(memUsage);
- return true;
}
/**
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java
index 1464bcf..22d815c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java
@@ -116,9 +116,9 @@ public class TsFileProcessorTest {
String[] s2 = new String[]{"s2"};
String[] value = new String[]{"5.0"};
;
- Assert.assertTrue(processor.insert(new InsertPlan("root.test.d1", 10, s1, value)));
- Assert.assertTrue(processor.insert(new InsertPlan("root.test.d1", 10, s2, value)));
- Assert.assertTrue(processor.insert(new InsertPlan("root.test.d1", 12, s1, value)));
+ Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1", 10, s1, value)));
+ Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1", 10, s2, value)));
+ Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1", 12, s1, value)));
Future<Boolean> ok = processor.flush();
ok.get();
ok = processor.flush();
@@ -129,12 +129,12 @@ public class TsFileProcessorTest {
ok.get();
//let's rewrite timestamp =12 again..
- Assert.assertFalse(processor.insert(new InsertPlan("root.test.d1", 12, s1, value)));
+ Assert.assertEquals(TsFileProcessor.WRITE_REJECT_BY_TIME, processor.insert(new InsertPlan("root.test.d1", 12, s1, value)));
processor.delete("root.test.d1", "s1",12);
- Assert.assertTrue(processor.insert(new InsertPlan("root.test.d1", 12, s1, value)));
- Assert.assertTrue(processor.insert(new InsertPlan("root.test.d1", 13, s1, value)));
- Assert.assertTrue(processor.insert(new InsertPlan("root.test.d2", 10, s1, value)));
- Assert.assertTrue(processor.insert(new InsertPlan("root.test.d1", 14, s1, value)));
+ Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1", 12, s1, value)));
+ Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1", 13, s1, value)));
+ Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d2", 10, s1, value)));
+ Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1", 14, s1, value)));
processor.delete("root.test.d1", "s1",12);
processor.delete("root.test.d3", "s1",12);