You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by li...@apache.org on 2021/01/13 08:02:06 UTC
[iotdb] branch darft_serial_flush_task updated: del task thread
This is an automated email from the ASF dual-hosted git repository.
liudw pushed a commit to branch darft_serial_flush_task
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/darft_serial_flush_task by this push:
new 20e9613 del task thread
20e9613 is described below
commit 20e9613dc3dc8adba2d626da084930261b8a1bbb
Author: liudw <li...@apache.org>
AuthorDate: Wed Jan 13 16:01:26 2021 +0800
del task thread
---
.../iotdb/db/engine/flush/MemTableFlushTask.java | 85 +++++++++++-----------
1 file changed, 42 insertions(+), 43 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index a403db2..3f1a128 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -61,12 +60,14 @@ public class MemTableFlushTask {
private volatile boolean noMoreIOTask = false;
/**
- * @param memTable the memTable to flush
- * @param writer the writer where memTable will be flushed to (current tsfile writer or vm writer)
+ * @param memTable the memTable to flush
+ * @param writer the writer where memTable will be flushed to (current tsfile writer or vm
+ * writer)
* @param storageGroup current storage group
*/
- public MemTableFlushTask(IMemTable memTable, RestorableTsFileIOWriter writer, String storageGroup) {
+ public MemTableFlushTask(IMemTable memTable, RestorableTsFileIOWriter writer,
+ String storageGroup) {
this.memTable = memTable;
this.writer = writer;
this.storageGroup = storageGroup;
@@ -76,49 +77,46 @@ public class MemTableFlushTask {
storageGroup, memTable.getVersion());
}
- public void syncSerialFlushMemTable() throws ExecutionException, InterruptedException {
+ public void syncSerialFlushMemTable() {
LOGGER.info("The memTable size of SG {} is {}, the avg series points num in chunk is {} ",
- storageGroup,
- memTable.memSize(),
+ storageGroup, memTable.memSize(),
memTable.getTotalPointsNum() / memTable.getSeriesNumber());
long start = System.currentTimeMillis();
- AtomicLong sortTime = new AtomicLong();
- AtomicLong memSerializeTime = new AtomicLong();
-
- CompletableFuture.runAsync(() -> {
- Set<Entry<String, Map<String, IWritableMemChunk>>> ite = memTable.getMemTableMap().entrySet();
- try {
- for (Entry<String, Map<String, IWritableMemChunk>> memTableEntry : ite) {
- //start flush io
- this.writer.startChunkGroup(memTableEntry.getKey());
- final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
- for (Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
- long startTime = System.currentTimeMillis();
- IWritableMemChunk series = iWritableMemChunkEntry.getValue();
- MeasurementSchema desc = series.getSchema();
- TVList tvList = series.getSortedTVListForFlush();
- sortTime.addAndGet(System.currentTimeMillis() - startTime);
-
- //start flush
- long starTime = System.currentTimeMillis();
- IChunkWriter seriesWriter = new ChunkWriterImpl(desc);
- writeOneSeries(tvList, seriesWriter, desc.getType());
- seriesWriter.writeToFileWriter(this.writer);
- memSerializeTime.addAndGet(System.currentTimeMillis() - starTime);
- }
- this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
- this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
- this.writer.endChunkGroup();
- }
+ long sortTime = 0L;
+ long memSerializeTime = 0L;
- writer.writeVersion(memTable.getVersion());
- writer.writePlanIndices();
- } catch (IOException e) {
- LOGGER.error("Storage group {} memtable {}, io task meets error.", storageGroup,
- memTable.getVersion(), e);
- throw new FlushRunTimeException(e);
+ Set<Entry<String, Map<String, IWritableMemChunk>>> ite = memTable.getMemTableMap().entrySet();
+ try {
+ for (Entry<String, Map<String, IWritableMemChunk>> memTableEntry : ite) {
+ //start flush io
+ this.writer.startChunkGroup(memTableEntry.getKey());
+ final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
+ for (Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
+ long startTime = System.currentTimeMillis();
+ IWritableMemChunk series = iWritableMemChunkEntry.getValue();
+ MeasurementSchema desc = series.getSchema();
+ TVList tvList = series.getSortedTVListForFlush();
+ sortTime += (System.currentTimeMillis() - startTime);
+
+ //start flush
+ long starTime = System.currentTimeMillis();
+ IChunkWriter seriesWriter = new ChunkWriterImpl(desc);
+ writeOneSeries(tvList, seriesWriter, desc.getType());
+ seriesWriter.writeToFileWriter(this.writer);
+ memSerializeTime += (System.currentTimeMillis() - starTime);
+ }
+ this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
+ this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
+ this.writer.endChunkGroup();
}
- }).get();
+
+ writer.writeVersion(memTable.getVersion());
+ writer.writePlanIndices();
+ } catch (IOException e) {
+ LOGGER.error("Storage group {} memtable {}, io task meets error.", storageGroup,
+ memTable.getVersion(), e);
+ throw new FlushRunTimeException(e);
+ }
noMoreEncodingTask = true;
LOGGER.debug(
"Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
@@ -142,7 +140,8 @@ public class MemTableFlushTask {
long sortTime = 0;
//for map do not use get(key) to iterate
- for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : memTable.getMemTableMap().entrySet()) {
+ for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : memTable.getMemTableMap()
+ .entrySet()) {
encodingTaskQueue.add(new StartFlushGroupIOTask(memTableEntry.getKey()));
final Map<String, IWritableMemChunk> value = memTableEntry.getValue();