You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/25 01:36:13 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: modify mem table flush task
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 199c075 modify mem table flush task
199c075 is described below
commit 199c07595ea89fb21389361d6e2bd25a10899d5b
Author: lta <li...@163.com>
AuthorDate: Tue Jun 25 09:35:57 2019 +0800
modify mem table flush task
---
.../db/engine/memtable/MemTableFlushTaskV2.java | 64 +++++++++++++---------
.../db/engine/memtable/MemTableFlushTaskV3.java | 64 +++++++++-------------
.../engine/filenodeV2/FileNodeProcessorV2Test.java | 2 +-
3 files changed, 65 insertions(+), 65 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index 724b021..43e6a78 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -17,7 +17,6 @@ package org.apache.iotdb.db.engine.memtable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import org.apache.iotdb.db.engine.pool.FlushSubTaskPoolManager;
@@ -40,8 +39,8 @@ public class MemTableFlushTaskV2 {
private static final int PAGE_SIZE_THRESHOLD = TSFileConfig.pageSizeInByte;
private static final FlushSubTaskPoolManager subTaskPoolManager = FlushSubTaskPoolManager
.getInstance();
- private Future memoryFlushTaskFuture;
- private Future ioFlushTaskFuture;
+ private Future memoryFlushTask;
+ private Future ioFlushTask;
private NativeRestorableIOWriter tsFileIoWriter;
private ConcurrentLinkedQueue ioTaskQueue = new ConcurrentLinkedQueue();
@@ -57,17 +56,21 @@ public class MemTableFlushTaskV2 {
this.tsFileIoWriter = writer;
this.storageGroup = storageGroup;
this.flushCallBack = callBack;
- this.memoryFlushTaskFuture = subTaskPoolManager.submit(memoryFlushTask);
- this.ioFlushTaskFuture = subTaskPoolManager.submit(ioFlushTask);
+// this.memoryFlushTask = subTaskPoolManager.submit(memoryFlushThread);
+
+ memoryFlushThread.start();
+ ioFlushThread.start();
LOGGER.info("flush task created in Storage group {} ", storageGroup);
}
- private Runnable memoryFlushTask = new Runnable() {
- @Override
- public void run() {
+
+ private Thread memoryFlushThread = new Thread(() -> {
long memSerializeTime = 0;
LOGGER.info("Storage group {},start serialize data into mem.", storageGroup);
while (!stop) {
+ if (!memoryTaskQueue.isEmpty()) {
+ LOGGER.info("memory task queue is {}", memoryTaskQueue);
+ }
Object task = memoryTaskQueue.poll();
if (task == null) {
try {
@@ -77,10 +80,10 @@ public class MemTableFlushTaskV2 {
}
} else {
if (task instanceof String) {
+ LOGGER.info("add String {} to io queue", task);
ioTaskQueue.add(task);
- } else if (task instanceof ChunkGroupIoTask) {
- ioTaskQueue.add(task);
- } else {
+ } else if (task instanceof Pair) {
+ LOGGER.info("add chunk writer {}", task);
long starTime = System.currentTimeMillis();
Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
@@ -95,20 +98,20 @@ public class MemTableFlushTaskV2 {
throw new RuntimeException(e);
}
memSerializeTime += System.currentTimeMillis() - starTime;
+ } else {
+ LOGGER.info("end chunk group {} io task to io task queue", task.toString());
+ ioTaskQueue.add(task);
}
}
}
LOGGER.info("Storage group {}, flushing a memtable into disk: serialize data into mem cost {} ms.",
storageGroup, memSerializeTime);
- }
- };
+ }, Thread.currentThread().getId() + "-1");
//TODO a better way is: for each TsFile, assign it a Executors.singleThreadPool,
// rather than per each memtable.
- private Runnable ioFlushTask = new Runnable() {
- @Override
- public void run() {
+ private Thread ioFlushThread = new Thread(() -> {
long ioTime = 0;
LOGGER.info("Storage group {}, start io cost.", storageGroup);
while (!stop) {
@@ -123,15 +126,17 @@ public class MemTableFlushTaskV2 {
long starTime = System.currentTimeMillis();
try {
if (seriesWriterOrEndChunkGroupTask instanceof IChunkWriter) {
+ LOGGER.info("write series to disk");
((IChunkWriter) seriesWriterOrEndChunkGroupTask).writeToFileWriter(tsFileIoWriter);
} else if (seriesWriterOrEndChunkGroupTask instanceof String) {
+ LOGGER.info("start chunk group");
tsFileIoWriter.startChunkGroup((String) seriesWriterOrEndChunkGroupTask);
} else {
+ LOGGER.info("end chunk group {} io task from task queue", seriesWriterOrEndChunkGroupTask.toString());
+ LOGGER.info("end chunk group");
ChunkGroupIoTask task = (ChunkGroupIoTask) seriesWriterOrEndChunkGroupTask;
tsFileIoWriter.endChunkGroup(task.version);
- synchronized (task){
- task.notify();
- }
+ task.finished = true;
}
} catch (IOException e) {
LOGGER.error("Storage group {}, io error.", storageGroup, e);
@@ -141,8 +146,7 @@ public class MemTableFlushTaskV2 {
}
}
LOGGER.info("flushing a memtable in storage group {}, cost {}ms", storageGroup, ioTime);
- }
- };
+ }, Thread.currentThread().getId() + "-2");
private void writeOneSeries(List<TimeValuePair> tvPairs, IChunkWriter seriesWriterImpl,
@@ -190,9 +194,12 @@ public class MemTableFlushTaskV2 {
long sortTime = 0;
ChunkGroupIoTask theLastTask = EMPTY_TASK;
this.memTable = imemTable;
+ LOGGER.info("Current thread id is {}" , Thread.currentThread().getId());
for (String deviceId : imemTable.getMemTableMap().keySet()) {
memoryTaskQueue.add(deviceId);
int seriesNumber = imemTable.getMemTableMap().get(deviceId).size();
+ LOGGER.info("series number: {}", seriesNumber);
+ LOGGER.info("add device, memory queue {}", memoryTaskQueue);
for (String measurementId : imemTable.getMemTableMap().get(deviceId).keySet()) {
long startTime = System.currentTimeMillis();
// TODO if we can not use TSFileIO writer, then we have to redesign the class of TSFileIO.
@@ -200,17 +207,21 @@ public class MemTableFlushTaskV2 {
MeasurementSchema desc = fileSchema.getMeasurementSchema(measurementId);
List<TimeValuePair> sortedTimeValuePairs = series.getSortedTimeValuePairList();
sortTime += System.currentTimeMillis() - startTime;
+ LOGGER.info("add seies writer in flush thread {}", sortedTimeValuePairs);
memoryTaskQueue.add(new Pair<>(sortedTimeValuePairs, desc));
+ LOGGER.info("add series writer, memory queue {}", memoryTaskQueue);
}
theLastTask = new ChunkGroupIoTask(seriesNumber, deviceId, imemTable.getVersion());
+ LOGGER.info("ChunkGroupIoTask task {}", theLastTask.toString());
memoryTaskQueue.add(theLastTask);
+ LOGGER.info("add chunk group to task, memory queue {}", memoryTaskQueue);
}
LOGGER.info(
"{}, flushing a memtable into disk: data sort time cost {} ms.",
storageGroup, sortTime);
- synchronized (theLastTask){
+ while (!theLastTask.finished) {
try {
- theLastTask.wait();
+ Thread.sleep(10);
} catch (InterruptedException e) {
LOGGER.error("Storage group {}, flush memtable table thread is interrupted.",
storageGroup, e);
@@ -219,10 +230,7 @@ public class MemTableFlushTaskV2 {
}
stop = true;
- try {
- ioFlushTaskFuture.get();
- } catch (InterruptedException | ExecutionException e) {
- LOGGER.error("Waiting for IO flush task end meets error", e);
+ while (ioFlushThread.isAlive()) {
}
LOGGER.info("flushing a memtable finished!");
@@ -235,6 +243,7 @@ public class MemTableFlushTaskV2 {
int seriesNumber;
String deviceId;
long version;
+ volatile boolean finished;
public ChunkGroupIoTask(int seriesNumber, String deviceId, long version) {
this(seriesNumber, deviceId, version, false);
@@ -244,6 +253,7 @@ public class MemTableFlushTaskV2 {
this.seriesNumber = seriesNumber;
this.deviceId = deviceId;
this.version = version;
+ this.finished = finished;
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV3.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV3.java
index 12e00d2..71f7613 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV3.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV3.java
@@ -17,6 +17,7 @@ package org.apache.iotdb.db.engine.memtable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import org.apache.iotdb.db.engine.pool.FlushSubTaskPoolManager;
@@ -39,8 +40,8 @@ public class MemTableFlushTaskV3 {
private static final int PAGE_SIZE_THRESHOLD = TSFileConfig.pageSizeInByte;
private static final FlushSubTaskPoolManager subTaskPoolManager = FlushSubTaskPoolManager
.getInstance();
- private Future memoryFlushTask;
- private Future ioFlushTask;
+ private Future memoryFlushTaskFuture;
+ private Future ioFlushTaskFuture;
private NativeRestorableIOWriter tsFileIoWriter;
private ConcurrentLinkedQueue ioTaskQueue = new ConcurrentLinkedQueue();
@@ -56,21 +57,17 @@ public class MemTableFlushTaskV3 {
this.tsFileIoWriter = writer;
this.storageGroup = storageGroup;
this.flushCallBack = callBack;
-// this.memoryFlushTask = subTaskPoolManager.submit(memoryFlushThread);
-
- memoryFlushThread.start();
- ioFlushThread.start();
+ this.memoryFlushTaskFuture = subTaskPoolManager.submit(memoryFlushTask);
+ this.ioFlushTaskFuture = subTaskPoolManager.submit(ioFlushTask);
LOGGER.info("flush task created in Storage group {} ", storageGroup);
}
-
- private Thread memoryFlushThread = new Thread(() -> {
+ private Runnable memoryFlushTask = new Runnable() {
+ @Override
+ public void run() {
long memSerializeTime = 0;
LOGGER.info("Storage group {},start serialize data into mem.", storageGroup);
while (!stop) {
- if (!memoryTaskQueue.isEmpty()) {
- LOGGER.info("memory task queue is {}", memoryTaskQueue);
- }
Object task = memoryTaskQueue.poll();
if (task == null) {
try {
@@ -80,10 +77,10 @@ public class MemTableFlushTaskV3 {
}
} else {
if (task instanceof String) {
- LOGGER.info("add String {} to io queue", task);
ioTaskQueue.add(task);
- } else if (task instanceof Pair) {
- LOGGER.info("add chunk writer {}", task);
+ } else if (task instanceof ChunkGroupIoTask) {
+ ioTaskQueue.add(task);
+ } else {
long starTime = System.currentTimeMillis();
Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
@@ -98,20 +95,20 @@ public class MemTableFlushTaskV3 {
throw new RuntimeException(e);
}
memSerializeTime += System.currentTimeMillis() - starTime;
- } else {
- LOGGER.info("end chunk group {} io task to io task queue", task.toString());
- ioTaskQueue.add(task);
}
}
}
LOGGER.info("Storage group {}, flushing a memtable into disk: serialize data into mem cost {} ms.",
storageGroup, memSerializeTime);
- }, Thread.currentThread().getId() + "-1");
+ }
+ };
//TODO a better way is: for each TsFile, assign it a Executors.singleThreadPool,
// rather than per each memtable.
- private Thread ioFlushThread = new Thread(() -> {
+ private Runnable ioFlushTask = new Runnable() {
+ @Override
+ public void run() {
long ioTime = 0;
LOGGER.info("Storage group {}, start io cost.", storageGroup);
while (!stop) {
@@ -126,17 +123,15 @@ public class MemTableFlushTaskV3 {
long starTime = System.currentTimeMillis();
try {
if (seriesWriterOrEndChunkGroupTask instanceof IChunkWriter) {
- LOGGER.info("write series to disk");
((IChunkWriter) seriesWriterOrEndChunkGroupTask).writeToFileWriter(tsFileIoWriter);
} else if (seriesWriterOrEndChunkGroupTask instanceof String) {
- LOGGER.info("start chunk group");
tsFileIoWriter.startChunkGroup((String) seriesWriterOrEndChunkGroupTask);
} else {
- LOGGER.info("end chunk group {} io task from task queue", seriesWriterOrEndChunkGroupTask.toString());
- LOGGER.info("end chunk group");
ChunkGroupIoTask task = (ChunkGroupIoTask) seriesWriterOrEndChunkGroupTask;
tsFileIoWriter.endChunkGroup(task.version);
- task.finished = true;
+ synchronized (task){
+ task.notify();
+ }
}
} catch (IOException e) {
LOGGER.error("Storage group {}, io error.", storageGroup, e);
@@ -146,7 +141,8 @@ public class MemTableFlushTaskV3 {
}
}
LOGGER.info("flushing a memtable in storage group {}, cost {}ms", storageGroup, ioTime);
- }, Thread.currentThread().getId() + "-2");
+ }
+ };
private void writeOneSeries(List<TimeValuePair> tvPairs, IChunkWriter seriesWriterImpl,
@@ -194,12 +190,9 @@ public class MemTableFlushTaskV3 {
long sortTime = 0;
ChunkGroupIoTask theLastTask = EMPTY_TASK;
this.memTable = imemTable;
- LOGGER.info("Current thread id is {}" , Thread.currentThread().getId());
for (String deviceId : imemTable.getMemTableMap().keySet()) {
memoryTaskQueue.add(deviceId);
int seriesNumber = imemTable.getMemTableMap().get(deviceId).size();
- LOGGER.info("series number: {}", seriesNumber);
- LOGGER.info("add device, memory queue {}", memoryTaskQueue);
for (String measurementId : imemTable.getMemTableMap().get(deviceId).keySet()) {
long startTime = System.currentTimeMillis();
// TODO if we can not use TSFileIO writer, then we have to redesign the class of TSFileIO.
@@ -207,21 +200,17 @@ public class MemTableFlushTaskV3 {
MeasurementSchema desc = fileSchema.getMeasurementSchema(measurementId);
List<TimeValuePair> sortedTimeValuePairs = series.getSortedTimeValuePairList();
sortTime += System.currentTimeMillis() - startTime;
- LOGGER.info("add seies writer in flush thread {}", sortedTimeValuePairs);
memoryTaskQueue.add(new Pair<>(sortedTimeValuePairs, desc));
- LOGGER.info("add series writer, memory queue {}", memoryTaskQueue);
}
theLastTask = new ChunkGroupIoTask(seriesNumber, deviceId, imemTable.getVersion());
- LOGGER.info("ChunkGroupIoTask task {}", theLastTask.toString());
memoryTaskQueue.add(theLastTask);
- LOGGER.info("add chunk group to task, memory queue {}", memoryTaskQueue);
}
LOGGER.info(
"{}, flushing a memtable into disk: data sort time cost {} ms.",
storageGroup, sortTime);
- while (!theLastTask.finished) {
+ synchronized (theLastTask){
try {
- Thread.sleep(10);
+ theLastTask.wait();
} catch (InterruptedException e) {
LOGGER.error("Storage group {}, flush memtable table thread is interrupted.",
storageGroup, e);
@@ -230,7 +219,10 @@ public class MemTableFlushTaskV3 {
}
stop = true;
- while (ioFlushThread.isAlive()) {
+ try {
+ ioFlushTaskFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOGGER.error("Waiting for IO flush task end meets error", e);
}
LOGGER.info("flushing a memtable finished!");
@@ -243,7 +235,6 @@ public class MemTableFlushTaskV3 {
int seriesNumber;
String deviceId;
long version;
- volatile boolean finished;
public ChunkGroupIoTask(int seriesNumber, String deviceId, long version) {
this(seriesNumber, deviceId, version, false);
@@ -253,7 +244,6 @@ public class MemTableFlushTaskV3 {
this.seriesNumber = seriesNumber;
this.deviceId = deviceId;
this.version = version;
- this.finished = finished;
}
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
index a3915aa..7f59f19 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
@@ -61,9 +61,9 @@ public class FileNodeProcessorV2Test {
processor.asyncForceClose();
}
- System.out.println("reach");
processor.syncCloseFileNode();
+ System.out.println("reach");
QueryDataSourceV2 queryDataSource = null;
try {
queryDataSource = processor.query(deviceId, measurementId);