You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/08 04:22:41 UTC

[iotdb] branch native_raft updated: fix async buffer flush; do not include .log file in snapshot

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 7bc46fae12 fix async buffer flush; do not include .log file in snapshot
7bc46fae12 is described below

commit 7bc46fae12abf1354dd156d300dee89e6924c5d0
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon May 8 12:25:28 2023 +0800

    fix async buffer flush
    do not include .log file in snapshot
---
 .../protocol/log/dispatch/DispatcherThread.java    |  8 ++-
 .../manager/DirectorySnapshotRaftLogManager.java   |  2 -
 .../serialization/SyncLogDequeSerializer.java      | 75 +++++++++-------------
 3 files changed, 37 insertions(+), 48 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index 9db8191499..ae8ba266a4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -139,7 +139,9 @@ class DispatcherThread extends DynamicThread {
       long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
       AppendEntryResult appendEntryResult = SyncClientAdaptor.appendEntries(client, request);
       Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
-      handler.onComplete(appendEntryResult);
+      if (appendEntryResult != null) {
+        handler.onComplete(appendEntryResult);
+      }
     } catch (Exception e) {
       handler.onError(e);
     }
@@ -163,7 +165,9 @@ class DispatcherThread extends DynamicThread {
       AppendEntryResult appendEntryResult =
           SyncClientAdaptor.appendCompressedEntries(client, request);
       Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
-      handler.onComplete(appendEntryResult);
+      if (appendEntryResult != null) {
+        handler.onComplete(appendEntryResult);
+      }
     } catch (Exception e) {
       handler.onError(e);
     }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
index a76653dac4..1f99fd4e2c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.consensus.natraft.protocol.log.applier.LogApplier;
 import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.StableEntryManager;
 import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.DirectorySnapshot;
 import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.Snapshot;
-import org.apache.iotdb.consensus.natraft.utils.IOUtils;
 
 import java.io.File;
 import java.nio.file.Path;
@@ -77,7 +76,6 @@ public class DirectorySnapshotRaftLogManager extends RaftLogManager {
     }
     stateMachine.takeSnapshot(latestSnapshotDir);
     List<Path> snapshotFiles = stateMachine.getSnapshotFiles(latestSnapshotDir);
-    snapshotFiles.addAll(IOUtils.collectPaths(latestSnapshotDir));
     directorySnapshot = new DirectorySnapshot(latestSnapshotDir, snapshotFiles, currNodes);
     directorySnapshot.setLastLogIndex(snapshotIndex);
     directorySnapshot.setLastLogTerm(snapshotTerm);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index 02dc4eb65c..a4ed9add01 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -399,20 +399,13 @@ public class SyncLogDequeSerializer implements StableEntryManager {
         logger.error("Unexpected exception when flushing log in {}", name);
         throw new RuntimeException(e);
       }
-    } else {
-      switchBuffer();
-      flushingLogFuture = flushingLogExecutorService.submit(() -> flushLogBufferTask(lastLogIndex));
     }
 
-    if (!isAsyncFlush) {
-      try {
-        flushingLogFuture.get();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      } catch (ExecutionException e) {
-        logger.error("Unexpected exception when flushing log in {}", name);
-        throw new RuntimeException(e);
-      }
+    switchBuffer();
+    if (isAsyncFlush) {
+      flushingLogFuture = flushingLogExecutorService.submit(() -> flushLogBufferTask(lastLogIndex));
+    } else {
+      flushLogBufferTask(lastLogIndex);
     }
   }
 
@@ -449,14 +442,14 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
       checkCloseCurrentFile(currentLastIndex);
     } catch (IOException e) {
+      logger.error("IOError in logs serialization: ", e);
+    } catch (Throwable e) {
       logger.error("Error in logs serialization: ", e);
-      return;
     }
 
     flushingLogDataBuffer.clear();
     flushingLogIndexBuffer.clear();
 
-    switchBuffer();
     logger.debug("End flushing log buffer.");
   }
 
@@ -796,36 +789,30 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
   /** for unclosed file, the file name is ${startIndex}-${Long.MAX_VALUE}-{version} */
   private void createNewLogFile(String dirName, long startLogIndex) throws IOException {
-    lock.lock();
-    try {
-      long nextVersion = versionController.nextVersion();
-      long endLogIndex = Long.MAX_VALUE;
-
-      String fileNamePrefix =
-          dirName
-              + File.separator
-              + startLogIndex
-              + FILE_NAME_SEPARATOR
-              + endLogIndex
-              + FILE_NAME_SEPARATOR
-              + nextVersion
-              + FILE_NAME_SEPARATOR;
-      File logDataFile = SystemFileFactory.INSTANCE.getFile(fileNamePrefix + LOG_DATA_FILE_SUFFIX);
-      File logIndexFile =
-          SystemFileFactory.INSTANCE.getFile(fileNamePrefix + LOG_INDEX_FILE_SUFFIX);
-
-      if (!logDataFile.createNewFile()) {
-        logger.warn("Cannot create new log data file {}", logDataFile);
-      }
-
-      if (!logIndexFile.createNewFile()) {
-        logger.warn("Cannot create new log index file {}", logDataFile);
-      }
-      logDataFileList.add(logDataFile);
-      logIndexFileList.add(new IndexFileDescriptor(logIndexFile, startLogIndex, endLogIndex));
-    } finally {
-      lock.unlock();
-    }
+    long nextVersion = versionController.nextVersion();
+    long endLogIndex = Long.MAX_VALUE;
+
+    String fileNamePrefix =
+        dirName
+            + File.separator
+            + startLogIndex
+            + FILE_NAME_SEPARATOR
+            + endLogIndex
+            + FILE_NAME_SEPARATOR
+            + nextVersion
+            + FILE_NAME_SEPARATOR;
+    File logDataFile = SystemFileFactory.INSTANCE.getFile(fileNamePrefix + LOG_DATA_FILE_SUFFIX);
+    File logIndexFile = SystemFileFactory.INSTANCE.getFile(fileNamePrefix + LOG_INDEX_FILE_SUFFIX);
+
+    if (!logDataFile.createNewFile()) {
+      logger.warn("Cannot create new log data file {}", logDataFile);
+    }
+
+    if (!logIndexFile.createNewFile()) {
+      logger.warn("Cannot create new log index file {}", logDataFile);
+    }
+    logDataFileList.add(logDataFile);
+    logIndexFileList.add(new IndexFileDescriptor(logIndexFile, startLogIndex, endLogIndex));
   }
 
   private File getCurrentLogDataFile() {