You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/25 04:05:28 UTC

[incubator-iotdb] 02/03: call unsealedtsfileprocessor.asyncclose in flushandcheckshouldclose in filenodeprocessor

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 009eb869b66eaa03a8ff12847937b199b61d6197
Merge: bc334d1 c3a36d1
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Jun 25 12:04:42 2019 +0800

    call unsealedtsfileprocessor.asyncclose in flushandcheckshouldclose in filenodeprocessor

 .../org/apache/iotdb/db/concurrent/ThreadName.java |   1 +
 .../db/engine/filenode/FileNodeProcessor.java      |   8 +-
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 168 +++++++++---------
 .../iotdb/db/engine/filenodeV2/FlushManager.java   |   7 +-
 .../filenodeV2/UnsealedTsFileProcessorV2.java      |  25 +--
 .../db/engine/memtable/MemTableFlushTask.java      |  36 ++--
 .../db/engine/memtable/MemTableFlushTaskV2.java    | 191 ++++++++++++---------
 ...leFlushTaskV2.java => MemTableFlushTaskV3.java} | 166 +++++++++---------
 .../db/engine/memtable/MemTableFlushUtil.java      |   3 +-
 .../iotdb/db/engine/memtable/MemTablePool.java     |   2 +-
 .../iotdb/db/engine/pool/FlushPoolManager.java     |   6 +-
 ...olManager.java => FlushSubTaskPoolManager.java} |  51 ++----
 .../EngineExecutorWithoutTimeGenerator.java        |   3 +-
 .../db/query/factory/ISeriesReaderFactory.java     |  27 ++-
 .../db/query/factory/SeriesReaderFactory.java      |   6 +-
 .../db/query/factory/SeriesReaderFactoryImpl.java  |  97 +++++++----
 .../java/org/apache/iotdb/db/query/fill/IFill.java |   2 +-
 ...a => AllDataReaderWithOptGlobalTimeFilter.java} |  11 +-
 .../query/reader/AllDataReaderWithValueFilter.java |  74 ++++++++
 .../query/timegenerator/EngineNodeConstructor.java |   3 +-
 .../iotdb/db/writelog/recover/LogReplayer.java     |  13 +-
 ...rPerformer.java => TsFileRecoverPerformer.java} |  12 +-
 .../recover/UnSeqTsFileRecoverPerformer.java       | 117 -------------
 .../engine/filenodeV2/FileNodeProcessorV2Test.java |   2 +-
 .../filenodeV2/FileNodeProcessorV2Test2.java       | 109 ------------
 .../iotdb/db/integration/IoTDBCompleteIT.java      |   2 +
 ... AllDataReaderWithOptGlobalTimeFilterTest.java} |  10 +-
 .../db/writelog/recover/SeqTsFileRecoverTest.java  |   5 +-
 .../writelog/recover/UnseqTsFileRecoverTest.java   |   4 +-
 .../apache/iotdb/tsfile/write/TsFileWriter.java    |   2 +-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |   6 +-
 .../iotdb/tsfile/write/TsFileIOWriterTest.java     |   2 +-
 32 files changed, 545 insertions(+), 626 deletions(-)

diff --cc iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 61b25fa,d7f6f6b..a2f2f77
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@@ -481,14 -474,14 +474,12 @@@ public class FileNodeProcessorV2 
          closingUnSequenceTsFileProcessor.add(unsealedTsFileProcessor);
          workUnSequenceTsFileProcessor = null;
        }
--      unsealedTsFileProcessor.setCloseMark();
        LOGGER.info("The file size {} reaches the threshold, async close tsfile: {}.",
            unsealedTsFileProcessor.getTsFileResource().getFileSize(),
            unsealedTsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
      }
  
--    unsealedTsFileProcessor.asyncFlush();
--
++    unsealedTsFileProcessor.asyncClose();
    }
  
    public void asyncForceClose() {
diff --cc iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index b8825d1,dcf3ccd..d12f601
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@@ -51,55 -57,62 +57,63 @@@ public class MemTableFlushTaskV2 
      this.tsFileIoWriter = writer;
      this.storageGroup = storageGroup;
      this.flushCallBack = callBack;
-     ioFlushThread.start();
-     memoryFlushThread.start();
-     LOGGER.info("flush task created in Storage group {} ", storageGroup);
+     this.memoryFlushTaskFuture = subTaskPoolManager.submit(memoryFlushTask);
+     this.ioFlushTaskFuture = subTaskPoolManager.submit(ioFlushTask);
+     LOGGER.info("flush task of Storage group {} memtable {} is created ",
+         storageGroup, memTable.getVersion());
    }
  
 +
-   private Thread memoryFlushThread = new Thread(() -> {
-     long memSerializeTime = 0;
-     LOGGER.info("Storage group {},start serialize data into mem.", storageGroup);
-     long waitTime = 0;
-     while (!stop) {
-       Object task = memoryTaskQueue.poll();
-       if (task == null) {
-         try {
-           Thread.sleep(10);
-         } catch (InterruptedException e) {
-           LOGGER.error("Storage group {}, io flush task is interrupted.", storageGroup, e);
-         }
-         waitTime += 10;
-         if (waitTime % 1000 == 0) {
-           LOGGER.info("encoding thread has waited {}ms", waitTime);
-         }
-       } else {
-         waitTime = 0;
-         if (task instanceof String) {
-           ioTaskQueue.add(task);
-         } else if (task instanceof ChunkGroupIoTask) {
-           ioTaskQueue.add(task);
-         } else {
-           long starTime = System.currentTimeMillis();
-           Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
-           ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
-           IChunkWriter seriesWriter = new ChunkWriterImpl(memorySerializeTask.right, chunkBuffer,
-               PAGE_SIZE_THRESHOLD);
+   private Runnable memoryFlushTask = new Runnable() {
+     @Override
+     public void run() {
+       long memSerializeTime = 0;
+       LOGGER.info("Storage group {} memtable {}, starts to serialize data into mem.", storageGroup,
+           memTable.getVersion());
+       while (!stop) {
+         Object task = memoryTaskQueue.poll();
+         if (task == null) {
            try {
-             writeOneSeries(memorySerializeTask.left, seriesWriter,
-                 memorySerializeTask.right.getType());
-             ioTaskQueue.add(seriesWriter);
-           } catch (IOException e) {
-             LOGGER.error("Storage group {}, io error.", storageGroup, e);
-             throw new RuntimeException(e);
+             Thread.sleep(10);
+           } catch (InterruptedException e) {
+             LOGGER.error("Storage group {} memtable {}, io flush task is interrupted.",
+                 storageGroup, memTable.getVersion(), e);
+           }
+         } else {
+           if (task instanceof String) {
+             LOGGER.info("Storage group {} memtable {}, issues a start flush chunk group task.",
+                 storageGroup, memTable.getVersion());
+             ioTaskQueue.add(task);
+           } else if (task instanceof ChunkGroupIoTask) {
+             LOGGER.info("Storage group {} memtable {}, issues a end flush chunk group task.",
+                 storageGroup, memTable.getVersion());
+             ioTaskQueue.add(task);
+           } else {
+             long starTime = System.currentTimeMillis();
+             Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
+             ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
+             IChunkWriter seriesWriter = new ChunkWriterImpl(memorySerializeTask.right, chunkBuffer,
+                 PAGE_SIZE_THRESHOLD);
+             try {
+               writeOneSeries(memorySerializeTask.left, seriesWriter,
+                   memorySerializeTask.right.getType());
+               ioTaskQueue.add(seriesWriter);
+             } catch (IOException e) {
+               LOGGER.error("Storage group {} memtable {}, io error.", storageGroup,
+                   memTable.getVersion(), e);
+               throw new RuntimeException(e);
+             }
+             LOGGER.info("Storage group {} memtable {}, issues a write chunk task.",
+                 storageGroup, memTable.getVersion());
+             memSerializeTime += System.currentTimeMillis() - starTime;
            }
-           memSerializeTime += System.currentTimeMillis() - starTime;
          }
        }
+       LOGGER.info("Storage group {}, flushing memtable {} into disk: serialize data into mem cost "
+               + "{} ms.",
+           storageGroup, memTable.getVersion(), memSerializeTime);
      }
-     LOGGER.info("Storage group {}, flushing a memtable into disk: serialize data into mem cost {} ms.",
-         storageGroup, memSerializeTime);
-   });
+   };
  
  
    //TODO a better way is: for each TsFile, assign it a Executors.singleThreadPool,