You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/06/07 04:12:27 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: change
flushTask as 3 parts (using 3 threads)
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 8c4d79f change flushTask as 3 parts (using 3 threads)
8c4d79f is described below
commit 8c4d79fb3a58cf3d9fafffe5be16ada8d4c26c86
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Fri Jun 7 12:12:16 2019 +0800
change flushTask as 3 parts (using 3 threads)
---
.../engine/bufferwrite/BufferWriteProcessor.java | 5 +-
.../db/engine/memtable/MemTableFlushTask.java | 228 +++++++++++++++++++++
.../db/engine/memtable/MemTableFlushUtil.java | 3 -
.../bufferwrite/RestorableTsFileIOWriterTest.java | 10 +-
4 files changed, 239 insertions(+), 7 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 2a458fd..03b6c9e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.engine.filenode.TsFileResource;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
+import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.pool.FlushManager;
@@ -317,8 +318,8 @@ public class BufferWriteProcessor extends Processor {
try {
if (flushMemTable != null && !flushMemTable.isEmpty()) {
// flush data
- MemTableFlushUtil.flushMemTable(fileSchema, writer, flushMemTable,
- version);
+ MemTableFlushTask tableFlushTask = new MemTableFlushTask(writer, getProcessorName());
+ tableFlushTask.flushMemTable(fileSchema, flushMemTable, version);
// write restore information
writer.flush();
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
new file mode 100644
index 0000000..7b28992
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.memtable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flushes one memtable into a TsFile using three cooperating threads: the caller of
+ * {@link #flushMemTable} obtains the sorted points of each series, {@code memoryFlushThread}
+ * encodes them into chunks, and {@code ioFlushThread} hands chunks and chunk group footers to
+ * the writer. Both worker threads are started by the constructor and are told to stop when
+ * {@code flushMemTable} returns or throws, so one instance serves a single flush.
+ */
+public class MemTableFlushTask {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class);
+  private static final int PAGE_SIZE_THRESHOLD = TSFileConfig.pageSizeInByte;
+  // Stands in for "the last chunk group" when the memtable has no device; it is already finished.
+  private static final ChunkGroupIoTask EMPTY_TASK = new ChunkGroupIoTask(0, "", 0, true);
+
+  private TsFileIOWriter tsFileIoWriter;
+
+  // Tasks for the io thread: a device id (String), an IChunkWriter or a ChunkGroupIoTask.
+  private ConcurrentLinkedQueue<Object> ioTaskQueue = new ConcurrentLinkedQueue<>();
+  // Tasks for the memory thread: a device id, a (sorted points, schema) Pair or a
+  // ChunkGroupIoTask.
+  private ConcurrentLinkedQueue<Object> memoryTaskQueue = new ConcurrentLinkedQueue<>();
+  // Set by flushMemTable() and polled by both workers; volatile so that they see the update.
+  private volatile boolean stop = false;
+  // Error that made a worker give up; flushMemTable() rethrows it instead of waiting forever.
+  private volatile Exception failure;
+  private String processName;
+
+  // the position of the tsfile when a flush task begins; afterwards the io thread moves it to
+  // the position reached after each chunk group footer.
+  long startPos;
+
+  // TODO a better way: give each TsFile its own single-thread executor
+  // (Executors.newSingleThreadExecutor()) rather than starting threads per memtable.
+  private Thread ioFlushThread = new Thread(() -> {
+    long ioTime = 0;
+    while (!stop) {
+      Object ioTask = ioTaskQueue.poll();
+      if (ioTask == null) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          // Only logged: restoring the interrupt flag here would make every later sleep() throw
+          // immediately and turn this polling loop into a busy loop.
+          LOGGER.error("BufferWrite Processor {}, io flush task is interrupted.", processName, e);
+        }
+        continue;
+      }
+      long startTime = System.currentTimeMillis();
+      try {
+        if (ioTask instanceof IChunkWriter) {
+          ((IChunkWriter) ioTask).writeToFileWriter(tsFileIoWriter);
+        } else if (ioTask instanceof String) {
+          tsFileIoWriter.startFlushChunkGroup((String) ioTask);
+        } else {
+          // End of a chunk group: its size is the distance the writer advanced since startPos.
+          ChunkGroupIoTask task = (ChunkGroupIoTask) ioTask;
+          long memSize = tsFileIoWriter.getPos() - startPos;
+          ChunkGroupFooter footer = new ChunkGroupFooter(task.deviceId, memSize,
+              task.seriesNumber);
+          tsFileIoWriter.endChunkGroup(footer, task.version);
+          startPos = tsFileIoWriter.getPos();
+          task.finished = true;
+        }
+      } catch (IOException | RuntimeException e) {
+        // Publish the error and exit; a silently dead worker would block flushMemTable() forever.
+        recordFailure(e);
+        return;
+      }
+      ioTime += System.currentTimeMillis() - startTime;
+    }
+    LOGGER.info("BufferWrite Processor {}, flushing a memtable into disk: io cost {} ms.",
+        processName, ioTime);
+  });
+
+  private Thread memoryFlushThread = new Thread(() -> {
+    long memSerializeTime = 0;
+    while (!stop) {
+      Object task = memoryTaskQueue.poll();
+      if (task == null) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          // Only logged, for the same reason as in the io thread.
+          LOGGER.error("BufferWrite Processor {}, memory flush task is interrupted.",
+              processName, e);
+        }
+      } else if (task instanceof String || task instanceof ChunkGroupIoTask) {
+        // Chunk group start/end markers need no encoding; forward them in submission order.
+        ioTaskQueue.add(task);
+      } else {
+        long startTime = System.currentTimeMillis();
+        try {
+          @SuppressWarnings("unchecked") // flushMemTable() only enqueues this Pair type
+          Pair<List<TimeValuePair>, MeasurementSchema> series =
+              (Pair<List<TimeValuePair>, MeasurementSchema>) task;
+          IChunkWriter seriesWriter = new ChunkWriterImpl(series.right,
+              new ChunkBuffer(series.right), PAGE_SIZE_THRESHOLD);
+          writeOneSeries(series.left, seriesWriter, series.right.getType());
+          ioTaskQueue.add(seriesWriter);
+        } catch (IOException | RuntimeException e) {
+          recordFailure(e);
+          return;
+        }
+        memSerializeTime += System.currentTimeMillis() - startTime;
+      }
+    }
+    LOGGER.info(
+        "BufferWrite Processor {},flushing a memtable into disk: serialize data into mem cost {} ms.",
+        processName, memSerializeTime);
+  });
+
+  /**
+   * Creates the task and immediately starts both worker threads.
+   *
+   * @param writer the TsFile writer that receives the flushed chunk groups
+   * @param processName processor name used in log messages
+   */
+  public MemTableFlushTask(TsFileIOWriter writer, String processName) {
+    this.tsFileIoWriter = writer;
+    this.processName = processName;
+    ioFlushThread.start();
+    memoryFlushThread.start();
+  }
+
+  /** Logs the error that stopped a worker and publishes it for flushMemTable() to rethrow. */
+  private void recordFailure(Exception e) {
+    LOGGER.error("BufferWrite Processor {}, io error.", processName, e);
+    failure = e;
+  }
+
+  /**
+   * Appends every point of one series to the chunk writer, using the write overload that
+   * matches the series data type. Points of an unsupported type are logged and not written.
+   */
+  private void writeOneSeries(List<TimeValuePair> tvPairs, IChunkWriter seriesWriterImpl,
+      TSDataType dataType) throws IOException {
+    for (TimeValuePair pair : tvPairs) {
+      long time = pair.getTimestamp();
+      switch (dataType) {
+        case BOOLEAN:
+          seriesWriterImpl.write(time, pair.getValue().getBoolean());
+          break;
+        case INT32:
+          seriesWriterImpl.write(time, pair.getValue().getInt());
+          break;
+        case INT64:
+          seriesWriterImpl.write(time, pair.getValue().getLong());
+          break;
+        case FLOAT:
+          seriesWriterImpl.write(time, pair.getValue().getFloat());
+          break;
+        case DOUBLE:
+          seriesWriterImpl.write(time, pair.getValue().getDouble());
+          break;
+        case TEXT:
+          seriesWriterImpl.write(time, pair.getValue().getBinary());
+          break;
+        default:
+          LOGGER.error("BufferWrite Processor {}, don't support data type: {}", processName,
+              dataType);
+          break;
+      }
+    }
+  }
+
+  /**
+   * Flushes the memtable: for each device it enqueues the device id, one task per series
+   * (fetched as a sorted point list on the calling thread) and an end-of-chunk-group task, then
+   * blocks until the io thread has finished the last chunk group.
+   *
+   * @param fileSchema supplies the MeasurementSchema of each measurement id
+   * @param imemTable the memtable to flush
+   * @param version version passed to the writer when each chunk group is ended
+   * @throws IOException if the writer position cannot be read or a worker thread failed
+   * @throws RuntimeException if the calling thread is interrupted while waiting
+   */
+  public void flushMemTable(FileSchema fileSchema,
+      IMemTable imemTable, long version) throws IOException {
+    long sortTime = 0;
+    ChunkGroupIoTask theLastTask = EMPTY_TASK;
+    try {
+      startPos = tsFileIoWriter.getPos();
+      for (String deviceId : imemTable.getMemTableMap().keySet()) {
+        memoryTaskQueue.add(deviceId);
+        int seriesNumber = imemTable.getMemTableMap().get(deviceId).size();
+        for (String measurementId : imemTable.getMemTableMap().get(deviceId).keySet()) {
+          long startTime = System.currentTimeMillis();
+          // TODO if we can not use TSFileIO writer, then we have to redesign the class of
+          // TSFileIO.
+          IWritableMemChunk series = imemTable.getMemTableMap().get(deviceId).get(measurementId);
+          MeasurementSchema desc = fileSchema.getMeasurementSchema(measurementId);
+          List<TimeValuePair> sortedTimeValuePairs = series.getSortedTimeValuePairList();
+          sortTime += System.currentTimeMillis() - startTime;
+          memoryTaskQueue.add(new Pair<>(sortedTimeValuePairs, desc));
+        }
+        theLastTask = new ChunkGroupIoTask(seriesNumber, deviceId, version);
+        memoryTaskQueue.add(theLastTask);
+      }
+      LOGGER.info(
+          "BufferWrite Processor {}, flushing a memtable into disk: data sort time cost {} ms.",
+          processName, sortTime);
+      // Tasks are processed in order, so the last footer being written means all are written.
+      while (!theLastTask.finished) {
+        if (failure != null) {
+          throw new IOException("BufferWrite Processor " + processName + ", flush failed.",
+              failure);
+        }
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          LOGGER.error("BufferWrite Processor {}, flush memtable table thread is interrupted.",
+              processName, e);
+          // Keep the interrupt visible to callers that catch the RuntimeException.
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+      }
+    } finally {
+      // Always release the workers, also when this method throws; otherwise they poll forever.
+      stop = true;
+    }
+  }
+
+  /**
+   * Marks the end of one device's chunk group. {@code finished} becomes true once the io thread
+   * has ended the chunk group on the writer.
+   */
+  static class ChunkGroupIoTask {
+    int seriesNumber;
+    String deviceId;
+    long version;
+    // Written by the io thread and polled by flushMemTable(), hence volatile.
+    volatile boolean finished;
+
+    public ChunkGroupIoTask(int seriesNumber, String deviceId, long version) {
+      this(seriesNumber, deviceId, version, false);
+    }
+
+    public ChunkGroupIoTask(int seriesNumber, String deviceId, long version, boolean finished) {
+      this.seriesNumber = seriesNumber;
+      this.deviceId = deviceId;
+      this.version = version;
+      this.finished = finished;
+    }
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
index 8cd0ec7..4aa12e9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
@@ -19,15 +19,12 @@
package org.apache.iotdb.db.engine.memtable;
import java.io.IOException;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
index f2c393a..1f8e3e2 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
@@ -31,6 +31,7 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
import org.apache.iotdb.db.engine.memtable.MemTableTestUtils;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
@@ -165,7 +166,9 @@ public class RestorableTsFileIOWriterTest {
memTable.write("d1", "s2", TSDataType.INT32, 3, "1");
memTable.write("d2", "s2", TSDataType.INT32, 2, "1");
memTable.write("d2", "s2", TSDataType.INT32, 4, "1");
- MemTableFlushUtil.flushMemTable(schema, writer, memTable, 0);
+ //MemTableFlushUtil.flushMemTable(schema, writer, memTable, 0);
+ MemTableFlushTask tableFlushTask = new MemTableFlushTask(writer, "test");
+ tableFlushTask.flushMemTable(schema, memTable, 0);
writer.flush();
writer.appendMetadata();
writer.getOutput().close();
@@ -218,7 +221,10 @@ public class RestorableTsFileIOWriterTest {
MemTableTestUtils.measurementId0,
MemTableTestUtils.dataType0);
- MemTableFlushUtil.flushMemTable(MemTableTestUtils.getFileSchema(), writer, memTable, 0);
+ //MemTableFlushUtil.flushMemTable(MemTableTestUtils.getFileSchema(), writer, memTable, 0);
+ MemTableFlushTask tableFlushTask = new MemTableFlushTask(writer, "test");
+ tableFlushTask.flushMemTable(MemTableTestUtils.getFileSchema(), memTable, 0);
+
writer.flush();
assertEquals(0,