You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/02/04 08:45:07 UTC
[iotdb] 02/02: correct
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch pipeline_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7d1be89c569000c33da67f8197f2255c3295816a
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Thu Feb 4 16:44:34 2021 +0800
correct
---
.../main/java/org/apache/iotdb/SessionExample.java | 37 ++++-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 12 +-
.../iotdb/db/engine/flush/MemTableFlushTask.java | 165 ++++++++++++---------
.../org/apache/iotdb/tsfile/utils/PublicBAOS.java | 5 +
.../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 12 +-
5 files changed, 144 insertions(+), 87 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 5e97342..a8916cf 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -44,10 +44,10 @@ public class SessionExample {
long MAX_ROW_NUM = Long.parseLong(args[0]);
System.out.println("MAX_ROW_NUM: " + MAX_ROW_NUM);
- insertTablet(MAX_ROW_NUM);
+ insertSeqTablet(MAX_ROW_NUM);
session.close();
}
- private static void insertTablet(long MAX_ROW_NUM) throws IoTDBConnectionException, StatementExecutionException {
+ private static void insertRandomTablet(long MAX_ROW_NUM) throws IoTDBConnectionException, StatementExecutionException {
// The schema of measurements of one device
// only measurementId and data type in MeasurementSchema take effects in Tablet
List<MeasurementSchema> schemaList = new ArrayList<>();
@@ -77,4 +77,37 @@ public class SessionExample {
tablet.reset();
}
}
+
+  /** Inserts MAX_ROW_NUM rows of 1000 random doubles for ROOT_SG1_D1, one row per consecutive timestamp. */
+  private static void insertSeqTablet(long MAX_ROW_NUM) throws IoTDBConnectionException, StatementExecutionException {
+    // Only the measurementId and data type of a MeasurementSchema take effect in a Tablet.
+    List<MeasurementSchema> schemaList = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      schemaList.add(new MeasurementSchema("s" + i, TSDataType.DOUBLE));
+    }
+
+    Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList, 1000);
+    // Reuse a single generator instead of allocating a new Random for every value.
+    Random random = new Random();
+    // Timestamps start at "now" and grow by one per row, so each batch is in ascending time order.
+    long timestamp = System.currentTimeMillis();
+
+    for (long row = 0; row < MAX_ROW_NUM; row++) {
+      int rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, timestamp);
+      for (int s = 0; s < 1000; s++) {
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, random.nextDouble());
+      }
+      if (tablet.rowSize == tablet.getMaxRowNumber()) {
+        session.insertTablet(tablet, true);
+        tablet.reset();
+      }
+      timestamp++;
+    }
+    // Send the final partial batch; it is ascending too, so pass the same flag as the full batches.
+    if (tablet.rowSize != 0) {
+      session.insertTablet(tablet, true);
+      tablet.reset();
+    }
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 988469a..665f7e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -131,17 +131,17 @@ public class IoTDBConfig {
/**
* Memory allocated for the write process
*/
- private long allocateMemoryForWrite = Runtime.getRuntime().maxMemory() * 4 / 10;
+ private long allocateMemoryForWrite = Runtime.getRuntime().maxMemory() * 6 / 10;
/**
* Memory allocated for the read process
*/
- private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 10;
+ private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() / 10;
/**
* Memory allocated for the mtree
*/
- private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() * 1 / 10;
+ private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() / 10;
/**
* Memory allocated for the read process besides cache
@@ -301,7 +301,7 @@ public class IoTDBConfig {
/**
* Is the write mem control for writing enable.
*/
- private boolean enableMemControl = true;
+ private boolean enableMemControl = false;
/**
* Is the write ahead log enable.
@@ -343,12 +343,12 @@ public class IoTDBConfig {
/**
* When a memTable's size (in byte) exceeds this, the memtable is flushed to disk.
*/
- private long memtableSizeThreshold = 1024 * 1024 * 1024L;
+ private long memtableSizeThreshold = 4 * 1024 * 1024 * 1024L;
/**
* When average series point number reaches this, flush the memtable to disk
*/
- private int avgSeriesPointNumberThreshold = 100000;
+ private int avgSeriesPointNumberThreshold = 100000000;
/**
* Work when tsfile_manage_strategy is level_strategy. When merge point number reaches this, merge
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index b06e170..5f2e218 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -19,16 +19,19 @@
package org.apache.iotdb.db.engine.flush;
import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
@@ -43,18 +46,23 @@ public class MemTableFlushTask {
private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class);
private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER = FlushSubTaskPoolManager
.getInstance();
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final Future<?> encodingTaskFuture;
private final Future<?> ioTaskFuture;
private RestorableTsFileIOWriter writer;
- private final ConcurrentLinkedQueue<Object> ioTaskQueue = new ConcurrentLinkedQueue<>();
- private final ConcurrentLinkedQueue<Object> encodingTaskQueue = new ConcurrentLinkedQueue<>();
+ private final LinkedBlockingQueue<Object> encodingTaskQueue = new LinkedBlockingQueue<>();
+ private final LinkedBlockingQueue<Object> ioTaskQueue = (config.isEnableMemControl()
+ && SystemInfo.getInstance().isEncodingFasterThanIo())
+ ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
+ : new LinkedBlockingQueue<>();
+
private String storageGroup;
private IMemTable memTable;
- private volatile boolean noMoreEncodingTask = false;
- private volatile boolean noMoreIOTask = false;
+ private long memSerializeTime = 0L;
+ private long ioTime = 0L;
/**
* @param memTable the memTable to flush
@@ -81,12 +89,19 @@ public class MemTableFlushTask {
storageGroup,
memTable.memSize(),
memTable.getTotalPointsNum() / memTable.getSeriesNumber());
+
+ long estimatedTemporaryMemSize = 0L;
+ if (config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()) {
+ estimatedTemporaryMemSize = memTable.memSize() / memTable.getSeriesNumber()
+ * config.getIoTaskQueueSizeForFlushing();
+ SystemInfo.getInstance().applyTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
+ }
long start = System.currentTimeMillis();
long sortTime = 0;
//for map do not use get(key) to iteratate
for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : memTable.getMemTableMap().entrySet()) {
- encodingTaskQueue.add(new StartFlushGroupIOTask(memTableEntry.getKey()));
+ encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey()));
final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
@@ -95,13 +110,12 @@ public class MemTableFlushTask {
MeasurementSchema desc = series.getSchema();
TVList tvList = series.getSortedTVListForFlush();
sortTime += System.currentTimeMillis() - startTime;
- encodingTaskQueue.add(new Pair<>(tvList, desc));
+ encodingTaskQueue.put(new Pair<>(tvList, desc));
}
- encodingTaskQueue.add(new EndChunkGroupIoTask());
+ encodingTaskQueue.put(new EndChunkGroupIoTask());
}
-
- noMoreEncodingTask = true;
+ encodingTaskQueue.put(new TaskEnd());
LOGGER.info(
"Storage group {} memtable flushing into file {}: data sort time cost {} ms.",
storageGroup, writer.getFile().getName(), sortTime);
@@ -109,8 +123,6 @@ public class MemTableFlushTask {
try {
encodingTaskFuture.get();
} catch (InterruptedException | ExecutionException e) {
- // avoid ioTask waiting forever
- noMoreIOTask = true;
ioTaskFuture.cancel(true);
throw e;
}
@@ -123,6 +135,13 @@ public class MemTableFlushTask {
throw new ExecutionException(e);
}
+ if (config.isEnableMemControl()) {
+ if (estimatedTemporaryMemSize != 0) {
+ SystemInfo.getInstance().releaseTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
+ }
+ SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >= memSerializeTime);
+ }
+
LOGGER.info(
"Storage group {} memtable {} flushing a memtable has finished! Time consumption: {}ms",
storageGroup, memTable, System.currentTimeMillis() - start);
@@ -169,44 +188,53 @@ public class MemTableFlushTask {
@SuppressWarnings("squid:S135")
@Override
public void run() {
- long memSerializeTime = 0;
- boolean noMoreMessages = false;
LOGGER.debug("Storage group {} memtable flushing to file {} starts to encoding data.",
storageGroup, writer.getFile().getName());
while (true) {
- if (noMoreEncodingTask) {
- noMoreMessages = true;
+
+ Object task;
+ try {
+ task = encodingTaskQueue.take();
+ } catch (InterruptedException e1) {
+ LOGGER.error("Take task into ioTaskQueue Interrupted");
+ Thread.currentThread().interrupt();
+ break;
}
- Object task = encodingTaskQueue.poll();
- if (task == null) {
- if (noMoreMessages) {
- break;
- }
+ if (task instanceof StartFlushGroupIOTask || task instanceof EndChunkGroupIoTask) {
try {
- TimeUnit.MILLISECONDS.sleep(10);
+ ioTaskQueue.put(task);
} catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
LOGGER.error("Storage group {} memtable flushing to file {}, encoding task is interrupted.",
storageGroup, writer.getFile().getName(), e);
// generally it is because the thread pool is shutdown so the task should be aborted
break;
}
+ } else if (task instanceof TaskEnd) {
+ break;
} else {
- if (task instanceof StartFlushGroupIOTask || task instanceof EndChunkGroupIoTask) {
- ioTaskQueue.add(task);
- } else {
- long starTime = System.currentTimeMillis();
- Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
- IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right);
- writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
- seriesWriter.sealCurrentPage();
- seriesWriter.clearPageWriter();
- memSerializeTime += System.currentTimeMillis() - starTime;
- ioTaskQueue.add(seriesWriter);
+ long starTime = System.currentTimeMillis();
+ Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
+ IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right);
+ writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
+ seriesWriter.sealCurrentPage();
+ seriesWriter.clearPageWriter();
+ try {
+ ioTaskQueue.put(seriesWriter);
+ } catch (InterruptedException e) {
+ LOGGER.error("Put task into ioTaskQueue Interrupted");
+ Thread.currentThread().interrupt();
}
+ memSerializeTime += System.currentTimeMillis() - starTime;
}
}
- noMoreIOTask = true;
- LOGGER.info("Storage group {}, flushing memtable into file {}: Encoding data cost "
+ try {
+ ioTaskQueue.put(new TaskEnd());
+ } catch (InterruptedException e) {
+ LOGGER.error("Put task into ioTaskQueue Interrupted");
+ Thread.currentThread().interrupt();
+ }
+
+ LOGGER.info("Storage group {}, flushing memtable {} into disk: Encoding data cost "
+ "{} ms.",
storageGroup, writer.getFile().getName(), memSerializeTime);
}
@@ -214,52 +242,49 @@ public class MemTableFlushTask {
@SuppressWarnings("squid:S135")
private Runnable ioTask = () -> {
- long ioTime = 0;
- boolean returnWhenNoTask = false;
LOGGER.debug("Storage group {} memtable flushing to file {} start io.",
storageGroup, writer.getFile().getName());
while (true) {
- if (noMoreIOTask) {
- returnWhenNoTask = true;
+ Object ioMessage;
+ try {
+ ioMessage = ioTaskQueue.take();
+ } catch (InterruptedException e1) {
+ LOGGER.error("take task from ioTaskQueue Interrupted");
+ Thread.currentThread().interrupt();
+ break;
}
- Object ioMessage = ioTaskQueue.poll();
- if (ioMessage == null) {
- if (returnWhenNoTask) {
+ long starTime = System.currentTimeMillis();
+ try {
+ if (ioMessage instanceof StartFlushGroupIOTask) {
+ this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
+ } else if (ioMessage instanceof TaskEnd) {
break;
+ } else if (ioMessage instanceof IChunkWriter) {
+ ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+ chunkWriter.writeToFileWriter(this.writer);
+ } else {
+ this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
+ this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
+ this.writer.endChunkGroup();
}
- try {
- TimeUnit.MILLISECONDS.sleep(10);
- } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
- LOGGER.error("Storage group {} memtable flushing to file {}, io task is interrupted.",
- storageGroup, writer.getFile().getName());
- // generally it is because the thread pool is shutdown so the task should be aborted
- break;
- }
- } else {
- long starTime = System.currentTimeMillis();
- try {
- if (ioMessage instanceof StartFlushGroupIOTask) {
-// this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
- } else if (ioMessage instanceof IChunkWriter) {
- ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
-// chunkWriter.writeToFileWriter(this.writer);
- } else {
-// this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
-// this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
- this.writer.endChunkGroup();
- }
- } catch (IOException e) {
- LOGGER.error("Storage group {} memtable flushing to file {}, io task meets error.",
- storageGroup, writer.getFile().getName(), e);
- throw new FlushRunTimeException(e);
- }
- ioTime += System.currentTimeMillis() - starTime;
+ } catch (IOException e) {
+ LOGGER.error("Storage group {} memtable {}, io task meets error.", storageGroup,
+ memTable, e);
+ throw new FlushRunTimeException(e);
}
+ ioTime += System.currentTimeMillis() - starTime;
}
LOGGER.info("flushing a memtable to file {} in storage group {}, io cost {}ms",
writer.getFile().getName(), storageGroup, ioTime);
};
+ static class TaskEnd {
+ // End-of-stream marker: put last on a task queue; the encoding and IO loops break when they take it.
+ TaskEnd() {
+
+ }
+ }
+
static class EndChunkGroupIoTask {
EndChunkGroupIoTask() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
index de4c985..085bcef 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
@@ -79,4 +79,9 @@ public class PublicBAOS extends ByteArrayOutputStream {
public void reset() {
count = 0;
}
+
+ @Override
+ public int size() {
+ return count; // current byte count; reset() above sets it back to 0 (count presumably inherited from ByteArrayOutputStream)
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index a1968c8..0b92c6f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -52,11 +52,6 @@ public class ChunkWriterImpl implements IChunkWriter {
*/
private PublicBAOS pageBuffer;
- /**
- * current chunk data size, i.e the size of pageBuffer
- */
- private int chunkDataSize;
-
private int numOfPages;
/**
@@ -356,7 +351,6 @@ public class ChunkWriterImpl implements IChunkWriter {
if (pageWriter != null && pageWriter.getPointNumber() > 0) {
writePageToPageBuffer();
}
- chunkDataSize = pageBuffer.size();
}
public void clearPageWriter() {
@@ -424,7 +418,7 @@ public class ChunkWriterImpl implements IChunkWriter {
// start to write this column chunk
writer.startFlushChunk(measurementSchema, compressor.getType(), measurementSchema.getType(),
- measurementSchema.getEncodingType(), statistics, chunkDataSize, numOfPages);
+ measurementSchema.getEncodingType(), statistics, pageBuffer.size(), numOfPages);
long dataOffset = writer.getPos();
@@ -432,10 +426,10 @@ public class ChunkWriterImpl implements IChunkWriter {
writer.writeBytesToStream(pageBuffer);
int dataSize = (int) (writer.getPos() - dataOffset);
- if (dataSize != chunkDataSize) {
+ if (dataSize != pageBuffer.size()) {
throw new IOException(
"Bytes written is inconsistent with the size of data: " + dataSize + " !="
- + " " + chunkDataSize);
+ + " " + pageBuffer.size());
}
writer.endCurrentChunk();