You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/08 04:22:41 UTC
[iotdb] branch native_raft updated: fix async buffer flush; do not include .log file in snapshot
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 7bc46fae12 fix async buffer flush; do not include .log file in snapshot
7bc46fae12 is described below
commit 7bc46fae12abf1354dd156d300dee89e6924c5d0
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon May 8 12:25:28 2023 +0800
fix async buffer flush
do not include .log file in snapshot
---
.../protocol/log/dispatch/DispatcherThread.java | 8 ++-
.../manager/DirectorySnapshotRaftLogManager.java | 2 -
.../serialization/SyncLogDequeSerializer.java | 75 +++++++++-------------
3 files changed, 37 insertions(+), 48 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index 9db8191499..ae8ba266a4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -139,7 +139,9 @@ class DispatcherThread extends DynamicThread {
long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
AppendEntryResult appendEntryResult = SyncClientAdaptor.appendEntries(client, request);
Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
- handler.onComplete(appendEntryResult);
+ if (appendEntryResult != null) {
+ handler.onComplete(appendEntryResult);
+ }
} catch (Exception e) {
handler.onError(e);
}
@@ -163,7 +165,9 @@ class DispatcherThread extends DynamicThread {
AppendEntryResult appendEntryResult =
SyncClientAdaptor.appendCompressedEntries(client, request);
Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
- handler.onComplete(appendEntryResult);
+ if (appendEntryResult != null) {
+ handler.onComplete(appendEntryResult);
+ }
} catch (Exception e) {
handler.onError(e);
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
index a76653dac4..1f99fd4e2c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.consensus.natraft.protocol.log.applier.LogApplier;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.StableEntryManager;
import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.DirectorySnapshot;
import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.Snapshot;
-import org.apache.iotdb.consensus.natraft.utils.IOUtils;
import java.io.File;
import java.nio.file.Path;
@@ -77,7 +76,6 @@ public class DirectorySnapshotRaftLogManager extends RaftLogManager {
}
stateMachine.takeSnapshot(latestSnapshotDir);
List<Path> snapshotFiles = stateMachine.getSnapshotFiles(latestSnapshotDir);
- snapshotFiles.addAll(IOUtils.collectPaths(latestSnapshotDir));
directorySnapshot = new DirectorySnapshot(latestSnapshotDir, snapshotFiles, currNodes);
directorySnapshot.setLastLogIndex(snapshotIndex);
directorySnapshot.setLastLogTerm(snapshotTerm);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index 02dc4eb65c..a4ed9add01 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -399,20 +399,13 @@ public class SyncLogDequeSerializer implements StableEntryManager {
logger.error("Unexpected exception when flushing log in {}", name);
throw new RuntimeException(e);
}
- } else {
- switchBuffer();
- flushingLogFuture = flushingLogExecutorService.submit(() -> flushLogBufferTask(lastLogIndex));
}
- if (!isAsyncFlush) {
- try {
- flushingLogFuture.get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- logger.error("Unexpected exception when flushing log in {}", name);
- throw new RuntimeException(e);
- }
+ switchBuffer();
+ if (isAsyncFlush) {
+ flushingLogFuture = flushingLogExecutorService.submit(() -> flushLogBufferTask(lastLogIndex));
+ } else {
+ flushLogBufferTask(lastLogIndex);
}
}
@@ -449,14 +442,14 @@ public class SyncLogDequeSerializer implements StableEntryManager {
checkCloseCurrentFile(currentLastIndex);
} catch (IOException e) {
+ logger.error("IOError in logs serialization: ", e);
+ } catch (Throwable e) {
logger.error("Error in logs serialization: ", e);
- return;
}
flushingLogDataBuffer.clear();
flushingLogIndexBuffer.clear();
- switchBuffer();
logger.debug("End flushing log buffer.");
}
@@ -796,36 +789,30 @@ public class SyncLogDequeSerializer implements StableEntryManager {
/** for unclosed file, the file name is ${startIndex}-${Long.MAX_VALUE}-{version} */
private void createNewLogFile(String dirName, long startLogIndex) throws IOException {
- lock.lock();
- try {
- long nextVersion = versionController.nextVersion();
- long endLogIndex = Long.MAX_VALUE;
-
- String fileNamePrefix =
- dirName
- + File.separator
- + startLogIndex
- + FILE_NAME_SEPARATOR
- + endLogIndex
- + FILE_NAME_SEPARATOR
- + nextVersion
- + FILE_NAME_SEPARATOR;
- File logDataFile = SystemFileFactory.INSTANCE.getFile(fileNamePrefix + LOG_DATA_FILE_SUFFIX);
- File logIndexFile =
- SystemFileFactory.INSTANCE.getFile(fileNamePrefix + LOG_INDEX_FILE_SUFFIX);
-
- if (!logDataFile.createNewFile()) {
- logger.warn("Cannot create new log data file {}", logDataFile);
- }
-
- if (!logIndexFile.createNewFile()) {
- logger.warn("Cannot create new log index file {}", logDataFile);
- }
- logDataFileList.add(logDataFile);
- logIndexFileList.add(new IndexFileDescriptor(logIndexFile, startLogIndex, endLogIndex));
- } finally {
- lock.unlock();
- }
+ long nextVersion = versionController.nextVersion();
+ long endLogIndex = Long.MAX_VALUE;
+
+ String fileNamePrefix =
+ dirName
+ + File.separator
+ + startLogIndex
+ + FILE_NAME_SEPARATOR
+ + endLogIndex
+ + FILE_NAME_SEPARATOR
+ + nextVersion
+ + FILE_NAME_SEPARATOR;
+ File logDataFile = SystemFileFactory.INSTANCE.getFile(fileNamePrefix + LOG_DATA_FILE_SUFFIX);
+ File logIndexFile = SystemFileFactory.INSTANCE.getFile(fileNamePrefix + LOG_INDEX_FILE_SUFFIX);
+
+ if (!logDataFile.createNewFile()) {
+ logger.warn("Cannot create new log data file {}", logDataFile);
+ }
+
+ if (!logIndexFile.createNewFile()) {
+ logger.warn("Cannot create new log index file {}", logDataFile);
+ }
+ logDataFileList.add(logDataFile);
+ logIndexFileList.add(new IndexFileDescriptor(logIndexFile, startLogIndex, endLogIndex));
}
private File getCurrentLogDataFile() {