You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/25 04:05:28 UTC
[incubator-iotdb] 02/03: call UnsealedTsFileProcessor.asyncClose in
flushAndCheckShouldClose in FileNodeProcessor
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 009eb869b66eaa03a8ff12847937b199b61d6197
Merge: bc334d1 c3a36d1
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Jun 25 12:04:42 2019 +0800
call UnsealedTsFileProcessor.asyncClose in flushAndCheckShouldClose in FileNodeProcessor
.../org/apache/iotdb/db/concurrent/ThreadName.java | 1 +
.../db/engine/filenode/FileNodeProcessor.java | 8 +-
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 168 +++++++++---------
.../iotdb/db/engine/filenodeV2/FlushManager.java | 7 +-
.../filenodeV2/UnsealedTsFileProcessorV2.java | 25 +--
.../db/engine/memtable/MemTableFlushTask.java | 36 ++--
.../db/engine/memtable/MemTableFlushTaskV2.java | 191 ++++++++++++---------
...leFlushTaskV2.java => MemTableFlushTaskV3.java} | 166 +++++++++---------
.../db/engine/memtable/MemTableFlushUtil.java | 3 +-
.../iotdb/db/engine/memtable/MemTablePool.java | 2 +-
.../iotdb/db/engine/pool/FlushPoolManager.java | 6 +-
...olManager.java => FlushSubTaskPoolManager.java} | 51 ++----
.../EngineExecutorWithoutTimeGenerator.java | 3 +-
.../db/query/factory/ISeriesReaderFactory.java | 27 ++-
.../db/query/factory/SeriesReaderFactory.java | 6 +-
.../db/query/factory/SeriesReaderFactoryImpl.java | 97 +++++++----
.../java/org/apache/iotdb/db/query/fill/IFill.java | 2 +-
...a => AllDataReaderWithOptGlobalTimeFilter.java} | 11 +-
.../query/reader/AllDataReaderWithValueFilter.java | 74 ++++++++
.../query/timegenerator/EngineNodeConstructor.java | 3 +-
.../iotdb/db/writelog/recover/LogReplayer.java | 13 +-
...rPerformer.java => TsFileRecoverPerformer.java} | 12 +-
.../recover/UnSeqTsFileRecoverPerformer.java | 117 -------------
.../engine/filenodeV2/FileNodeProcessorV2Test.java | 2 +-
.../filenodeV2/FileNodeProcessorV2Test2.java | 109 ------------
.../iotdb/db/integration/IoTDBCompleteIT.java | 2 +
... AllDataReaderWithOptGlobalTimeFilterTest.java} | 10 +-
.../db/writelog/recover/SeqTsFileRecoverTest.java | 5 +-
.../writelog/recover/UnseqTsFileRecoverTest.java | 4 +-
.../apache/iotdb/tsfile/write/TsFileWriter.java | 2 +-
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 6 +-
.../iotdb/tsfile/write/TsFileIOWriterTest.java | 2 +-
32 files changed, 545 insertions(+), 626 deletions(-)
diff --cc iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 61b25fa,d7f6f6b..a2f2f77
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@@ -481,14 -474,14 +474,12 @@@ public class FileNodeProcessorV2
closingUnSequenceTsFileProcessor.add(unsealedTsFileProcessor);
workUnSequenceTsFileProcessor = null;
}
-- unsealedTsFileProcessor.setCloseMark();
LOGGER.info("The file size {} reaches the threshold, async close tsfile: {}.",
unsealedTsFileProcessor.getTsFileResource().getFileSize(),
unsealedTsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
}
-- unsealedTsFileProcessor.asyncFlush();
--
++ unsealedTsFileProcessor.asyncClose();
}
public void asyncForceClose() {
diff --cc iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index b8825d1,dcf3ccd..d12f601
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@@ -51,55 -57,62 +57,63 @@@ public class MemTableFlushTaskV2
this.tsFileIoWriter = writer;
this.storageGroup = storageGroup;
this.flushCallBack = callBack;
- ioFlushThread.start();
- memoryFlushThread.start();
- LOGGER.info("flush task created in Storage group {} ", storageGroup);
+ this.memoryFlushTaskFuture = subTaskPoolManager.submit(memoryFlushTask);
+ this.ioFlushTaskFuture = subTaskPoolManager.submit(ioFlushTask);
+ LOGGER.info("flush task of Storage group {} memtable {} is created ",
+ storageGroup, memTable.getVersion());
}
+
- private Thread memoryFlushThread = new Thread(() -> {
- long memSerializeTime = 0;
- LOGGER.info("Storage group {},start serialize data into mem.", storageGroup);
- long waitTime = 0;
- while (!stop) {
- Object task = memoryTaskQueue.poll();
- if (task == null) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- LOGGER.error("Storage group {}, io flush task is interrupted.", storageGroup, e);
- }
- waitTime += 10;
- if (waitTime % 1000 == 0) {
- LOGGER.info("encoding thread has waited {}ms", waitTime);
- }
- } else {
- waitTime = 0;
- if (task instanceof String) {
- ioTaskQueue.add(task);
- } else if (task instanceof ChunkGroupIoTask) {
- ioTaskQueue.add(task);
- } else {
- long starTime = System.currentTimeMillis();
- Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
- ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
- IChunkWriter seriesWriter = new ChunkWriterImpl(memorySerializeTask.right, chunkBuffer,
- PAGE_SIZE_THRESHOLD);
+ private Runnable memoryFlushTask = new Runnable() {
+ @Override
+ public void run() {
+ long memSerializeTime = 0;
+ LOGGER.info("Storage group {} memtable {}, starts to serialize data into mem.", storageGroup,
+ memTable.getVersion());
+ while (!stop) {
+ Object task = memoryTaskQueue.poll();
+ if (task == null) {
try {
- writeOneSeries(memorySerializeTask.left, seriesWriter,
- memorySerializeTask.right.getType());
- ioTaskQueue.add(seriesWriter);
- } catch (IOException e) {
- LOGGER.error("Storage group {}, io error.", storageGroup, e);
- throw new RuntimeException(e);
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ LOGGER.error("Storage group {} memtable {}, io flush task is interrupted.",
+ storageGroup, memTable.getVersion(), e);
+ }
+ } else {
+ if (task instanceof String) {
+ LOGGER.info("Storage group {} memtable {}, issues a start flush chunk group task.",
+ storageGroup, memTable.getVersion());
+ ioTaskQueue.add(task);
+ } else if (task instanceof ChunkGroupIoTask) {
+ LOGGER.info("Storage group {} memtable {}, issues a end flush chunk group task.",
+ storageGroup, memTable.getVersion());
+ ioTaskQueue.add(task);
+ } else {
+ long starTime = System.currentTimeMillis();
+ Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
+ ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
+ IChunkWriter seriesWriter = new ChunkWriterImpl(memorySerializeTask.right, chunkBuffer,
+ PAGE_SIZE_THRESHOLD);
+ try {
+ writeOneSeries(memorySerializeTask.left, seriesWriter,
+ memorySerializeTask.right.getType());
+ ioTaskQueue.add(seriesWriter);
+ } catch (IOException e) {
+ LOGGER.error("Storage group {} memtable {}, io error.", storageGroup,
+ memTable.getVersion(), e);
+ throw new RuntimeException(e);
+ }
+ LOGGER.info("Storage group {} memtable {}, issues a write chunk task.",
+ storageGroup, memTable.getVersion());
+ memSerializeTime += System.currentTimeMillis() - starTime;
}
- memSerializeTime += System.currentTimeMillis() - starTime;
}
}
+ LOGGER.info("Storage group {}, flushing memtable {} into disk: serialize data into mem cost "
+ + "{} ms.",
+ storageGroup, memTable.getVersion(), memSerializeTime);
}
- LOGGER.info("Storage group {}, flushing a memtable into disk: serialize data into mem cost {} ms.",
- storageGroup, memSerializeTime);
- });
+ };
//TODO a better way is: for each TsFile, assign it a Executors.singleThreadPool,