You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/25 07:01:58 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: fix
memtable flush task null pointer bug
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new b029d1f fix memtable flush task null pointer bug
b029d1f is described below
commit b029d1ffb787854586239aa14ff95e40bbf9a94f
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Jun 25 15:01:44 2019 +0800
fix memtable flush task null pointer bug
---
.../filenodeV2/UnsealedTsFileProcessorV2.java | 4 +-
.../db/engine/memtable/MemTableFlushTaskV2.java | 202 +++++++++++----------
.../engine/memtable/MemTableFlushTaskV2Test.java | 4 +-
3 files changed, 110 insertions(+), 100 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index f16e0ab..b3074a9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -296,9 +296,9 @@ public class UnsealedTsFileProcessorV2 {
// null memtable only appears when calling asyncClose()
if (memTableToFlush.isManagedByMemPool()) {
- MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(writer, storageGroupName,
+ MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(memTableToFlush, fileSchema, writer, storageGroupName,
this::releaseFlushedMemTableCallback);
- flushTask.flushMemTable(fileSchema, memTableToFlush);
+ flushTask.flushMemTable();
MemTablePool.getInstance().putBack(memTableToFlush, storageGroupName);
logNode.notifyEndFlush();
LOGGER.info("flush a memtable has finished");
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index a66809d..cf700fb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -49,12 +49,15 @@ public class MemTableFlushTaskV2 {
private Consumer<IMemTable> flushCallBack;
private IMemTable memTable;
+ private FileSchema fileSchema;
private boolean memoryFlushNoMoreTask = false;
private boolean ioFlushTaskCanStop = false;
- public MemTableFlushTaskV2(NativeRestorableIOWriter writer, String storageGroup,
+ public MemTableFlushTaskV2(IMemTable memTable, FileSchema fileSchema, NativeRestorableIOWriter writer, String storageGroup,
Consumer<IMemTable> callBack) {
+ this.memTable = memTable;
+ this.fileSchema = fileSchema;
this.tsFileIoWriter = writer;
this.storageGroup = storageGroup;
this.flushCallBack = callBack;
@@ -68,59 +71,63 @@ public class MemTableFlushTaskV2 {
private Runnable memoryFlushTask = new Runnable() {
@Override
public void run() {
- long memSerializeTime = 0;
- boolean returnWhenNoTask = false;
- LOGGER.info("Storage group {} memtable {}, starts to serialize data into mem.", storageGroup,
- memTable.getVersion());
- while (true) {
- if (memoryFlushNoMoreTask) {
- returnWhenNoTask = true;
- }
- Object task = memoryTaskQueue.poll();
- if (task == null) {
- if (returnWhenNoTask) {
- break;
- }
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- LOGGER.error("Storage group {} memtable {}, io flush task is interrupted.",
- storageGroup, memTable.getVersion(), e);
+ try {
+ long memSerializeTime = 0;
+ boolean returnWhenNoTask = false;
+ LOGGER.info("Storage group {} memtable {}, starts to serialize data into mem.", storageGroup,
+ memTable.getVersion());
+ while (true) {
+ if (memoryFlushNoMoreTask) {
+ returnWhenNoTask = true;
}
- } else {
- if (task instanceof String) {
- LOGGER.info("Storage group {} memtable {}, issues a start flush chunk group task.",
- storageGroup, memTable.getVersion());
- ioTaskQueue.add(task);
- } else if (task instanceof ChunkGroupIoTask) {
- LOGGER.info("Storage group {} memtable {}, issues a end flush chunk group task.",
- storageGroup, memTable.getVersion());
- ioTaskQueue.add(task);
- } else {
- long starTime = System.currentTimeMillis();
- Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
- ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
- IChunkWriter seriesWriter = new ChunkWriterImpl(memorySerializeTask.right, chunkBuffer,
- PAGE_SIZE_THRESHOLD);
+ Object task = memoryTaskQueue.poll();
+ if (task == null) {
+ if (returnWhenNoTask) {
+ break;
+ }
try {
- writeOneSeries(memorySerializeTask.left, seriesWriter,
- memorySerializeTask.right.getType());
- ioTaskQueue.add(seriesWriter);
- } catch (IOException e) {
- LOGGER.error("Storage group {} memtable {}, io error.", storageGroup,
- memTable.getVersion(), e);
- throw new RuntimeException(e);
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ LOGGER.error("Storage group {} memtable {}, io flush task is interrupted.",
+ storageGroup, memTable.getVersion(), e);
+ }
+ } else {
+ if (task instanceof String) {
+ LOGGER.info("Storage group {} memtable {}, issues a start flush chunk group task.",
+ storageGroup, memTable.getVersion());
+ ioTaskQueue.add(task);
+ } else if (task instanceof ChunkGroupIoTask) {
+ LOGGER.info("Storage group {} memtable {}, issues a end flush chunk group task.",
+ storageGroup, memTable.getVersion());
+ ioTaskQueue.add(task);
+ } else {
+ long starTime = System.currentTimeMillis();
+ Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
+ ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
+ IChunkWriter seriesWriter = new ChunkWriterImpl(memorySerializeTask.right, chunkBuffer,
+ PAGE_SIZE_THRESHOLD);
+ try {
+ writeOneSeries(memorySerializeTask.left, seriesWriter,
+ memorySerializeTask.right.getType());
+ ioTaskQueue.add(seriesWriter);
+ } catch (IOException e) {
+ LOGGER.error("Storage group {} memtable {}, io error.", storageGroup,
+ memTable.getVersion(), e);
+ throw new RuntimeException(e);
+ }
+ LOGGER.info("Storage group {} memtable {}, issues a write chunk task.",
+ storageGroup, memTable.getVersion());
+ memSerializeTime += System.currentTimeMillis() - starTime;
}
- LOGGER.info("Storage group {} memtable {}, issues a write chunk task.",
- storageGroup, memTable.getVersion());
- memSerializeTime += System.currentTimeMillis() - starTime;
}
}
+ ioFlushTaskCanStop = true;
+ LOGGER.info("Storage group {}, flushing memtable {} into disk: serialize data into mem cost "
+ + "{} ms.",
+ storageGroup, memTable.getVersion(), memSerializeTime);
+ } catch (RuntimeException e) {
+ LOGGER.error("memoryFlush thread is dead", e);
}
- ioFlushTaskCanStop = true;
- LOGGER.info("Storage group {}, flushing memtable {} into disk: serialize data into mem cost "
- + "{} ms.",
- storageGroup, memTable.getVersion(), memSerializeTime);
}
};
@@ -130,52 +137,56 @@ public class MemTableFlushTaskV2 {
private Runnable ioFlushTask = new Runnable() {
@Override
public void run() {
- long ioTime = 0;
- boolean returnWhenNoTask = false;
- LOGGER.info("Storage group {} memtable {}, start io.", storageGroup, memTable.getVersion());
- while (true) {
- if (ioFlushTaskCanStop) {
- returnWhenNoTask = true;
- }
- Object seriesWriterOrEndChunkGroupTask = ioTaskQueue.poll();
- if (seriesWriterOrEndChunkGroupTask == null) {
- if (returnWhenNoTask) {
- break;
+ try {
+ long ioTime = 0;
+ boolean returnWhenNoTask = false;
+ LOGGER.info("Storage group {} memtable {}, start io.", storageGroup, memTable.getVersion());
+ while (true) {
+ if (ioFlushTaskCanStop) {
+ returnWhenNoTask = true;
}
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- LOGGER.error("Storage group {} memtable, io flush task is interrupted.", storageGroup
- , memTable.getVersion(), e);
- }
- } else {
- long starTime = System.currentTimeMillis();
- try {
- if (seriesWriterOrEndChunkGroupTask instanceof IChunkWriter) {
- LOGGER.info("Storage group {} memtable {}, writing a series to file.", storageGroup,
- memTable.getVersion());
- ((IChunkWriter) seriesWriterOrEndChunkGroupTask).writeToFileWriter(tsFileIoWriter);
- } else if (seriesWriterOrEndChunkGroupTask instanceof String) {
- LOGGER.info("Storage group {} memtable {}, start a chunk group.", storageGroup,
- memTable.getVersion());
- tsFileIoWriter.startChunkGroup((String) seriesWriterOrEndChunkGroupTask);
- } else {
- LOGGER.info("Storage group {} memtable {}, end a chunk group.", storageGroup,
- memTable.getVersion());
- ChunkGroupIoTask task = (ChunkGroupIoTask) seriesWriterOrEndChunkGroupTask;
- tsFileIoWriter.endChunkGroup(task.version);
- task.finished = true;
+ Object seriesWriterOrEndChunkGroupTask = ioTaskQueue.poll();
+ if (seriesWriterOrEndChunkGroupTask == null) {
+ if (returnWhenNoTask) {
+ break;
+ }
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ LOGGER.error("Storage group {} memtable, io flush task is interrupted.", storageGroup
+ , memTable.getVersion(), e);
+ }
+ } else {
+ long starTime = System.currentTimeMillis();
+ try {
+ if (seriesWriterOrEndChunkGroupTask instanceof IChunkWriter) {
+ LOGGER.info("Storage group {} memtable {}, writing a series to file.", storageGroup,
+ memTable.getVersion());
+ ((IChunkWriter) seriesWriterOrEndChunkGroupTask).writeToFileWriter(tsFileIoWriter);
+ } else if (seriesWriterOrEndChunkGroupTask instanceof String) {
+ LOGGER.info("Storage group {} memtable {}, start a chunk group.", storageGroup,
+ memTable.getVersion());
+ tsFileIoWriter.startChunkGroup((String) seriesWriterOrEndChunkGroupTask);
+ } else {
+ LOGGER.info("Storage group {} memtable {}, end a chunk group.", storageGroup,
+ memTable.getVersion());
+ ChunkGroupIoTask task = (ChunkGroupIoTask) seriesWriterOrEndChunkGroupTask;
+ tsFileIoWriter.endChunkGroup(task.version);
+ task.finished = true;
+ }
+ } catch (IOException e) {
+ LOGGER.error("Storage group {} memtable {}, io error.", storageGroup,
+ memTable.getVersion(), e);
+ throw new RuntimeException(e);
}
- } catch (IOException e) {
- LOGGER.error("Storage group {} memtable {}, io error.", storageGroup,
- memTable.getVersion(), e);
- throw new RuntimeException(e);
+ ioTime += System.currentTimeMillis() - starTime;
}
- ioTime += System.currentTimeMillis() - starTime;
}
+ LOGGER.info("flushing a memtable {} in storage group {}, cost {}ms", memTable.getVersion(),
+ storageGroup, ioTime);
+ } catch (RuntimeException e) {
+ LOGGER.error("ioflush thread is dead", e);
}
- LOGGER.info("flushing a memtable {} in storage group {}, cost {}ms", memTable.getVersion(),
- storageGroup, ioTime);
}
};
@@ -221,23 +232,22 @@ public class MemTableFlushTaskV2 {
/**
* the function for flushing memtable.
*/
- public void flushMemTable(FileSchema fileSchema, IMemTable imemTable) {
+ public void flushMemTable() {
long sortTime = 0;
ChunkGroupIoTask theLastTask = EMPTY_TASK;
- this.memTable = imemTable;
- for (String deviceId : imemTable.getMemTableMap().keySet()) {
+ for (String deviceId : memTable.getMemTableMap().keySet()) {
memoryTaskQueue.add(deviceId);
- int seriesNumber = imemTable.getMemTableMap().get(deviceId).size();
- for (String measurementId : imemTable.getMemTableMap().get(deviceId).keySet()) {
+ int seriesNumber = memTable.getMemTableMap().get(deviceId).size();
+ for (String measurementId : memTable.getMemTableMap().get(deviceId).keySet()) {
long startTime = System.currentTimeMillis();
// TODO if we can not use TSFileIO writer, then we have to redesign the class of TSFileIO.
- IWritableMemChunk series = imemTable.getMemTableMap().get(deviceId).get(measurementId);
+ IWritableMemChunk series = memTable.getMemTableMap().get(deviceId).get(measurementId);
MeasurementSchema desc = fileSchema.getMeasurementSchema(measurementId);
List<TimeValuePair> sortedTimeValuePairs = series.getSortedTimeValuePairList();
sortTime += System.currentTimeMillis() - startTime;
memoryTaskQueue.add(new Pair<>(sortedTimeValuePairs, desc));
}
- theLastTask = new ChunkGroupIoTask(seriesNumber, deviceId, imemTable.getVersion());
+ theLastTask = new ChunkGroupIoTask(seriesNumber, deviceId, memTable.getVersion());
memoryTaskQueue.add(theLastTask);
}
memoryFlushNoMoreTask = true;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java
index 074317d..2566fa3 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java
@@ -59,7 +59,7 @@ public class MemTableFlushTaskV2Test {
MemTableTestUtils.produceData(memTable, startTime, endTime, MemTableTestUtils.deviceId0,
MemTableTestUtils.measurementId0,
MemTableTestUtils.dataType0);
- MemTableFlushTaskV2 memTableFlushTask = new MemTableFlushTaskV2(writer, storageGroup,
+ MemTableFlushTaskV2 memTableFlushTask = new MemTableFlushTaskV2(memTable, MemTableTestUtils.getFileSchema(), writer, storageGroup,
memtable -> {
writer.makeMetadataVisible();
MemTablePool.getInstance().putBack(memtable, storageGroup);
@@ -67,7 +67,7 @@ public class MemTableFlushTaskV2Test {
assertTrue(writer
.getVisibleMetadatas(MemTableTestUtils.deviceId0, MemTableTestUtils.measurementId0,
MemTableTestUtils.dataType0).isEmpty());
- memTableFlushTask.flushMemTable(MemTableTestUtils.getFileSchema(), memTable);
+ memTableFlushTask.flushMemTable();
assertEquals(1, writer
.getVisibleMetadatas(MemTableTestUtils.deviceId0, MemTableTestUtils.measurementId0,
MemTableTestUtils.dataType0).size());