You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/11 13:19:42 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: fix getClosingBufferWriteProcessor bug
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 80f41d9 fix getClosingBufferWriteProcessor bug
80f41d9 is described below
commit 80f41d914eb6ada1cbf623816f5009cf5e4f96a3
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Jun 11 21:19:28 2019 +0800
fix getClosingBufferWriteProcessor bug
---
.../java/org/apache/iotdb/db/engine/Processor.java | 10 +++----
.../engine/bufferwrite/BufferWriteProcessor.java | 31 +++++++++++-----------
.../db/engine/filenode/CopyOnWriteLinkedList.java | 12 ++++++---
.../iotdb/db/engine/filenode/FileNodeManager.java | 18 ++++++-------
.../db/engine/filenode/FileNodeProcessor.java | 16 +++++++----
.../iotdb/db/query/control/FileReaderManager.java | 2 +-
6 files changed, 50 insertions(+), 39 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
index 0388476..42d6f5e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
@@ -82,11 +82,11 @@ public abstract class Processor {
* Release the write lock
*/
public void writeUnlock() {
- lock.writeLock().unlock();
start = System.currentTimeMillis() - start;
if (start > 1000) {
LOGGER.info("Processor {} hold lock for {}ms", processorName, start, new RuntimeException());
}
+ lock.writeLock().unlock();
}
/**
@@ -115,15 +115,15 @@ public abstract class Processor {
* true release write lock, false release read unlock
*/
public void unlock(boolean isWriteUnlock) {
+ start = System.currentTimeMillis() - start;
+ if (start > 1000) {
+ LOGGER.info("Processor {} hold lock for {}ms", processorName, start, new RuntimeException());
+ }
if (isWriteUnlock) {
writeUnlock();
} else {
readUnlock();
}
- start = System.currentTimeMillis() - start;
- if (start > 1000) {
- LOGGER.info("Processor {} hold lock for {}ms", processorName, start, new RuntimeException());
- }
}
/**
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index a0ed3c2..d627d9b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -25,7 +25,6 @@ import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -46,14 +45,12 @@ import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
import org.apache.iotdb.db.engine.memtable.MemTablePool;
-import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.pool.FlushManager;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.BufferWriteProcessorException;
import org.apache.iotdb.db.qp.constant.DatetimeUtils;
-import org.apache.iotdb.db.utils.FileUtils;
import org.apache.iotdb.db.utils.ImmediateFuture;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
@@ -101,7 +98,9 @@ public class BufferWriteProcessor extends Processor {
private WriteLogNode logNode;
private VersionController versionController;
- private boolean isClosed = true;
+ private boolean isClosing = true;
+
+ private boolean isClosed = false;
private boolean isFlush = false;
@@ -141,7 +140,7 @@ public class BufferWriteProcessor extends Processor {
}
public void reopen(String fileName) throws BufferWriteProcessorException {
- if (!isClosed) {
+ if (!isClosing) {
return;
}
@@ -167,12 +166,12 @@ public class BufferWriteProcessor extends Processor {
} else {
workMemTable.clear();
}
- isClosed = false;
+ isClosing = false;
isFlush = false;
}
public void checkOpen() throws BufferWriteProcessorException {
- if (isClosed) {
+ if (isClosing) {
throw new BufferWriteProcessorException("BufferWriteProcessor already closed");
}
}
@@ -385,7 +384,7 @@ public class BufferWriteProcessor extends Processor {
// keyword synchronized is added in this method, so that only one flush task can be submitted now.
private Future<Boolean> flush(boolean isCloseTaskCalled) throws IOException {
- if (!isCloseTaskCalled && isClosed) {
+ if (!isCloseTaskCalled && isClosing) {
throw new IOException("BufferWriteProcessor closed");
}
// statistic information for flush
@@ -459,11 +458,11 @@ public class BufferWriteProcessor extends Processor {
@Override
public synchronized void close() throws BufferWriteProcessorException {
- if (isClosed) {
+ if (isClosing) {
return;
}
try {
- isClosed = true;
+ isClosing = true;
// flush data (if there are flushing task, flush() will be blocked)
//Future<Boolean> flush = flush();
//and wait for finishing flush async
@@ -488,12 +487,8 @@ public class BufferWriteProcessor extends Processor {
//FIXME suppose the flush-thread-pool is 2.
// then if a flush task and a close task are running in the same time
// and the close task is faster, then writer == null, and the flush task will throw nullpointer
- // expcetion. Add "synchronized" keyword on both flush and close may solve the issue.
+ // exception. Add "synchronized" keyword on both flush and close may solve the issue.
writer = null;
- isClosed = true;
- //A BUG may appears here: workMemTable has been cleared,
- // but the corresponding TsFile is not maintained in Processor.
- workMemTable.clear();
// update the IntervalFile for interval list
bufferwriteCloseConsumer.accept(this);
// flush the changed information for filenode
@@ -513,6 +508,8 @@ public class BufferWriteProcessor extends Processor {
}catch (IOException | ActionException e) {
LOGGER.error("Close bufferwrite processor {} failed.", getProcessorName(), e);
return false;
+ } finally {
+ isClosed = true;
}
return true;
}
@@ -642,6 +639,10 @@ public class BufferWriteProcessor extends Processor {
return insertFilePath;
}
+ public boolean isClosing() {
+ return isClosing;
+ }
+
public boolean isClosed() {
return isClosed;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnWriteLinkedList.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnWriteLinkedList.java
index a412525..89a8c0a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnWriteLinkedList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnWriteLinkedList.java
@@ -31,17 +31,21 @@ import java.util.concurrent.CopyOnWriteArrayList;
* @param <T>
*/
public class CopyOnWriteLinkedList<T> {
+
LinkedList<T> data = new LinkedList<>();
List<T> readCopy;
- public synchronized void add(T d) {
+ public synchronized void add(T d) {
data.add(d);
- readCopy = new ArrayList<>(data);
+ }
+
+ public synchronized void remove(T d) {
+ data.remove(d);
}
public synchronized Iterator<T> iterator() {
- readCopy = new ArrayList<>(data);
- return data.iterator();
+ readCopy = new ArrayList<>(data);
+ return readCopy.iterator();
}
public synchronized void reset() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index a1fe9cd..85b2a94 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -128,18 +128,18 @@ public class FileNodeManager implements IStatistic, IService {
statMonitor.registerStatistics(MonitorConstants.STAT_STORAGE_DELTA_NAME, this);
}
-// closedProcessorCleaner.scheduleWithFixedDelay(()->{
-// int size = 0;
-// for (FileNodeProcessor fileNodeProcessor : processorMap.values()) {
-// size += fileNodeProcessor.getClosingBufferWriteProcessor().size();
-// }
-// if (size > 5) {
-// LOGGER.info("Current closing processor number is {}", size);
-// }
+ closedProcessorCleaner.scheduleWithFixedDelay(()->{
+ int size = 0;
+ for (FileNodeProcessor fileNodeProcessor : processorMap.values()) {
+ size += fileNodeProcessor.getClosingBufferWriteProcessor().size();
+ }
+ if (size > 5) {
+ LOGGER.info("Current closing processor number is {}", size);
+ }
// for (FileNodeProcessor fileNodeProcessor : processorMap.values()) {
// fileNodeProcessor.checkAllClosingProcessors();
// }
-// }, 0, 3000, TimeUnit.MILLISECONDS);
+ }, 0, 30000, TimeUnit.MILLISECONDS);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index df31499..5bad134 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -580,7 +580,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
.format("The filenode processor %s failed to get the bufferwrite processor.",
processorName), e);
}
- } else if (bufferWriteProcessor.isClosed()) {
+ } else if (bufferWriteProcessor.isClosing()) {
try {
bufferWriteProcessor.reopen(insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
+ System.currentTimeMillis());
@@ -1765,7 +1765,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
public FileNodeFlushFuture flush() throws IOException {
Future<Boolean> bufferWriteFlushFuture = null;
Future<Boolean> overflowFlushFuture = null;
- if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
+ if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosing()) {
bufferWriteFlushFuture = bufferWriteProcessor.flush();
}
if (overflowProcessor != null && !overflowProcessor.isClosed()) {
@@ -1778,7 +1778,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* Close the bufferwrite processor.
*/
public Future<Boolean> closeBufferWrite() throws FileNodeProcessorException {
- if (bufferWriteProcessor == null || bufferWriteProcessor.isClosed()) {
+ if (bufferWriteProcessor == null || bufferWriteProcessor.isClosing()) {
return new ImmediateFuture<>(true);
}
try {
@@ -1960,7 +1960,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
// delete data in memory
OverflowProcessor ofProcessor = getOverflowProcessor(getProcessorName());
ofProcessor.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
- if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
+ if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosing()) {
bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
}
} catch (Exception e) {
@@ -2015,7 +2015,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
throw e;
}
- if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
+ if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosing()) {
try {
bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
} catch (BufferWriteProcessorException e) {
@@ -2044,6 +2044,12 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
public CopyOnWriteLinkedList<BufferWriteProcessor> getClosingBufferWriteProcessor() {
+ for (BufferWriteProcessor processor: closingBufferWriteProcessor.read()) {
+ if (processor.isClosed()) {
+ closingBufferWriteProcessor.remove(processor);
+ }
+ }
+ closingBufferWriteProcessor.reset();
return closingBufferWriteProcessor;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index 461ff0b..cd33c91 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -117,7 +117,7 @@ public class FileReaderManager implements IService {
/**
* Get the reader of the file(tsfile or unseq tsfile) indicated by filePath. If the reader already
- * exists, just get it from closedFileReaderMap or unclosedFileReaderMap depending on isClosed .
+ * exists, just get it from closedFileReaderMap or unclosedFileReaderMap depending on isClosing .
* Otherwise a new reader will be created and cached.
*
* @param filePath the path of the file, of which the reader is desired.