You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/12 05:55:38 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: fix
write restore info concurrent bug
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 4e7962b fix write restore info concurrent bug
4e7962b is described below
commit 4e7962b1dbb8fda9733f3301d440c7d91d416f3f
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Jun 12 13:55:24 2019 +0800
fix write restore info concurrent bug
---
.../engine/bufferwrite/BufferWriteProcessor.java | 47 +++-------------------
.../db/engine/filenode/FileNodeProcessor.java | 2 +
.../db/engine/memtable/MemTableFlushTask.java | 23 ++++++++---
.../iotdb/db/engine/memtable/MemTablePool.java | 2 +-
.../db/engine/overflow/io/OverflowProcessor.java | 8 ++--
.../bufferwrite/BufferWriteProcessorNewTest.java | 1 -
.../bufferwrite/BufferWriteProcessorTest.java | 3 --
7 files changed, 30 insertions(+), 56 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 97e53cb..bc43479 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -55,7 +55,6 @@ import org.apache.iotdb.db.utils.ImmediateFuture;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -99,12 +98,8 @@ public class BufferWriteProcessor extends Processor {
private WriteLogNode logNode;
private VersionController versionController;
- private boolean isClosing = true;
-
+ private boolean isClosing = false;
private boolean isClosed = false;
- private boolean isFlush = false;
-
-
private TsFileResource currentTsFileResource;
@@ -173,7 +168,6 @@ public class BufferWriteProcessor extends Processor {
workMemTable.clear();
}
isClosing = false;
- isFlush = false;
}
public void checkOpen() throws BufferWriteProcessorException {
@@ -321,19 +315,6 @@ public class BufferWriteProcessor extends Processor {
}
- private void switchFlushToWork() {
- LOGGER.info("BufferWrite Processor {} try to get flushQueryLock for switchFlushToWork", getProcessorName());
- flushQueryLock.lock();
- LOGGER.info("BufferWrite Processor {} get flushQueryLock for switchFlushToWork", getProcessorName());
- try {
- writer.appendMetadata();
- isFlush = false;
- } finally {
- flushQueryLock.unlock();
- LOGGER.info("BufferWrite Processor {} release the flushQueryLock for switchFlushToWork successfully", getProcessorName());
- }
- }
-
/**
* return the memtable to MemTablePool and make
* @param memTable
@@ -386,15 +367,10 @@ public class BufferWriteProcessor extends Processor {
}
result = true;
} catch (Exception e) {
- LOGGER.error(
- "The bufferwrite processor {} failed to flush {}, when calling the filenodeFlushAction.",
- getProcessorName(), displayMessage, e);
+ LOGGER.error("The bufferwrite processor {} failed to flush {}.", getProcessorName(), displayMessage, e);
result = false;
- } finally {
- switchFlushToWork();
- LOGGER.info("The bufferwrite processor {} ends flushing {}.", getProcessorName(),
- displayMessage);
}
+
if (LOGGER.isInfoEnabled()) {
long flushEndTime = System.currentTimeMillis();
LOGGER.info(
@@ -559,15 +535,6 @@ public class BufferWriteProcessor extends Processor {
}
/**
- * check if is flushing.
- *
- * @return True if flushing
- */
- public boolean isFlush() {
- return isFlush;
- }
-
- /**
* get metadata size.
*
* @return The sum of all timeseries's metadata size within this file.
@@ -649,12 +616,10 @@ public class BufferWriteProcessor extends Processor {
throws BufferWriteProcessorException {
checkOpen();
workMemTable.delete(deviceId, measurementId, timestamp);
- if (isFlush()) {
// flushing MemTable cannot be directly modified since another thread is reading it
- for (IMemTable memTable : flushingMemTables) {
- if (memTable.containSeries(deviceId, measurementId)) {
- memTable.delete(new Deletion(deviceId + PATH_SEPARATOR + measurementId, 0, timestamp));
- }
+ for (IMemTable memTable : flushingMemTables) {
+ if (memTable.containSeries(deviceId, measurementId)) {
+ memTable.delete(new Deletion(deviceId + PATH_SEPARATOR + measurementId, 0, timestamp));
}
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 5bad134..2761720 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -176,6 +176,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
*/
private Map<String, Action> parameters;
private FileSchema fileSchema;
+
private Action fileNodeFlushAction = () -> {
synchronized (fileNodeProcessorStore) {
try {
@@ -185,6 +186,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
}
};
+
private Action bufferwriteFlushAction = () -> {
// update the lastUpdateTime Notice: Thread safe
synchronized (fileNodeProcessorStore) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
index 02cd45c..0c103bb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
@@ -61,6 +61,8 @@ public class MemTableFlushTask {
private Thread memoryFlushThread = new Thread(() -> {
long memSerializeTime = 0;
+ LOGGER.info(
+ "BufferWrite Processor {},start serialize data into mem.", processorName);
while (!stop) {
Object task = memoryTaskQueue.poll();
if (task == null) {
@@ -102,14 +104,18 @@ public class MemTableFlushTask {
// rather than per each memtable.
private Thread ioFlushThread = new Thread(() -> {
long ioTime = 0;
+ long lastWaitIdx = 0;
long currentTsFileFlushId;
- boolean printed = false;
+ LOGGER.info("BufferWrite Processor {}, start io cost.", processorName);
+ long waitStartTime = System.currentTimeMillis();
while ((currentTsFileFlushId = tsFileIoWriter.getFlushID().get()) != flushId) {
try {
- if (!printed) {
- LOGGER.info("tsFileIoWriter flushID: {}, flush task flushID: {}", currentTsFileFlushId,
- flushId);
- printed = true;
+ long waitedTime = System.currentTimeMillis() - waitStartTime;
+ long currWaitIdx = waitedTime / 2000;
+ if (currWaitIdx > lastWaitIdx) {
+ lastWaitIdx = currWaitIdx;
+ LOGGER.info("tsFileIoWriter flushID: {}, flush task flushID: {} has waited {}ms", currentTsFileFlushId,
+ flushId, waitedTime);
}
Thread.sleep(10);
} catch (InterruptedException e) {
@@ -143,16 +149,21 @@ public class MemTableFlushTask {
ioTime += System.currentTimeMillis() - starTime;
}
}
+
MemTablePool.getInstance().release(memTable);
LOGGER.info("Processor {} return back a memtable to MemTablePool", processorName);
flushCallBack.afterFlush(memTable, tsFileIoWriter);
if (tsFileIoWriter instanceof RestorableTsFileIOWriter) {
try {
- ((RestorableTsFileIOWriter) tsFileIoWriter).flush();
+ RestorableTsFileIOWriter restorableTsFileIOWriter = (RestorableTsFileIOWriter) tsFileIoWriter;
+ restorableTsFileIOWriter.flush();
+ restorableTsFileIOWriter.appendMetadata();
} catch (IOException e) {
LOGGER.error("write restore file meet error", e);
}
}
+
+ // enable next flush task to IO
long newId = tsFileIoWriter.getFlushID().incrementAndGet();
LOGGER.info("BufferWrite Processor {}, flushing a memtable into disk: io cost {}ms, new flushID in tsFileIoWriter: {}.",
processorName, ioTime, newId);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
index f82bfe5..7799c7a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
@@ -54,7 +54,7 @@ public class MemTablePool {
long waitedTime = System.currentTimeMillis() - waitStartTime;
if (waitedTime / 2000 > lastPrintIdx) {
lastPrintIdx = waitedTime / 2000;
- LOGGER.info("{} has waited for a memtable for {}ms", waitedTime);
+ LOGGER.info("{} has waited for a memtable for {}ms", applier, waitedTime);
}
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index d8cb8be..4317cbc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -767,13 +767,13 @@ public class OverflowProcessor extends Processor {
// try {
// Pair<> workSupport;
// workSupport = new OverflowMemtable();
-// if(isFlush){
-// // isFlush = true, indicating an AsyncFlushThread has been running, only add Current overflowInfo
+// if(isFlushing){
+// // isFlushing = true, indicating an AsyncFlushThread has been running, only add Current overflowInfo
// // into List.
//
//
// }else {
-// isFlush = true;
+// isFlushing = true;
//// flushFuture = FlushManager.getInstance().submit(() ->
// flushTask("asynchronously", walTaskId));
// }
@@ -797,7 +797,7 @@ public class OverflowProcessor extends Processor {
// OverflowMemtable temp = flushSupport == null ? new OverflowMemtable() : flushSupport;
// flushSupport = workSupport;
// workSupport = temp;
-// isFlush = true;
+// isFlushing = true;
// break;
// }
// flushInfo = flushTaskList.remove(0);
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
index 1fb5e48..a826495 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
@@ -124,7 +124,6 @@ public class BufferWriteProcessorNewTest {
assertEquals(num, timeValuePair.getTimestamp());
assertEquals(num, timeValuePair.getValue().getInt());
}
- assertFalse(bufferwrite.isFlush());
long lastFlushTime = bufferwrite.getLastFlushTime();
// flush asynchronously
bufferwrite.flush();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
index 65ef7a4..d3c462d 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
@@ -28,7 +28,6 @@ import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.nio.Buffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -219,7 +218,6 @@ public class BufferWriteProcessorTest {
bufferwrite = new BufferWriteProcessor(directories.getFolderForTest(), deviceId, insertPath,
parameters, bfcloseConsumer, SysTimeVersionController.INSTANCE,
FileSchemaUtils.constructFileSchema(deviceId));
- assertFalse(bufferwrite.isFlush());
assertTrue(bufferwrite.canBeClosed());
assertEquals(0, bufferwrite.memoryUsage());
assertEquals(TsFileIOWriter.magicStringBytes.length, bufferwrite.getFileSize());
@@ -242,7 +240,6 @@ public class BufferWriteProcessorTest {
Assert.fail("mock flush spends more than 10 seconds... "
+ "Please modify the value or change a better test environment");
}
- assertFalse(bufferwrite.isFlush());
assertEquals(0, bufferwrite.memoryUsage());
// query result
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = bufferwrite