You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/27 07:35:08 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated:
improve naming in MemtableFlushTaskV2 and close in TsFileResourceV2
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new efdb818 improve naming in MemtableFlushTaskV2 and close in TsFileResourceV2
new da98910 Merge remote-tracking branch 'origin/feature_async_close_tsfile' into feature_async_close_tsfile
efdb818 is described below
commit efdb8184b5ab7805c72f21f698c4f9b859962c4a
Author: qiaojialin <64...@qq.com>
AuthorDate: Thu Jun 27 15:34:19 2019 +0800
improve naming in MemtableFlushTaskV2 and close in TsFileResourceV2
---
.../CopyOnReadLinkedList.java | 2 +-
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 6 +-
.../db/engine/filenodeV2/TsFileResourceV2.java | 13 +---
.../filenodeV2/UnsealedTsFileProcessorV2.java | 2 +-
.../db/engine/memtable/MemTableFlushTaskV2.java | 71 +++++++++++-----------
.../filenodeV2/UnsealedTsFileProcessorV2Test.java | 2 +-
6 files changed, 42 insertions(+), 54 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/CopyOnReadLinkedList.java
similarity index 97%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java
rename to iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/CopyOnReadLinkedList.java
index e6c8249..4be1116 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/CopyOnReadLinkedList.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.filenode;
+package org.apache.iotdb.db.engine.filenodeV2;
import java.util.ArrayList;
import java.util.Iterator;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 7c093b8..0f1a46a 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -35,7 +35,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
-import org.apache.iotdb.db.engine.filenode.CopyOnReadLinkedList;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
@@ -48,7 +47,6 @@ import org.apache.iotdb.db.exception.UnsealedTsFileProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -67,12 +65,12 @@ public class FileNodeProcessorV2 {
private FileSchema fileSchema;
- // includes sealed and unsealed sequnce tsfiles
+ // includes sealed and unsealed sequence tsfiles
private List<TsFileResourceV2> sequenceFileList = new ArrayList<>();
private UnsealedTsFileProcessorV2 workSequenceTsFileProcessor = null;
private CopyOnReadLinkedList<UnsealedTsFileProcessorV2> closingSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
- // includes sealed and unsealed unsequnce tsfiles
+ // includes sealed and unsealed unSequnce tsfiles
private List<TsFileResourceV2> unSequenceFileList = new ArrayList<>();
private UnsealedTsFileProcessorV2 workUnSequenceTsFileProcessor = null;
private CopyOnReadLinkedList<UnsealedTsFileProcessorV2> closingUnSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
index 2bfb04f..acbf8c2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
@@ -95,14 +95,6 @@ public class TsFileResourceV2 {
this.readOnlyMemChunk = readOnlyMemChunk;
}
- public TsFileResourceV2(File file, Map<String, Long> startTimeMap, Map<String, Long> endTimeMap) {
- this.file = file;
- this.startTimeMap = startTimeMap;
- this.endTimeMap = endTimeMap;
- this.closed = true;
- }
-
-
public void serialize() throws IOException {
try (OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(file + RESOURCE_SUFFIX))){
ReadWriteIOUtils.write(this.startTimeMap.size(), outputStream);
@@ -118,7 +110,6 @@ public class TsFileResourceV2 {
}
}
-
public void deSerialize() throws IOException {
try (InputStream inputStream = new BufferedInputStream(new FileInputStream(file + RESOURCE_SUFFIX))) {
int size = ReadWriteIOUtils.readInt(inputStream);
@@ -191,8 +182,8 @@ public class TsFileResourceV2 {
return closed;
}
- public void setClosed(boolean closed) {
- this.closed = closed;
+ public void close() {
+ closed = true;
processor = null;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 2464821..c15fac1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -389,7 +389,7 @@ public class UnsealedTsFileProcessorV2 {
}
public void close() throws IOException {
- tsFileResource.setClosed(true);
+ tsFileResource.close();
MultiFileLogNodeManager.getInstance().deleteNode(storageGroupName + "-" + tsFileResource.getFile().getName());
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index abee737..f933939 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -15,7 +15,6 @@
package org.apache.iotdb.db.engine.memtable;
import java.io.IOException;
-import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -44,15 +43,15 @@ public class MemTableFlushTaskV2 {
private NativeRestorableIOWriter tsFileIoWriter;
private ConcurrentLinkedQueue ioTaskQueue = new ConcurrentLinkedQueue();
- private ConcurrentLinkedQueue memoryTaskQueue = new ConcurrentLinkedQueue();
+ private ConcurrentLinkedQueue encodingTaskQueue = new ConcurrentLinkedQueue();
private String storageGroup;
private Consumer<IMemTable> flushCallBack;
private IMemTable memTable;
private FileSchema fileSchema;
- private volatile boolean memoryFlushNoMoreTask = false;
- private volatile boolean ioFlushTaskCanStop = false;
+ private volatile boolean noMoreEncodingTask = false;
+ private volatile boolean noMoreIOTask = false;
public MemTableFlushTaskV2(IMemTable memTable, FileSchema fileSchema, NativeRestorableIOWriter writer, String storageGroup,
Consumer<IMemTable> callBack) {
@@ -61,8 +60,8 @@ public class MemTableFlushTaskV2 {
this.tsFileIoWriter = writer;
this.storageGroup = storageGroup;
this.flushCallBack = callBack;
- subTaskPoolManager.submit(encodingTask);
- this.ioFlushTaskFuture = subTaskPoolManager.submit(ioFlushTask);
+ subTaskPoolManager.submit(EncodingTask);
+ this.ioFlushTaskFuture = subTaskPoolManager.submit(IOTask);
LOGGER.info("flush task of Storage group {} memtable {} is created ",
storageGroup, memTable.getVersion());
}
@@ -74,7 +73,7 @@ public class MemTableFlushTaskV2 {
public void flushMemTable() {
long sortTime = 0;
for (String deviceId : memTable.getMemTableMap().keySet()) {
- memoryTaskQueue.add(deviceId);
+ encodingTaskQueue.add(deviceId);
int seriesNumber = memTable.getMemTableMap().get(deviceId).size();
for (String measurementId : memTable.getMemTableMap().get(deviceId).keySet()) {
long startTime = System.currentTimeMillis();
@@ -83,11 +82,11 @@ public class MemTableFlushTaskV2 {
MeasurementSchema desc = fileSchema.getMeasurementSchema(measurementId);
DeduplicatedSortedData sortedTimeValuePairs = series.getDeduplicatedSortedData();
sortTime += System.currentTimeMillis() - startTime;
- memoryTaskQueue.add(new Pair<>(sortedTimeValuePairs, desc));
+ encodingTaskQueue.add(new Pair<>(sortedTimeValuePairs, desc));
}
- memoryTaskQueue.add(new ChunkGroupIoTask(seriesNumber, deviceId, memTable.getVersion()));
+ encodingTaskQueue.add(new ChunkGroupIoTask(seriesNumber, deviceId, memTable.getVersion()));
}
- memoryFlushNoMoreTask = true;
+ noMoreEncodingTask = true;
LOGGER.info(
"Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
storageGroup, memTable.getVersion(), sortTime);
@@ -103,27 +102,27 @@ public class MemTableFlushTaskV2 {
}
- private Runnable encodingTask = new Runnable() {
+ private Runnable EncodingTask = new Runnable() {
@Override
public void run() {
try {
long memSerializeTime = 0;
- boolean returnWhenNoTask = false;
- LOGGER.info("Storage group {} memtable {}, starts to serialize data into mem.", storageGroup,
+ boolean noMoreMessages = false;
+ LOGGER.info("Storage group {} memtable {}, starts to encoding data.", storageGroup,
memTable.getVersion());
while (true) {
- if (memoryFlushNoMoreTask) {
- returnWhenNoTask = true;
+ if (noMoreEncodingTask) {
+ noMoreMessages = true;
}
- Object task = memoryTaskQueue.poll();
+ Object task = encodingTaskQueue.poll();
if (task == null) {
- if (returnWhenNoTask) {
+ if (noMoreMessages) {
break;
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
- LOGGER.error("Storage group {} memtable {}, io flush task is interrupted.",
+ LOGGER.error("Storage group {} memtable {}, encoding task is interrupted.",
storageGroup, memTable.getVersion(), e);
}
} else {
@@ -133,16 +132,16 @@ public class MemTableFlushTaskV2 {
ioTaskQueue.add(task);
} else {
long starTime = System.currentTimeMillis();
- Pair<DeduplicatedSortedData, MeasurementSchema> memorySerializeTask = (Pair<DeduplicatedSortedData, MeasurementSchema>) task;
- ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
- IChunkWriter seriesWriter = new ChunkWriterImpl(memorySerializeTask.right, chunkBuffer,
+ Pair<DeduplicatedSortedData, MeasurementSchema> encodingMessage = (Pair<DeduplicatedSortedData, MeasurementSchema>) task;
+ ChunkBuffer chunkBuffer = new ChunkBuffer(encodingMessage.right);
+ IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right, chunkBuffer,
PAGE_SIZE_THRESHOLD);
try {
- writeOneSeries(memorySerializeTask.left, seriesWriter,
- memorySerializeTask.right.getType());
+ writeOneSeries(encodingMessage.left, seriesWriter,
+ encodingMessage.right.getType());
ioTaskQueue.add(seriesWriter);
} catch (IOException e) {
- LOGGER.error("Storage group {} memtable {}, io error.", storageGroup,
+ LOGGER.error("Storage group {} memtable {}, encoding task error.", storageGroup,
memTable.getVersion(), e);
throw new RuntimeException(e);
}
@@ -150,8 +149,8 @@ public class MemTableFlushTaskV2 {
}
}
}
- ioFlushTaskCanStop = true;
- LOGGER.info("Storage group {}, flushing memtable {} into disk: serialize data into mem cost "
+ noMoreIOTask = true;
+ LOGGER.info("Storage group {}, flushing memtable {} into disk: Encoding data cost "
+ "{} ms.",
storageGroup, memTable.getVersion(), memSerializeTime);
} catch (RuntimeException e) {
@@ -163,7 +162,7 @@ public class MemTableFlushTaskV2 {
//TODO a better way is: for each TsFile, assign it a Executors.singleThreadPool,
// rather than per each memtable.
- private Runnable ioFlushTask = new Runnable() {
+ private Runnable IOTask = new Runnable() {
@Override
public void run() {
try {
@@ -171,11 +170,11 @@ public class MemTableFlushTaskV2 {
boolean returnWhenNoTask = false;
LOGGER.info("Storage group {} memtable {}, start io.", storageGroup, memTable.getVersion());
while (true) {
- if (ioFlushTaskCanStop) {
+ if (noMoreIOTask) {
returnWhenNoTask = true;
}
- Object ioTask = ioTaskQueue.poll();
- if (ioTask == null) {
+ Object ioMessage = ioTaskQueue.poll();
+ if (ioMessage == null) {
if (returnWhenNoTask) {
break;
}
@@ -188,12 +187,12 @@ public class MemTableFlushTaskV2 {
} else {
long starTime = System.currentTimeMillis();
try {
- if (ioTask instanceof String) {
- tsFileIoWriter.startChunkGroup((String) ioTask);
- } else if (ioTask instanceof IChunkWriter) {
- ((IChunkWriter) ioTask).writeToFileWriter(tsFileIoWriter);
+ if (ioMessage instanceof String) {
+ tsFileIoWriter.startChunkGroup((String) ioMessage);
+ } else if (ioMessage instanceof IChunkWriter) {
+ ((IChunkWriter) ioMessage).writeToFileWriter(tsFileIoWriter);
} else {
- ChunkGroupIoTask endGroupTask = (ChunkGroupIoTask) ioTask;
+ ChunkGroupIoTask endGroupTask = (ChunkGroupIoTask) ioMessage;
tsFileIoWriter.endChunkGroup(endGroupTask.version);
endGroupTask.finished = true;
}
@@ -208,7 +207,7 @@ public class MemTableFlushTaskV2 {
LOGGER.info("flushing a memtable {} in storage group {}, io cost {}ms", memTable.getVersion(),
storageGroup, ioTime);
} catch (RuntimeException e) {
- LOGGER.error("ioflush thread is dead", e);
+ LOGGER.error("io thread is dead", e);
}
}
};
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
index ce00700..6d7619c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
@@ -157,7 +157,7 @@ public class UnsealedTsFileProcessorV2Test {
String deviceId = startTime.getKey();
resource.getEndTimeMap().put(deviceId, resource.getStartTimeMap().get(deviceId));
}
- resource.setClosed(true);
+ resource.close(true);
}
}, ()->true);