You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/01/27 04:59:02 UTC
[iotdb] branch two_stage_pipeline updated: change to one stage
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch two_stage_pipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/two_stage_pipeline by this push:
new 0b91782 change to one stage
0b91782 is described below
commit 0b9178239bd5a91e599af38caf20bbea2168a4ec
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Wed Jan 27 12:58:20 2021 +0800
change to one stage
---
.../iotdb/db/engine/flush/MemTableFlushTask.java | 112 ++++-----------------
1 file changed, 19 insertions(+), 93 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index cb72070..eaf2fc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -20,14 +20,8 @@ package org.apache.iotdb.db.engine.flush;
import java.io.IOException;
import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
-import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
@@ -40,17 +34,12 @@ import org.slf4j.LoggerFactory;
public class MemTableFlushTask {
private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class);
- private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER = FlushSubTaskPoolManager
- .getInstance();
- private final Future<?> ioTaskFuture;
- private RestorableTsFileIOWriter writer;
+ private final RestorableTsFileIOWriter writer;
- private final ConcurrentLinkedQueue<Object> ioTaskQueue = new ConcurrentLinkedQueue<>();
- private String storageGroup;
+ private final String storageGroup;
- private IMemTable memTable;
+ private final IMemTable memTable;
- private volatile boolean noMoreIOTask = false;
/**
* @param memTable the memTable to flush
@@ -62,7 +51,6 @@ public class MemTableFlushTask {
this.memTable = memTable;
this.writer = writer;
this.storageGroup = storageGroup;
- this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
LOGGER.debug("flush task of Storage group {} memtable {} is created ",
storageGroup, memTable.getVersion());
}
@@ -71,7 +59,7 @@ public class MemTableFlushTask {
* the function for flushing memtable.
*/
public void syncFlushMemTable()
- throws ExecutionException, InterruptedException {
+ throws InterruptedException, IOException {
LOGGER.info("The memTable size of SG {} is {}, the avg series points num in chunk is {} ",
storageGroup,
memTable.memSize(),
@@ -79,11 +67,11 @@ public class MemTableFlushTask {
long start = System.currentTimeMillis();
long sortTime = 0;
long encodingTime = 0;
+ long ioTime = 0;
//for map do not use get(key) to iteratate
for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : memTable.getMemTableMap().entrySet()) {
- ioTaskQueue.add(new StartFlushGroupIOTask(memTableEntry.getKey()));
-
+ writer.startChunkGroup(memTableEntry.getKey());
final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
long startTime = System.currentTimeMillis();
@@ -96,29 +84,29 @@ public class MemTableFlushTask {
writeOneSeries(tvList, seriesWriter, desc.getType());
seriesWriter.sealCurrentPage();
seriesWriter.clearPageWriter();
- encodingTime += (System.currentTimeMillis() - encodingStartTime);
- ioTaskQueue.add(seriesWriter);
+ long ioStartTime = System.currentTimeMillis();
+ encodingTime += (ioStartTime - encodingStartTime);
+ seriesWriter.writeToFileWriter(this.writer);
+ ioTime += (System.currentTimeMillis() - ioStartTime);
}
-
- ioTaskQueue.add(new EndChunkGroupIoTask());
+ long ioStartTime = System.currentTimeMillis();
+ writer.setMinPlanIndex(memTable.getMinPlanIndex());
+ writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
+ writer.endChunkGroup();
+ ioTime += (System.currentTimeMillis() - ioStartTime);
}
- noMoreIOTask = true;
LOGGER.info(
"Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
storageGroup, memTable.getVersion(), sortTime);
LOGGER.info("Storage group {}, flushing memtable {} into disk: Encoding data cost "
+ "{} ms.",
storageGroup, memTable.getVersion(), encodingTime);
+ LOGGER.info("flushing a memtable {} in storage group {}, io cost {}ms", memTable.getVersion(),
+ storageGroup, ioTime);
- ioTaskFuture.get();
-
- try {
- writer.writeVersion(memTable.getVersion());
- writer.writePlanIndices();
- } catch (IOException e) {
- throw new ExecutionException(e);
- }
+ writer.writeVersion(memTable.getVersion());
+ writer.writePlanIndices();
LOGGER.info(
"Storage group {} memtable {} flushing a memtable has finished! Time consumption: {}ms",
@@ -164,66 +152,4 @@ public class MemTableFlushTask {
}
}
- @SuppressWarnings("squid:S135")
- private Runnable ioTask = () -> {
- long ioTime = 0;
- boolean returnWhenNoTask = false;
- LOGGER.debug("Storage group {} memtable {}, start io.", storageGroup, memTable.getVersion());
- while (true) {
- if (noMoreIOTask) {
- returnWhenNoTask = true;
- }
- Object ioMessage = ioTaskQueue.poll();
- if (ioMessage == null) {
- if (returnWhenNoTask) {
- break;
- }
- try {
- TimeUnit.MILLISECONDS.sleep(10);
- } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
- LOGGER.error("Storage group {} memtable {}, io task is interrupted.", storageGroup
- , memTable.getVersion());
- // generally it is because the thread pool is shutdown so the task should be aborted
- break;
- }
- } else {
- long starTime = System.currentTimeMillis();
- try {
- if (ioMessage instanceof StartFlushGroupIOTask) {
- this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
- } else if (ioMessage instanceof IChunkWriter) {
- ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
- chunkWriter.writeToFileWriter(this.writer);
- } else {
- this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
- this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
- this.writer.endChunkGroup();
- }
- } catch (IOException e) {
- LOGGER.error("Storage group {} memtable {}, io task meets error.", storageGroup,
- memTable.getVersion(), e);
- throw new FlushRunTimeException(e);
- }
- ioTime += System.currentTimeMillis() - starTime;
- }
- }
- LOGGER.info("flushing a memtable {} in storage group {}, io cost {}ms", memTable.getVersion(),
- storageGroup, ioTime);
- };
-
- static class EndChunkGroupIoTask {
-
- EndChunkGroupIoTask() {
-
- }
- }
-
- static class StartFlushGroupIOTask {
-
- private final String deviceId;
-
- StartFlushGroupIOTask(String deviceId) {
- this.deviceId = deviceId;
- }
- }
}