You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/19 14:06:32 UTC
[iotdb] branch master updated: [IOTDB-2917] Fix flush operation has one residual wal file (#5560)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e891a68c13 [IOTDB-2917] Fix flush operation has one residual wal file (#5560)
e891a68c13 is described below
commit e891a68c132dabc3a404eafe3b9b77376313465b
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Tue Apr 19 22:06:27 2022 +0800
[IOTDB-2917] Fix flush operation has one residual wal file (#5560)
---
.../java/org/apache/iotdb/db/service/IoTDB.java | 2 +-
.../org/apache/iotdb/db/wal/buffer/WALBuffer.java | 49 +++++++++++++---------
2 files changed, 30 insertions(+), 21 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index d67b6048ab..2bebd7196e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -139,6 +139,7 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(CacheHitRatioMonitor.getInstance());
registerManager.register(CompactionTaskManager.getInstance());
JMXService.registerMBean(getInstance(), mbeanName);
+ registerManager.register(WALManager.getInstance());
// in mpp mode we need to start some other services
if (IoTDBDescriptor.getInstance().getConfig().isMppMode()) {
@@ -153,7 +154,6 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(StorageEngine.getInstance());
}
- registerManager.register(WALManager.getInstance());
registerManager.register(TemporaryQueryDataFileService.getInstance());
registerManager.register(UDFClassLoaderManager.getInstance());
registerManager.register(UDFRegistrationService.getInstance());
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index 670e81607e..916bb4846d 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -134,7 +134,7 @@ public class WALBuffer extends AbstractWALBuffer {
/** In order to control memory usage of blocking queue, get 1 and then serialize 1 */
private void serialize() {
- boolean rollWAlFileWriter = false;
+ WALFlushListener rollWAlFileWriterListener = null;
int batchSize = 0;
// try to get first WALEntry with blocking interface
@@ -153,9 +153,9 @@ public class WALBuffer extends AbstractWALBuffer {
} else {
switch (((SignalWALEntry) firstWALEntry).getSignalType()) {
case ROLL_WAL_LOG_WRITER_SIGNAL:
- rollWAlFileWriter = true;
- fsyncListeners.add(firstWALEntry.getWalFlushListener());
- break;
+ rollWAlFileWriterListener = firstWALEntry.getWalFlushListener();
+ fsyncWorkingBuffer(fsyncListeners, rollWAlFileWriterListener);
+ return;
case CLOSE_SIGNAL:
default:
break;
@@ -195,8 +195,7 @@ public class WALBuffer extends AbstractWALBuffer {
} else {
switch (((SignalWALEntry) walEntry).getSignalType()) {
case ROLL_WAL_LOG_WRITER_SIGNAL:
- rollWAlFileWriter = true;
- fsyncListeners.add(walEntry.getWalFlushListener());
+ rollWAlFileWriterListener = walEntry.getWalFlushListener();
break;
case CLOSE_SIGNAL:
default:
@@ -207,8 +206,8 @@ public class WALBuffer extends AbstractWALBuffer {
}
// call fsync at last and set fsyncListeners
- if (batchSize > 0 || rollWAlFileWriter) {
- fsyncWorkingBuffer(fsyncListeners, rollWAlFileWriter);
+ if (batchSize > 0 || rollWAlFileWriterListener != null) {
+ fsyncWorkingBuffer(fsyncListeners, rollWAlFileWriterListener);
}
}
}
@@ -296,9 +295,9 @@ public class WALBuffer extends AbstractWALBuffer {
/** Notice: this method only called at the last of SerializeTask. */
private void fsyncWorkingBuffer(
- List<WALFlushListener> fsyncListeners, boolean rollWAlFileWriter) {
+ List<WALFlushListener> fsyncListeners, WALFlushListener rollWAlFileWriterListener) {
switchWorkingBufferToFlushing();
- syncBufferThread.submit(new SyncBufferTask(true, rollWAlFileWriter, fsyncListeners));
+ syncBufferThread.submit(new SyncBufferTask(true, fsyncListeners, rollWAlFileWriterListener));
}
// only called by serializeThread
@@ -326,19 +325,21 @@ public class WALBuffer extends AbstractWALBuffer {
* This task syncs syncingBuffer to disk. The precondition is that syncingBuffer cannot be null.
*/
private class SyncBufferTask implements Runnable {
- private final boolean force;
- private final boolean rollWAlFileWriter;
+ private final boolean forceFlag;
private final List<WALFlushListener> fsyncListeners;
+ private final WALFlushListener rollWAlFileWriterListener;
- public SyncBufferTask(boolean force) {
- this(force, false, Collections.emptyList());
+ public SyncBufferTask(boolean forceFlag) {
+ this(forceFlag, null, null);
}
public SyncBufferTask(
- boolean force, boolean rollWAlFileWriter, List<WALFlushListener> fsyncListeners) {
- this.force = force;
- this.rollWAlFileWriter = rollWAlFileWriter;
+ boolean forceFlag,
+ List<WALFlushListener> fsyncListeners,
+ WALFlushListener rollWAlFileWriterListener) {
+ this.forceFlag = forceFlag;
this.fsyncListeners = fsyncListeners == null ? Collections.emptyList() : fsyncListeners;
+ this.rollWAlFileWriterListener = rollWAlFileWriterListener;
}
@Override
@@ -355,7 +356,7 @@ public class WALBuffer extends AbstractWALBuffer {
}
// force os cache to the storage device
- if (force) {
+ if (forceFlag) {
try {
currentWALFileWriter.force();
} catch (IOException e) {
@@ -368,6 +369,7 @@ public class WALBuffer extends AbstractWALBuffer {
}
config.setReadOnly(true);
}
+ // notify all waiting listeners
for (WALFlushListener fsyncListener : fsyncListeners) {
fsyncListener.succeed();
}
@@ -375,15 +377,22 @@ public class WALBuffer extends AbstractWALBuffer {
// try to roll log writer
try {
- if (rollWAlFileWriter
- || (force && currentWALFileWriter.size() >= config.getWalFileSizeThresholdInByte())) {
+ if (rollWAlFileWriterListener != null
+ || (forceFlag
+ && currentWALFileWriter.size() >= config.getWalFileSizeThresholdInByte())) {
rollLogWriter();
+ if (rollWAlFileWriterListener != null) {
+ rollWAlFileWriterListener.succeed();
+ }
}
} catch (IOException e) {
logger.error(
"Fail to roll wal node-{}'s log writer, change system mode to read-only.",
identifier,
e);
+ if (rollWAlFileWriterListener != null) {
+ rollWAlFileWriterListener.fail(e);
+ }
config.setReadOnly(true);
}
}