You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/19 01:59:52 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: fix filenode merge lock
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new fce0cd7 fix filenode merge lock
fce0cd7 is described below
commit fce0cd739f8755cd0af18225f09ec0abc36857e9
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Jun 19 09:59:38 2019 +0800
fix filenode merge lock
---
.../bufferwriteV2/BufferWriteProcessorV2.java | 2 +-
.../db/engine/bufferwriteV2/FlushManager.java | 24 +++++++++++-----------
.../db/engine/filenode/FileNodeProcessor.java | 2 +-
.../db/engine/filenodeV2/TsFileResourceV2.java | 7 +++----
4 files changed, 17 insertions(+), 18 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
index 081b383..3ecf5e7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
@@ -133,7 +133,7 @@ public class BufferWriteProcessorV2 {
*/
public void asyncFlush() {
flushingMemTables.addLast(workMemTable);
- FlushManager.getInstance().registerBWP(this);
+ FlushManager.getInstance().registerBWProcessor(this);
workMemTable = null;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
index 55477f9..085dbfe 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
@@ -23,26 +23,26 @@ import org.apache.iotdb.db.engine.pool.FlushPoolManager;
public class FlushManager {
- private ConcurrentLinkedQueue<BufferWriteProcessorV2> bwpQueue = new ConcurrentLinkedQueue<>();
+ private ConcurrentLinkedQueue<BufferWriteProcessorV2> bwProcessorQueue = new ConcurrentLinkedQueue<>();
private FlushPoolManager flushPool = FlushPoolManager.getInstance();
- private Runnable flushAction = () -> {
- BufferWriteProcessorV2 bwp = bwpQueue.poll();
- bwp.flushOneMemTable();
- bwp.setManagedByFlushManager(false);
- registerBWP(bwp);
+ private Runnable flushThread = () -> {
+ BufferWriteProcessorV2 bwProcessor = bwProcessorQueue.poll();
+ bwProcessor.flushOneMemTable();
+ bwProcessor.setManagedByFlushManager(false);
+ registerBWProcessor(bwProcessor);
};
/**
* Add BufferWriteProcessor to asyncFlush manager
*/
- public boolean registerBWP(BufferWriteProcessorV2 bwp) {
- synchronized (bwp) {
- if (!bwp.isManagedByFlushManager() && bwp.getFlushingMemTableSize() > 0) {
- bwpQueue.add(bwp);
- bwp.setManagedByFlushManager(true);
- flushPool.submit(flushAction);
+ public boolean registerBWProcessor(BufferWriteProcessorV2 bwProcessor) {
+ synchronized (bwProcessor) {
+ if (!bwProcessor.isManagedByFlushManager() && bwProcessor.getFlushingMemTableSize() > 0) {
+ bwProcessorQueue.add(bwProcessor);
+ bwProcessor.setManagedByFlushManager(true);
+ flushPool.submit(flushThread);
return true;
}
return false;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 43086c4..0d670a7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -1058,6 +1058,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
public void merge() throws FileNodeProcessorException {
// close bufferwrite and overflow, prepare for merge
LOGGER.info("The filenode processor {} begins to merge.", getProcessorName());
+ writeLock();
prepareForMerge();
// change status from overflowed to no overflowed
isOverflowed = false;
@@ -2077,7 +2078,6 @@ public class FileNodeProcessor extends Processor implements IStatistic {
try {
ZoneId zoneId = IoTDBDescriptor.getInstance().getConfig().getZoneID();
long mergeStartTime = System.currentTimeMillis();
- writeLock();
merge();
long mergeEndTime = System.currentTimeMillis();
long intervalTime = mergeEndTime - mergeStartTime;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
index 5741037..a459f3e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
@@ -23,22 +23,21 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.filenode.OverflowChangeType;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
public class TsFileResourceV2 {
private File file;
+
+ // device -> start time
private Map<String, Long> startTimeMap;
+ // device -> end time
private Map<String, Long> endTimeMap;
private transient ModificationFile modFile;