You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/01/29 08:09:44 UTC
[iotdb] branch master updated: [IOTDB-5338] WAL buffer flush threshold optimaztion (#8832)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 76a8b7d720 [IOTDB-5338] WAL buffer flush threshold optimaztion (#8832)
76a8b7d720 is described below
commit 76a8b7d7206c0e65a9c22c74deae77a52be7484e
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Sun Jan 29 16:09:39 2023 +0800
[IOTDB-5338] WAL buffer flush threshold optimaztion (#8832)
---
.../org/apache/iotdb/db/wal/buffer/WALBuffer.java | 58 +++++++++++++---------
1 file changed, 35 insertions(+), 23 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index c1ef5932e4..db2d627ef5 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -58,6 +58,7 @@ public class WALBuffer extends AbstractWALBuffer {
private static final Logger logger = LoggerFactory.getLogger(WALBuffer.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final int HALF_WAL_BUFFER_SIZE = config.getWalBufferSize() / 2;
+ private static final double FSYNC_BUFFER_RATIO = 0.95;
private static final int QUEUE_CAPACITY = config.getWalBufferQueueCapacity();
/** whether close method is called */
@@ -142,9 +143,9 @@ public class WALBuffer extends AbstractWALBuffer {
/** This task serializes WALEntry to workingBuffer and will call fsync at last. */
private class SerializeTask implements Runnable {
- private final ByteBufferView byteBufferVew = new ByteBufferView();
+ private final ByteBufferView byteBufferView = new ByteBufferView();
private final SerializeInfo info = new SerializeInfo();
- private int batchSize = 0;
+ private int totalSize = 0;
@Override
public void run() {
@@ -172,20 +173,21 @@ public class WALBuffer extends AbstractWALBuffer {
Thread.currentThread().interrupt();
}
- // for better fsync performance, sleep a while to enlarge write batch
- long fsyncDelay = config.getFsyncWalDelayInMs();
- if (fsyncDelay > 0) {
+ // try to get more WALEntries with blocking interface to enlarge write batch
+ while (totalSize < HALF_WAL_BUFFER_SIZE * FSYNC_BUFFER_RATIO) {
+ WALEntry walEntry = null;
try {
- Thread.sleep(fsyncDelay);
+ // for better fsync performance, wait a while to enlarge write batch
+ walEntry = walEntries.poll(config.getFsyncWalDelayInMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- logger.warn("Interrupted when sleeping a while to enlarge wal write batch.");
+ logger.warn(
+ "Interrupted when waiting for taking WALEntry from blocking queue to serialize.");
Thread.currentThread().interrupt();
}
- }
- // try to get more WALEntries with non-blocking interface to enlarge write batch
- while (walEntries.peek() != null && batchSize < QUEUE_CAPACITY) {
- WALEntry walEntry = walEntries.poll();
+ if (walEntry == null) {
+ break;
+ }
boolean returnFlag = handleWALEntry(walEntry);
if (returnFlag) {
return;
@@ -193,7 +195,7 @@ public class WALBuffer extends AbstractWALBuffer {
}
// call fsync at last and set fsyncListeners
- if (batchSize > 0) {
+ if (totalSize > 0) {
fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
}
}
@@ -209,7 +211,6 @@ public class WALBuffer extends AbstractWALBuffer {
boolean success = handleInfoEntry(walEntry);
if (success) {
- ++batchSize;
info.fsyncListeners.add(walEntry.getWalFlushListener());
}
return false;
@@ -221,11 +222,10 @@ public class WALBuffer extends AbstractWALBuffer {
* @return true if serialization is successful.
*/
private boolean handleInfoEntry(WALEntry walEntry) {
- int size = byteBufferVew.position();
+ int size = byteBufferView.position();
try {
- walEntry.serialize(byteBufferVew);
- size = byteBufferVew.position() - size;
- logger.debug("wal entry size is: {}", size);
+ walEntry.serialize(byteBufferView);
+ size = byteBufferView.position() - size;
} catch (Exception e) {
logger.error(
"Fail to serialize WALEntry to wal node-{}'s buffer, discard it.", identifier, e);
@@ -245,6 +245,7 @@ public class WALBuffer extends AbstractWALBuffer {
currentFileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
}
}
+ totalSize += size;
info.metaData.add(size, searchIndex);
return true;
}
@@ -256,16 +257,20 @@ public class WALBuffer extends AbstractWALBuffer {
private boolean handleSignalEntry(WALSignalEntry walSignalEntry) {
switch (walSignalEntry.getType()) {
case ROLL_WAL_LOG_WRITER_SIGNAL:
- logger.debug("Handle roll log writer signal for wal node-{}.", identifier);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Handle roll log writer signal for wal node-{}.", identifier);
+ }
info.rollWALFileWriterListener = walSignalEntry.getWalFlushListener();
fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
return true;
case CLOSE_SIGNAL:
- logger.debug(
- "Handle close signal for wal node-{}, there are {} entries left.",
- identifier,
- walEntries.size());
- boolean dataExists = batchSize > 0;
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Handle close signal for wal node-{}, there are {} entries left.",
+ identifier,
+ walEntries.size());
+ }
+ boolean dataExists = totalSize > 0;
if (dataExists) {
fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
}
@@ -420,6 +425,13 @@ public class WALBuffer extends AbstractWALBuffer {
public void run() {
currentWALFileWriter.updateFileStatus(fileStatus);
+ if (logger.isDebugEnabled()) {
+ double usedRatio = (double) syncingBuffer.position() / syncingBuffer.capacity();
+ logger.debug(
+ "Sync wal buffer, forceFlag: {}, buffer used: {} / {} = {}%",
+ forceFlag, syncingBuffer.position(), syncingBuffer.capacity(), usedRatio * 100);
+ }
+
// flush buffer to os
try {
currentWALFileWriter.write(syncingBuffer, info.metaData);