You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by li...@apache.org on 2019/06/12 08:16:47 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: remove
reopen mechanism in bufferwrite
This is an automated email from the ASF dual-hosted git repository.
liurui pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 71f5c1a remove reopen mechanism in bufferwrite
71f5c1a is described below
commit 71f5c1afc090007b42c2b0c61520fbf04d945805
Author: liuruiyiyang <24...@qq.com>
AuthorDate: Wed Jun 12 16:16:16 2019 +0800
remove reopen mechanism in bufferwrite
---
.../engine/bufferwrite/BufferWriteProcessor.java | 52 ++++------------------
.../db/engine/filenode/FileNodeProcessor.java | 15 ++-----
2 files changed, 12 insertions(+), 55 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 62cfee5..fe47e1d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -98,7 +98,6 @@ public class BufferWriteProcessor extends Processor {
private WriteLogNode logNode;
private VersionController versionController;
- private boolean isClosing = true;
private boolean isClosed = false;
private TsFileResource currentTsFileResource;
@@ -125,7 +124,7 @@ public class BufferWriteProcessor extends Processor {
//bufferwriteCloseAction = parameters.get(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION);
filenodeFlushAction = parameters.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
this.bufferwriteCloseConsumer = bufferwriteCloseConsumer;
- reopen(fileName);
+ open(fileName);
try {
getLogNode();
} catch (IOException e) {
@@ -135,20 +134,8 @@ public class BufferWriteProcessor extends Processor {
}
- public void reopen(String fileName) throws BufferWriteProcessorException {
- if (!isClosing) {
- return;
- }
+ private void open(String fileName) throws BufferWriteProcessorException {
- try {
- LOGGER.info("wait for flush to reopen a BufferWrite Processor {}", processorName);
- this.flushFuture.get();
- LOGGER.info("wait for close to reopen a BufferWrite Processor {}", processorName);
- this.closeFuture.get();
- LOGGER.info("now begin to reopen the bufferwrite processor {}", processorName);
- } catch (InterruptedException | ExecutionException e) {
- LOGGER.error("reopen error in Bufferwrite Processor {}", processorName, e);
- }
new File(baseDir, processorName).mkdirs();
this.insertFilePath = Paths.get(baseDir, processorName, fileName).toString();
bufferWriteRelativePath = processorName + File.separatorChar + fileName;
@@ -157,22 +144,11 @@ public class BufferWriteProcessor extends Processor {
} catch (IOException e) {
throw new BufferWriteProcessorException(e);
}
- if (workMemTable == null) {
- long start1 = System.currentTimeMillis();
- workMemTable = MemTablePool.getInstance().getEmptyMemTable(this);
- start1 = System.currentTimeMillis() - start1;
- if (start1 > 1000) {
- LOGGER.info("BufferWriteProcessor.reopen getEmptyMemtable cost: {}", start1);
- }
- } else {
- workMemTable.clear();
- }
- isClosing = false;
- }
-
- public void checkOpen() throws BufferWriteProcessorException {
- if (isClosing) {
- throw new BufferWriteProcessorException("BufferWriteProcessor already closed");
+ long start1 = System.currentTimeMillis();
+ workMemTable = MemTablePool.getInstance().getEmptyMemTable(this);
+ start1 = System.currentTimeMillis() - start1;
+ if (start1 > 1000) {
+ LOGGER.info("BufferWriteProcessor.open getEmptyMemtable cost: {}", start1);
}
}
@@ -193,7 +169,6 @@ public class BufferWriteProcessor extends Processor {
public boolean write(String deviceId, String measurementId, long timestamp, TSDataType dataType,
String value)
throws BufferWriteProcessorException {
- checkOpen();
TSRecord record = new TSRecord(timestamp, deviceId);
DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, value);
record.addTuple(dataPoint);
@@ -210,7 +185,6 @@ public class BufferWriteProcessor extends Processor {
*/
public boolean write(TSRecord tsRecord) throws BufferWriteProcessorException {
long start1 = System.currentTimeMillis();
- checkOpen();
long memUsage = MemUtils.getRecordSize(tsRecord);
BasicMemController.UsageLevel level = BasicMemController.getInstance()
.acquireUsage(this, memUsage);
@@ -293,7 +267,6 @@ public class BufferWriteProcessor extends Processor {
public Pair<ReadOnlyMemChunk, List<ChunkMetaData>> queryBufferWriteData(String deviceId,
String measurementId, TSDataType dataType, Map<String, String> props)
throws BufferWriteProcessorException {
- checkOpen();
flushQueryLock.lock();
try {
MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
@@ -392,7 +365,7 @@ public class BufferWriteProcessor extends Processor {
// keyword synchronized is added in this method, so that only one flush task can be submitted now.
private Future<Boolean> flush(boolean isCloseTaskCalled) throws IOException {
- if (!isCloseTaskCalled && isClosing) {
+ if (!isCloseTaskCalled) {
throw new IOException("BufferWriteProcessor closed");
}
// statistic information for flush
@@ -478,11 +451,7 @@ public class BufferWriteProcessor extends Processor {
@Override
public synchronized void close() throws BufferWriteProcessorException {
- if (isClosing) {
- return;
- }
try {
- isClosing = true;
// flush data (if there are flushing task, flush() will be blocked)
//Future<Boolean> flush = flush();
//and wait for finishing flush async
@@ -619,7 +588,6 @@ public class BufferWriteProcessor extends Processor {
*/
public void delete(String deviceId, String measurementId, long timestamp)
throws BufferWriteProcessorException {
- checkOpen();
workMemTable.delete(deviceId, measurementId, timestamp);
// flushing MemTable cannot be directly modified since another thread is reading it
for (IMemTable memTable : flushingMemTables) {
@@ -648,10 +616,6 @@ public class BufferWriteProcessor extends Processor {
return insertFilePath;
}
- public boolean isClosing() {
- return isClosing;
- }
-
public boolean isClosed() {
return isClosed;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 2761720..a1ee26a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -582,13 +582,6 @@ public class FileNodeProcessor extends Processor implements IStatistic {
.format("The filenode processor %s failed to get the bufferwrite processor.",
processorName), e);
}
- } else if (bufferWriteProcessor.isClosing()) {
- try {
- bufferWriteProcessor.reopen(insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
- + System.currentTimeMillis());
- } catch (BufferWriteProcessorException e) {
- throw new FileNodeProcessorException("Cannot reopen BufferWriteProcessor", e);
- }
}
return bufferWriteProcessor;
}
@@ -1767,7 +1760,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
public FileNodeFlushFuture flush() throws IOException {
Future<Boolean> bufferWriteFlushFuture = null;
Future<Boolean> overflowFlushFuture = null;
- if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosing()) {
+ if (bufferWriteProcessor != null) {
bufferWriteFlushFuture = bufferWriteProcessor.flush();
}
if (overflowProcessor != null && !overflowProcessor.isClosed()) {
@@ -1780,7 +1773,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* Close the bufferwrite processor.
*/
public Future<Boolean> closeBufferWrite() throws FileNodeProcessorException {
- if (bufferWriteProcessor == null || bufferWriteProcessor.isClosing()) {
+ if (bufferWriteProcessor == null) {
return new ImmediateFuture<>(true);
}
try {
@@ -1962,7 +1955,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
// delete data in memory
OverflowProcessor ofProcessor = getOverflowProcessor(getProcessorName());
ofProcessor.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
- if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosing()) {
+ if (bufferWriteProcessor != null) {
bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
}
} catch (Exception e) {
@@ -2017,7 +2010,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
throw e;
}
- if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosing()) {
+ if (bufferWriteProcessor != null) {
try {
bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
} catch (BufferWriteProcessorException e) {