You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/20 14:03:33 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: fix sync flush bug
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 7b423a3 fix sync flush bug
7b423a3 is described below
commit 7b423a3ccc48dc4fdf625161ffc46b0a3261db20
Author: lta <li...@163.com>
AuthorDate: Thu Jun 20 22:03:06 2019 +0800
fix sync flush bug
---
.../iotdb/db/engine/filenodeV2/FlushManager.java | 16 ++++-----
.../filenodeV2/UnsealedTsFileProcessorV2.java | 41 ++++++++++++----------
.../filenodeV2/UnsealedTsFileProcessorV2Test.java | 1 -
3 files changed, 30 insertions(+), 28 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java
index 0fd2b94..ebcac70 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.filenodeV2;
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
import org.apache.iotdb.db.engine.pool.FlushPoolManager;
public class FlushManager {
@@ -30,28 +31,27 @@ public class FlushManager {
private FlushPoolManager flushPool = FlushPoolManager.getInstance();
private Runnable flushThread = () -> {
- UnsealedTsFileProcessorV2 udfProcessor = unsealedTsFileProcessorQueue.poll();
+ UnsealedTsFileProcessorV2 unsealedTsFileProcessor = unsealedTsFileProcessorQueue.poll();
try {
- udfProcessor.flushOneMemTable();
+ unsealedTsFileProcessor.flushOneMemTable();
} catch (IOException e) {
// TODO do sth
}
- udfProcessor.setManagedByFlushManager(false);
- registerUnsealedTsFileProcessor(udfProcessor);
+ unsealedTsFileProcessor.setManagedByFlushManager(false);
+ registerUnsealedTsFileProcessor(unsealedTsFileProcessor);
};
/**
* Add BufferWriteProcessor to asyncFlush manager
*/
- public boolean registerUnsealedTsFileProcessor(UnsealedTsFileProcessorV2 unsealedTsFileProcessor) {
+ public Future registerUnsealedTsFileProcessor(UnsealedTsFileProcessorV2 unsealedTsFileProcessor) {
synchronized (unsealedTsFileProcessor) {
if (!unsealedTsFileProcessor.isManagedByFlushManager() && unsealedTsFileProcessor.getFlushingMemTableSize() > 0) {
unsealedTsFileProcessorQueue.add(unsealedTsFileProcessor);
unsealedTsFileProcessor.setManagedByFlushManager(true);
- flushPool.submit(flushThread);
- return true;
+ return flushPool.submit(flushThread);
}
- return false;
+ return null;
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index a324929..a5b7b2b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -158,23 +158,30 @@ public class UnsealedTsFileProcessorV2 {
flushingMemTables.addLast(workMemTable);
FlushManager.getInstance().registerUnsealedTsFileProcessor(this);
workMemTable = null;
- }finally {
+ } finally {
flushQueryLock.writeLock().unlock();
}
}
// only for test
public void syncFlush() {
+ IMemTable tmpMemTable;
+ flushQueryLock.writeLock().lock();
try {
- IMemTable tmpMemTable = workMemTable == null ? new EmptyMemTable() : workMemTable;
+ tmpMemTable = workMemTable == null ? new EmptyMemTable() : workMemTable;
flushingMemTables.addLast(tmpMemTable);
FlushManager.getInstance().registerUnsealedTsFileProcessor(this);
workMemTable = null;
- synchronized (tmpMemTable) {
+ } finally {
+ flushQueryLock.writeLock().unlock();
+ }
+
+ synchronized (tmpMemTable) {
+ try {
tmpMemTable.wait();
+ } catch (InterruptedException e) {
+ LOGGER.error("wait flush finished meets error", e);
}
- } catch (InterruptedException e) {
- LOGGER.error("wait flush finished meets error", e);
}
}
@@ -190,7 +197,7 @@ public class UnsealedTsFileProcessorV2 {
}
public synchronized void asyncClose() {
- flushingMemTables.add(workMemTable);
+ flushingMemTables.add(workMemTable == null ? new EmptyMemTable() : workMemTable);
workMemTable = null;
shouldClose = true;
FlushManager.getInstance().registerUnsealedTsFileProcessor(this);
@@ -217,20 +224,16 @@ public class UnsealedTsFileProcessorV2 {
IMemTable memTableToFlush;
memTableToFlush = flushingMemTables.getFirst();
-
// null memtable only appears when calling asyncClose()
- if (memTableToFlush != null) {
- if(memTableToFlush.isManagedByMemPool()) {
- MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(writer, storageGroupName,
- this::releaseFlushedMemTable);
- flushTask.flushMemTable(fileSchema, memTableToFlush, versionController.nextVersion());
- }
- // for sync flush
- synchronized (memTableToFlush){
- memTableToFlush.notify();
- }
+ if (memTableToFlush.isManagedByMemPool()) {
+ MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(writer, storageGroupName,
+ this::releaseFlushedMemTable);
+ flushTask.flushMemTable(fileSchema, memTableToFlush, versionController.nextVersion());
+ }
+ // for sync flush
+ synchronized (memTableToFlush) {
+ memTableToFlush.notify();
}
-
if (shouldClose && flushingMemTables.isEmpty()) {
endFile();
@@ -278,7 +281,7 @@ public class UnsealedTsFileProcessorV2 {
}
public int getFlushingMemTableSize() {
- return flushingMemTables.size();
+ return flushingMemTables.size();
}
/**
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
index 8017168..f66268c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
@@ -169,5 +169,4 @@ public class UnsealedTsFileProcessorV2Test {
assertTrue(processor.getTsFileResource().isClosed());
}
-
}
\ No newline at end of file