You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/20 06:15:25 UTC
[iotdb] branch master updated: Format SerializeTask in WALBuffer (#5603)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 564b286edd Format SerializeTask in WALBuffer (#5603)
564b286edd is described below
commit 564b286eddbc154a699451d52b603f1ba72264e2
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Wed Apr 20 14:15:20 2022 +0800
Format SerializeTask in WALBuffer (#5603)
---
.../org/apache/iotdb/db/wal/buffer/WALBuffer.java | 128 ++++++++++++---------
.../java/org/apache/iotdb/db/wal/node/WALNode.java | 6 +-
2 files changed, 75 insertions(+), 59 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index 916bb4846d..27c6ca15fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -123,6 +123,9 @@ public class WALBuffer extends AbstractWALBuffer {
private final IWALByteBufferView byteBufferVew = new ByteBufferView();
private final List<WALFlushListener> fsyncListeners = new LinkedList<>();
+ private int batchSize = 0;
+ private WALFlushListener rollWALFileWriterListener = null;
+
@Override
public void run() {
try {
@@ -134,32 +137,12 @@ public class WALBuffer extends AbstractWALBuffer {
/** In order to control memory usage of blocking queue, get 1 and then serialize 1 */
private void serialize() {
- WALFlushListener rollWAlFileWriterListener = null;
- int batchSize = 0;
-
// try to get first WALEntry with blocking interface
try {
WALEntry firstWALEntry = walEntries.take();
- if (!firstWALEntry.isSignal()) {
- try {
- firstWALEntry.serialize(byteBufferVew);
- ++batchSize;
- fsyncListeners.add(firstWALEntry.getWalFlushListener());
- } catch (Exception e) {
- logger.error(
- "Fail to serialize WALEntry to wal node-{}'s buffer, discard it.", identifier, e);
- firstWALEntry.getWalFlushListener().fail(e);
- }
- } else {
- switch (((SignalWALEntry) firstWALEntry).getSignalType()) {
- case ROLL_WAL_LOG_WRITER_SIGNAL:
- rollWAlFileWriterListener = firstWALEntry.getWalFlushListener();
- fsyncWorkingBuffer(fsyncListeners, rollWAlFileWriterListener);
- return;
- case CLOSE_SIGNAL:
- default:
- break;
- }
+ boolean returnFlag = handleWALEntry(firstWALEntry);
+ if (returnFlag) {
+ return;
}
} catch (InterruptedException e) {
logger.warn(
@@ -181,33 +164,66 @@ public class WALBuffer extends AbstractWALBuffer {
// try to get more WALEntries with non-blocking interface to enlarge write batch
while (walEntries.peek() != null && batchSize < QUEUE_CAPACITY) {
WALEntry walEntry = walEntries.poll();
- if (!walEntry.isSignal()) {
- try {
- walEntry.serialize(byteBufferVew);
- } catch (Exception e) {
- logger.error(
- "Fail to serialize WALEntry to wal node-{}'s buffer, discard it.", identifier, e);
- walEntry.getWalFlushListener().fail(e);
- continue;
- }
- ++batchSize;
- fsyncListeners.add(walEntry.getWalFlushListener());
- } else {
- switch (((SignalWALEntry) walEntry).getSignalType()) {
- case ROLL_WAL_LOG_WRITER_SIGNAL:
- rollWAlFileWriterListener = walEntry.getWalFlushListener();
- break;
- case CLOSE_SIGNAL:
- default:
- break;
- }
- break;
+ boolean returnFlag = handleWALEntry(walEntry);
+ if (returnFlag) {
+ return;
}
}
// call fsync at last and set fsyncListeners
- if (batchSize > 0 || rollWAlFileWriterListener != null) {
- fsyncWorkingBuffer(fsyncListeners, rollWAlFileWriterListener);
+ if (batchSize > 0) {
+ fsyncWorkingBuffer(fsyncListeners, rollWALFileWriterListener);
+ }
+ }
+
+ /**
+ * @return true if fsyncWorkingBuffer has been called, which means this serialization task
+ * should be ended.
+ */
+ private boolean handleWALEntry(WALEntry walEntry) {
+ if (walEntry.isSignal()) {
+ return handleSignalEntry((SignalWALEntry) walEntry);
+ }
+
+ boolean success = handleInfoEntry(walEntry);
+ if (success) {
+ ++batchSize;
+ fsyncListeners.add(walEntry.getWalFlushListener());
+ }
+ return false;
+ }
+
+ /** @return true if serialization is successful. */
+ private boolean handleInfoEntry(WALEntry walEntry) {
+ try {
+ walEntry.serialize(byteBufferVew);
+ } catch (Exception e) {
+ logger.error(
+ "Fail to serialize WALEntry to wal node-{}'s buffer, discard it.", identifier, e);
+ walEntry.getWalFlushListener().fail(e);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * @return true if fsyncWorkingBuffer has been called, which means this serialization task
+ * should be ended.
+ */
+ private boolean handleSignalEntry(SignalWALEntry signalWALEntry) {
+ switch (signalWALEntry.getSignalType()) {
+ case ROLL_WAL_LOG_WRITER_SIGNAL:
+ rollWALFileWriterListener = signalWALEntry.getWalFlushListener();
+ fsyncWorkingBuffer(fsyncListeners, rollWALFileWriterListener);
+ return true;
+ case CLOSE_SIGNAL:
+ boolean dataExists = batchSize > 0;
+ if (dataExists) {
+ fsyncWorkingBuffer(fsyncListeners, rollWALFileWriterListener);
+ }
+ return dataExists;
+ default:
+ return false;
}
}
}
@@ -295,9 +311,9 @@ public class WALBuffer extends AbstractWALBuffer {
/** Notice: this method only called at the last of SerializeTask. */
private void fsyncWorkingBuffer(
- List<WALFlushListener> fsyncListeners, WALFlushListener rollWAlFileWriterListener) {
+ List<WALFlushListener> fsyncListeners, WALFlushListener rollWALFileWriterListener) {
switchWorkingBufferToFlushing();
- syncBufferThread.submit(new SyncBufferTask(true, fsyncListeners, rollWAlFileWriterListener));
+ syncBufferThread.submit(new SyncBufferTask(true, fsyncListeners, rollWALFileWriterListener));
}
// only called by serializeThread
@@ -327,7 +343,7 @@ public class WALBuffer extends AbstractWALBuffer {
private class SyncBufferTask implements Runnable {
private final boolean forceFlag;
private final List<WALFlushListener> fsyncListeners;
- private final WALFlushListener rollWAlFileWriterListener;
+ private final WALFlushListener rollWALFileWriterListener;
public SyncBufferTask(boolean forceFlag) {
this(forceFlag, null, null);
@@ -336,10 +352,10 @@ public class WALBuffer extends AbstractWALBuffer {
public SyncBufferTask(
boolean forceFlag,
List<WALFlushListener> fsyncListeners,
- WALFlushListener rollWAlFileWriterListener) {
+ WALFlushListener rollWALFileWriterListener) {
this.forceFlag = forceFlag;
this.fsyncListeners = fsyncListeners == null ? Collections.emptyList() : fsyncListeners;
- this.rollWAlFileWriterListener = rollWAlFileWriterListener;
+ this.rollWALFileWriterListener = rollWALFileWriterListener;
}
@Override
@@ -377,12 +393,12 @@ public class WALBuffer extends AbstractWALBuffer {
// try to roll log writer
try {
- if (rollWAlFileWriterListener != null
+ if (rollWALFileWriterListener != null
|| (forceFlag
&& currentWALFileWriter.size() >= config.getWalFileSizeThresholdInByte())) {
rollLogWriter();
- if (rollWAlFileWriterListener != null) {
- rollWAlFileWriterListener.succeed();
+ if (rollWALFileWriterListener != null) {
+ rollWALFileWriterListener.succeed();
}
}
} catch (IOException e) {
@@ -390,8 +406,8 @@ public class WALBuffer extends AbstractWALBuffer {
"Fail to roll wal node-{}'s log writer, change system mode to read-only.",
identifier,
e);
- if (rollWAlFileWriterListener != null) {
- rollWAlFileWriterListener.fail(e);
+ if (rollWALFileWriterListener != null) {
+ rollWALFileWriterListener.fail(e);
}
config.setReadOnly(true);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 581dcb03e7..57344a7f2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -82,7 +82,7 @@ public class WALNode implements IWALNode {
*/
private final AtomicLong totalCostOfFlushedMemTables = new AtomicLong();
/** version id -> cost sum of memTables flushed at this file version */
- private final Map<Integer, Long> walFileVersionId2MemTableCostSum = new ConcurrentHashMap<>();
+ private final Map<Integer, Long> walFileVersionId2MemTablesTotalCost = new ConcurrentHashMap<>();
public WALNode(String identifier, String logDirectory) throws FileNotFoundException {
this.identifier = identifier;
@@ -153,7 +153,7 @@ public class WALNode implements IWALNode {
// update cost info
long cost = config.isEnableMemControl() ? memTable.getTVListsRamCost() : 1;
int currentWALFileVersion = buffer.getCurrentWALFileVersion();
- walFileVersionId2MemTableCostSum.compute(
+ walFileVersionId2MemTablesTotalCost.compute(
currentWALFileVersion, (k, v) -> v == null ? cost : v + cost);
totalCostOfFlushedMemTables.addAndGet(cost);
}
@@ -242,7 +242,7 @@ public class WALNode implements IWALNode {
}
// update totalRamCostOfFlushedMemTables
int versionId = WALWriter.parseVersionId(file.getName());
- Long memTableRamCostSum = walFileVersionId2MemTableCostSum.remove(versionId);
+ Long memTableRamCostSum = walFileVersionId2MemTablesTotalCost.remove(versionId);
if (memTableRamCostSum != null) {
totalCostOfFlushedMemTables.addAndGet(-memTableRamCostSum);
}