You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/02/04 08:45:06 UTC
[iotdb] 01/02: s
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch pipeline_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 621856f78c0e7747abe236326b85977ab84b3296
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Thu Feb 4 16:28:34 2021 +0800
s
---
.../iotdb/db/engine/flush/MemTableFlushTask.java | 173 +++++++++------------
1 file changed, 74 insertions(+), 99 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index f67b486..b06e170 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -19,19 +19,16 @@
package org.apache.iotdb.db.engine.flush;
import java.io.IOException;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import java.util.concurrent.TimeUnit;
import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
import org.apache.iotdb.db.utils.datastructure.TVList;
-import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
@@ -46,23 +43,18 @@ public class MemTableFlushTask {
private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class);
private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER = FlushSubTaskPoolManager
.getInstance();
- private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final Future<?> encodingTaskFuture;
private final Future<?> ioTaskFuture;
private RestorableTsFileIOWriter writer;
- private final LinkedBlockingQueue<Object> encodingTaskQueue = new LinkedBlockingQueue<>();
- private final LinkedBlockingQueue<Object> ioTaskQueue = (config.isEnableMemControl()
- && SystemInfo.getInstance().isEncodingFasterThanIo())
- ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
- : new LinkedBlockingQueue<>();
-
+ private final ConcurrentLinkedQueue<Object> ioTaskQueue = new ConcurrentLinkedQueue<>();
+ private final ConcurrentLinkedQueue<Object> encodingTaskQueue = new ConcurrentLinkedQueue<>();
private String storageGroup;
private IMemTable memTable;
- private volatile long memSerializeTime = 0L;
- private volatile long ioTime = 0L;
+ private volatile boolean noMoreEncodingTask = false;
+ private volatile boolean noMoreIOTask = false;
/**
* @param memTable the memTable to flush
@@ -77,7 +69,7 @@ public class MemTableFlushTask {
this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask);
this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
LOGGER.debug("flush task of Storage group {} memtable is created, flushing to file {}.",
- storageGroup, writer.getFile().getName());
+ storageGroup, writer.getFile().getName());
}
/**
@@ -89,19 +81,12 @@ public class MemTableFlushTask {
storageGroup,
memTable.memSize(),
memTable.getTotalPointsNum() / memTable.getSeriesNumber());
-
- long estimatedTemporaryMemSize = 0L;
- if (config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()) {
- estimatedTemporaryMemSize = memTable.memSize() / memTable.getSeriesNumber()
- * config.getIoTaskQueueSizeForFlushing();
- SystemInfo.getInstance().applyTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
- }
long start = System.currentTimeMillis();
long sortTime = 0;
//for map do not use get(key) to iteratate
for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : memTable.getMemTableMap().entrySet()) {
- encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey()));
+ encodingTaskQueue.add(new StartFlushGroupIOTask(memTableEntry.getKey()));
final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
@@ -110,12 +95,13 @@ public class MemTableFlushTask {
MeasurementSchema desc = series.getSchema();
TVList tvList = series.getSortedTVListForFlush();
sortTime += System.currentTimeMillis() - startTime;
- encodingTaskQueue.put(new Pair<>(tvList, desc));
+ encodingTaskQueue.add(new Pair<>(tvList, desc));
}
- encodingTaskQueue.put(new EndChunkGroupIoTask());
+ encodingTaskQueue.add(new EndChunkGroupIoTask());
}
- encodingTaskQueue.put(new TaskEnd());
+
+ noMoreEncodingTask = true;
LOGGER.info(
"Storage group {} memtable flushing into file {}: data sort time cost {} ms.",
storageGroup, writer.getFile().getName(), sortTime);
@@ -123,6 +109,8 @@ public class MemTableFlushTask {
try {
encodingTaskFuture.get();
} catch (InterruptedException | ExecutionException e) {
+ // avoid ioTask waiting forever
+ noMoreIOTask = true;
ioTaskFuture.cancel(true);
throw e;
}
@@ -135,13 +123,6 @@ public class MemTableFlushTask {
throw new ExecutionException(e);
}
- if (config.isEnableMemControl()) {
- if (estimatedTemporaryMemSize != 0) {
- SystemInfo.getInstance().releaseTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
- }
- SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >= memSerializeTime);
- }
-
LOGGER.info(
"Storage group {} memtable {} flushing a memtable has finished! Time consumption: {}ms",
storageGroup, memTable, System.currentTimeMillis() - start);
@@ -188,103 +169,97 @@ public class MemTableFlushTask {
@SuppressWarnings("squid:S135")
@Override
public void run() {
+ long memSerializeTime = 0;
+ boolean noMoreMessages = false;
LOGGER.debug("Storage group {} memtable flushing to file {} starts to encoding data.",
storageGroup, writer.getFile().getName());
while (true) {
-
- Object task = null;
- try {
- task = encodingTaskQueue.take();
- } catch (InterruptedException e1) {
- LOGGER.error("Take task into ioTaskQueue Interrupted");
- Thread.currentThread().interrupt();
- break;
+ if (noMoreEncodingTask) {
+ noMoreMessages = true;
}
- if (task instanceof StartFlushGroupIOTask || task instanceof EndChunkGroupIoTask) {
+ Object task = encodingTaskQueue.poll();
+ if (task == null) {
+ if (noMoreMessages) {
+ break;
+ }
try {
- ioTaskQueue.put(task);
+ TimeUnit.MILLISECONDS.sleep(10);
} catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
LOGGER.error("Storage group {} memtable flushing to file {}, encoding task is interrupted.",
storageGroup, writer.getFile().getName(), e);
// generally it is because the thread pool is shutdown so the task should be aborted
break;
}
- } else if (task instanceof TaskEnd) {
- break;
} else {
- long starTime = System.currentTimeMillis();
- Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
- IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right);
- writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
- seriesWriter.sealCurrentPage();
- seriesWriter.clearPageWriter();
- try {
- ioTaskQueue.put(seriesWriter);
- } catch (InterruptedException e) {
- LOGGER.error("Put task into ioTaskQueue Interrupted");
- Thread.currentThread().interrupt();
+ if (task instanceof StartFlushGroupIOTask || task instanceof EndChunkGroupIoTask) {
+ ioTaskQueue.add(task);
+ } else {
+ long starTime = System.currentTimeMillis();
+ Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
+ IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right);
+ writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
+ seriesWriter.sealCurrentPage();
+ seriesWriter.clearPageWriter();
+ memSerializeTime += System.currentTimeMillis() - starTime;
+ ioTaskQueue.add(seriesWriter);
}
- memSerializeTime += System.currentTimeMillis() - starTime;
}
}
- try {
- ioTaskQueue.put(new TaskEnd());
- } catch (InterruptedException e) {
- LOGGER.error("Put task into ioTaskQueue Interrupted");
- Thread.currentThread().interrupt();
- }
-
- LOGGER.info("Storage group {}, flushing memtable {} into disk: Encoding data cost "
- + "{} ms.",
+ noMoreIOTask = true;
+ LOGGER.info("Storage group {}, flushing memtable into file {}: Encoding data cost "
+ + "{} ms.",
storageGroup, writer.getFile().getName(), memSerializeTime);
}
};
@SuppressWarnings("squid:S135")
private Runnable ioTask = () -> {
+ long ioTime = 0;
+ boolean returnWhenNoTask = false;
LOGGER.debug("Storage group {} memtable flushing to file {} start io.",
storageGroup, writer.getFile().getName());
while (true) {
- Object ioMessage = null;
- try {
- ioMessage = ioTaskQueue.take();
- } catch (InterruptedException e1) {
- LOGGER.error("take task from ioTaskQueue Interrupted");
- Thread.currentThread().interrupt();
- break;
+ if (noMoreIOTask) {
+ returnWhenNoTask = true;
}
- long starTime = System.currentTimeMillis();
- try {
- if (ioMessage instanceof StartFlushGroupIOTask) {
- this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
- } else if (ioMessage instanceof TaskEnd) {
+ Object ioMessage = ioTaskQueue.poll();
+ if (ioMessage == null) {
+ if (returnWhenNoTask) {
+ break;
+ }
+ try {
+ TimeUnit.MILLISECONDS.sleep(10);
+ } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
+ LOGGER.error("Storage group {} memtable flushing to file {}, io task is interrupted.",
+ storageGroup, writer.getFile().getName());
+ // generally it is because the thread pool is shutdown so the task should be aborted
break;
- } else if (ioMessage instanceof IChunkWriter) {
- ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
- chunkWriter.writeToFileWriter(this.writer);
- } else {
- this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
- this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
- this.writer.endChunkGroup();
}
- } catch (IOException e) {
- LOGGER.error("Storage group {} memtable {}, io task meets error.", storageGroup,
- memTable, e);
- throw new FlushRunTimeException(e);
+ } else {
+ long starTime = System.currentTimeMillis();
+ try {
+ if (ioMessage instanceof StartFlushGroupIOTask) {
+// this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
+ } else if (ioMessage instanceof IChunkWriter) {
+ ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+// chunkWriter.writeToFileWriter(this.writer);
+ } else {
+// this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
+// this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
+ this.writer.endChunkGroup();
+ }
+ } catch (IOException e) {
+ LOGGER.error("Storage group {} memtable flushing to file {}, io task meets error.",
+ storageGroup, writer.getFile().getName(), e);
+ throw new FlushRunTimeException(e);
+ }
+ ioTime += System.currentTimeMillis() - starTime;
}
- ioTime += System.currentTimeMillis() - starTime;
}
LOGGER.info("flushing a memtable to file {} in storage group {}, io cost {}ms",
- writer.getFile().getName(), storageGroup, ioTime);
+ writer.getFile().getName(), storageGroup, ioTime);
};
- static class TaskEnd {
-
- TaskEnd() {
-
- }
- }
-
static class EndChunkGroupIoTask {
EndChunkGroupIoTask() {
@@ -300,4 +275,4 @@ public class MemTableFlushTask {
this.deviceId = deviceId;
}
}
-}
+}
\ No newline at end of file