Posted to commits@iotdb.apache.org by li...@apache.org on 2021/01/13 08:02:06 UTC

[iotdb] branch darft_serial_flush_task updated: del task thread

This is an automated email from the ASF dual-hosted git repository.

liudw pushed a commit to branch darft_serial_flush_task
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/darft_serial_flush_task by this push:
     new 20e9613  del task thread
20e9613 is described below

commit 20e9613dc3dc8adba2d626da084930261b8a1bbb
Author: liudw <li...@apache.org>
AuthorDate: Wed Jan 13 16:01:26 2021 +0800

    del task thread
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   | 85 +++++++++++-----------
 1 file changed, 42 insertions(+), 43 deletions(-)
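In short, this commit drops the CompletableFuture-based flush task thread: the body of syncSerialFlushMemTable is no longer submitted to an executor and awaited, but runs directly on the calling thread, and the AtomicLong timing counters become plain longs. A minimal sketch of the two patterns, where flushBody() is a hypothetical stand-in for the encode-and-write loop shown in the diff below:

    // Before: run the flush body on the common fork-join pool and block on it.
    // Exceptions surface wrapped in ExecutionException.
    CompletableFuture.runAsync(() -> flushBody()).get();

    // After (this commit): run the flush body serially on the calling thread.
    // IO errors are rethrown as the unchecked FlushRunTimeException.
    flushBody();
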

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index a403db2..3f1a128 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -61,12 +60,14 @@ public class MemTableFlushTask {
   private volatile boolean noMoreIOTask = false;
 
   /**
-   * @param memTable the memTable to flush
-   * @param writer the writer where memTable will be flushed to (current tsfile writer or vm writer)
+   * @param memTable     the memTable to flush
+   * @param writer       the writer where memTable will be flushed to (current tsfile writer or vm
+   *                     writer)
    * @param storageGroup current storage group
    */
 
-  public MemTableFlushTask(IMemTable memTable, RestorableTsFileIOWriter writer, String storageGroup) {
+  public MemTableFlushTask(IMemTable memTable, RestorableTsFileIOWriter writer,
+      String storageGroup) {
     this.memTable = memTable;
     this.writer = writer;
     this.storageGroup = storageGroup;
@@ -76,49 +77,46 @@ public class MemTableFlushTask {
         storageGroup, memTable.getVersion());
   }
 
-  public void syncSerialFlushMemTable() throws ExecutionException, InterruptedException {
+  public void syncSerialFlushMemTable() {
     LOGGER.info("The memTable size of SG {} is {}, the avg series points num in chunk is {} ",
-        storageGroup,
-        memTable.memSize(),
+        storageGroup, memTable.memSize(),
         memTable.getTotalPointsNum() / memTable.getSeriesNumber());
     long start = System.currentTimeMillis();
-    AtomicLong sortTime = new AtomicLong();
-    AtomicLong memSerializeTime = new AtomicLong();
-
-    CompletableFuture.runAsync(() -> {
-      Set<Entry<String, Map<String, IWritableMemChunk>>> ite = memTable.getMemTableMap().entrySet();
-      try {
-        for (Entry<String, Map<String, IWritableMemChunk>> memTableEntry : ite) {
-          //start flush io
-          this.writer.startChunkGroup(memTableEntry.getKey());
-          final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
-          for (Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
-            long startTime = System.currentTimeMillis();
-            IWritableMemChunk series = iWritableMemChunkEntry.getValue();
-            MeasurementSchema desc = series.getSchema();
-            TVList tvList = series.getSortedTVListForFlush();
-            sortTime.addAndGet(System.currentTimeMillis() - startTime);
-
-            //start flush
-            long starTime = System.currentTimeMillis();
-            IChunkWriter seriesWriter = new ChunkWriterImpl(desc);
-            writeOneSeries(tvList, seriesWriter, desc.getType());
-            seriesWriter.writeToFileWriter(this.writer);
-            memSerializeTime.addAndGet(System.currentTimeMillis() - starTime);
-          }
-          this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
-          this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
-          this.writer.endChunkGroup();
-        }
+    long sortTime = 0L;
+    long memSerializeTime = 0L;
 
-        writer.writeVersion(memTable.getVersion());
-        writer.writePlanIndices();
-      } catch (IOException e) {
-        LOGGER.error("Storage group {} memtable {}, io task meets error.", storageGroup,
-            memTable.getVersion(), e);
-        throw new FlushRunTimeException(e);
+    Set<Entry<String, Map<String, IWritableMemChunk>>> ite = memTable.getMemTableMap().entrySet();
+    try {
+      for (Entry<String, Map<String, IWritableMemChunk>> memTableEntry : ite) {
+        //start flush io
+        this.writer.startChunkGroup(memTableEntry.getKey());
+        final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
+        for (Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
+          long startTime = System.currentTimeMillis();
+          IWritableMemChunk series = iWritableMemChunkEntry.getValue();
+          MeasurementSchema desc = series.getSchema();
+          TVList tvList = series.getSortedTVListForFlush();
+          sortTime += (System.currentTimeMillis() - startTime);
+
+          //start flush
+          long starTime = System.currentTimeMillis();
+          IChunkWriter seriesWriter = new ChunkWriterImpl(desc);
+          writeOneSeries(tvList, seriesWriter, desc.getType());
+          seriesWriter.writeToFileWriter(this.writer);
+          memSerializeTime += (System.currentTimeMillis() - starTime);
+        }
+        this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
+        this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
+        this.writer.endChunkGroup();
       }
-    }).get();
+
+      writer.writeVersion(memTable.getVersion());
+      writer.writePlanIndices();
+    } catch (IOException e) {
+      LOGGER.error("Storage group {} memtable {}, io task meets error.", storageGroup,
+          memTable.getVersion(), e);
+      throw new FlushRunTimeException(e);
+    }
     noMoreEncodingTask = true;
     LOGGER.debug(
         "Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
@@ -142,7 +140,8 @@ public class MemTableFlushTask {
     long sortTime = 0;
 
     //for map do not use get(key) to iterate
-    for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : memTable.getMemTableMap().entrySet()) {
+    for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : memTable.getMemTableMap()
+        .entrySet()) {
       encodingTaskQueue.add(new StartFlushGroupIOTask(memTableEntry.getKey()));
 
       final Map<String, IWritableMemChunk> value = memTableEntry.getValue();