You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/19 03:39:45 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: add force flush
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 8c82ad9 add force flush
8c82ad9 is described below
commit 8c82ad915d500838c08affd8c523c33bfb774538
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Jun 19 11:39:33 2019 +0800
add force flush
---
.../bufferwriteV2/BufferWriteProcessorV2.java | 42 +++++++++------
.../db/engine/bufferwriteV2/FlushManager.java | 7 ++-
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 60 ++++++++++++++++------
.../db/engine/filenodeV2/TsFileResourceV2.java | 1 +
4 files changed, 76 insertions(+), 34 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
index 3ecf5e7..b5c97d8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
@@ -52,12 +52,12 @@ public class BufferWriteProcessorV2 {
private volatile boolean managedByFlushManager;
- private ReadWriteLock lock = new ReentrantReadWriteLock();
+ private ReadWriteLock flushQueryLock = new ReentrantReadWriteLock();
/**
* true: to be closed
*/
- private volatile boolean closing;
+ private volatile boolean shouldClose;
private IMemTable workMemTable;
@@ -114,12 +114,12 @@ public class BufferWriteProcessorV2 {
* @param memTable
*/
private void removeFlushedMemTable(Object memTable) {
- lock.writeLock().lock();
- writer.makeMetadataVisible();
+ flushQueryLock.writeLock().lock();
try {
+ writer.makeMetadataVisible();
flushingMemTables.remove(memTable);
} finally {
- lock.writeLock().unlock();
+ flushQueryLock.writeLock().unlock();
}
}
@@ -137,23 +137,26 @@ public class BufferWriteProcessorV2 {
workMemTable = null;
}
- public void flushOneMemTable() {
+ public void flushOneMemTable() throws IOException {
IMemTable memTableToFlush = flushingMemTables.pollFirst();
- MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(writer, storageGroupName, this::removeFlushedMemTable);
- flushTask.flushMemTable(fileSchema, memTableToFlush, versionController.nextVersion());
-
- if (closing && flushingMemTables.isEmpty()) {
+ // null memtable only appears when calling forceClose()
+ if (memTableToFlush != null) {
+ MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(writer, storageGroupName, this::removeFlushedMemTable);
+ flushTask.flushMemTable(fileSchema, memTableToFlush, versionController.nextVersion());
+ }
+ if (shouldClose && flushingMemTables.isEmpty()) {
+ endFile();
}
}
- public void close() throws IOException {
+ private void endFile() throws IOException {
long closeStartTime = System.currentTimeMillis();
writer.endFile(fileSchema);
//FIXME suppose the flushMetadata-thread-pool is 2.
- // then if a flushMetadata task and a close task are running in the same time
- // and the close task is faster, then writer == null, and the flushMetadata task will throw nullpointer
- // exception. Add "synchronized" keyword on both flushMetadata and close may solve the issue.
+ // then if a flushMetadata task and an endFile task are running at the same time
+ // and the endFile task finishes first, then writer == null, and the flushMetadata task will throw a
+ // NullPointerException. Adding the "synchronized" keyword to both flushMetadata and endFile may solve the issue.
writer = null;
// remove this processor from Closing list in FileNodeProcessor
@@ -174,14 +177,21 @@ public class BufferWriteProcessorV2 {
}
}
+ public void forceClose() {
+ flushingMemTables.add(workMemTable);
+ workMemTable = null;
+ shouldClose = true;
+ FlushManager.getInstance().registerBWProcessor(this);
+ }
+
public boolean shouldClose() {
long fileSize = tsFileResource.getFileSize();
long fileSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getBufferwriteFileSizeThreshold();
return fileSize > fileSizeThreshold;
}
- public void setClosing() {
- closing = true;
+ public void close() {
+ shouldClose = true;
}
public boolean isManagedByFlushManager() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
index 085dbfe..b8345ce 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.engine.bufferwriteV2;
+import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.iotdb.db.engine.pool.FlushPoolManager;
@@ -29,7 +30,11 @@ public class FlushManager {
private Runnable flushThread = () -> {
BufferWriteProcessorV2 bwProcessor = bwProcessorQueue.poll();
- bwProcessor.flushOneMemTable();
+ try {
+ bwProcessor.flushOneMemTable();
+ } catch (IOException e) {
+ // TODO: handle the IOException from flushOneMemTable(); it is currently swallowed silently
+ }
bwProcessor.setManagedByFlushManager(false);
registerBWProcessor(bwProcessor);
};
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index ef46166..1bf61ae 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -170,7 +170,6 @@ public class FileNodeProcessorV2 {
public boolean insert(TSRecord tsRecord) {
-
lock.writeLock().lock();
boolean result = true;
@@ -181,7 +180,7 @@ public class FileNodeProcessorV2 {
String baseDir = directories.getNextFolderForTsfile();
String filePath = Paths.get(baseDir, storageGroup, tsRecord.time + "").toString();
workBufferWriteProcessor = new BufferWriteProcessorV2(storageGroup, new File(filePath),
- fileSchema, versionController, this::closeBufferWriteProcessor);
+ fileSchema, versionController, this::closeBufferWriteProcessorCallBack);
newFileNodes.add(workBufferWriteProcessor.getTsFileResource());
// TODO check if the disk is full
}
@@ -202,19 +201,7 @@ public class FileNodeProcessorV2 {
// check memtable size and may asyncFlush the workMemtable
if (workBufferWriteProcessor.shouldFlush()) {
- workBufferWriteProcessor.asyncFlush();
-
- // update the largest timestamp in the last flushing memtable
- for (Entry<String, Long> entry : latestTimeMap.entrySet()) {
- latestFlushTimeMap.put(entry.getKey(), entry.getValue());
- }
- }
-
- // check file size and may close the BufferWrite
- if (workBufferWriteProcessor.shouldClose()) {
- closingBufferWriteProcessor.add(workBufferWriteProcessor);
- workBufferWriteProcessor.setClosing();
- workBufferWriteProcessor = null;
+ flushAndCheckClose();
}
} else {
@@ -229,11 +216,39 @@ public class FileNodeProcessorV2 {
return result;
}
+ /**
+ * Ensures that a flush task is always submitted after close() is called,
+ * so that the close task will be executed by a flush thread.
+ * -- said by qiaojialin
+ *
+ * Only called by insert(); thread-safety must be ensured by the caller.
+ */
+ private void flushAndCheckClose() {
+ boolean shouldClose = false;
+ // check file size and may close the BufferWrite
+ if (workBufferWriteProcessor.shouldClose()) {
+ closingBufferWriteProcessor.add(workBufferWriteProcessor);
+ workBufferWriteProcessor.close();
+ shouldClose = true;
+ }
+
+ workBufferWriteProcessor.asyncFlush();
+
+ if (shouldClose) {
+ workBufferWriteProcessor = null;
+ }
+
+ // update the largest timestamp in the last flushing memtable
+ for (Entry<String, Long> entry : latestTimeMap.entrySet()) {
+ latestFlushTimeMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+
/**
* return the memtable to MemTablePool and make metadata in writer visible
*/
- private void closeBufferWriteProcessor(Object bufferWriteProcessor) {
+ private void closeBufferWriteProcessorCallBack(Object bufferWriteProcessor) {
closingBufferWriteProcessor.remove((BufferWriteProcessorV2) bufferWriteProcessor);
synchronized (fileNodeProcessorStore) {
fileNodeProcessorStore.setLastUpdateTimeMap(latestTimeMap);
@@ -257,5 +272,16 @@ public class FileNodeProcessorV2 {
}
}
-
+ public void forceClose() {
+ lock.writeLock().lock();
+ try {
+ if (workBufferWriteProcessor != null) {
+ closingBufferWriteProcessor.add(workBufferWriteProcessor);
+ workBufferWriteProcessor.forceClose();
+ workBufferWriteProcessor = null;
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
index a459f3e..b49c00a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
@@ -38,6 +38,7 @@ public class TsFileResourceV2 {
private Map<String, Long> startTimeMap;
// device -> end time
+ // null if it's an unsealed tsfile
private Map<String, Long> endTimeMap;
private transient ModificationFile modFile;