You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2017/12/12 10:09:12 UTC

incubator-ratis git commit: RATIS-124. RaftLog should be sync'ed on all the entries appended, not only the last entry. Contributed by Kit Hui

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 9a8daa1d5 -> c1b23fdb0


RATIS-124. RaftLog should be sync'ed on all the entries appended, not only the last entry. Contributed by Kit Hui


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c1b23fdb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c1b23fdb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c1b23fdb

Branch: refs/heads/master
Commit: c1b23fdb0bfe5d3e9937d60f0baa43772737f002
Parents: 9a8daa1
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Tue Dec 12 18:07:25 2017 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Tue Dec 12 18:07:25 2017 +0800

----------------------------------------------------------------------
 .../ratis/server/impl/RaftServerImpl.java       | 15 +++---
 .../ratis/server/storage/MemoryRaftLog.java     | 33 ++++++------
 .../apache/ratis/server/storage/RaftLog.java    | 17 +++---
 .../ratis/server/storage/RaftLogWorker.java     | 23 ++++----
 .../ratis/server/storage/SegmentedRaftLog.java  | 55 ++++++++++----------
 .../apache/ratis/statemachine/StateMachine.java |  3 +-
 .../ratis/server/storage/TestCacheEviction.java |  7 ++-
 .../server/storage/TestSegmentedRaftLog.java    | 20 +++----
 8 files changed, 81 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index bc45b5f..8b784e6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -25,6 +25,7 @@ import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.statemachine.SnapshotInfo;
@@ -36,7 +37,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.management.ObjectName;
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -723,6 +723,8 @@ public class RaftServerImpl implements RaftServerProtocol,
       throw new IOException(e);
     }
 
+    final List<CompletableFuture<Long>> futures;
+
     final long currentTerm;
     long nextIndex = state.getLog().getNextIndex();
     synchronized (this) {
@@ -764,16 +766,17 @@ public class RaftServerImpl implements RaftServerProtocol,
         return reply;
       }
 
-      state.getLog().append(entries);
+      futures = state.getLog().append(entries);
+
       state.updateConfiguration(entries);
       state.updateStatemachine(leaderCommit, currentTerm);
     }
     if (entries.length > 0) {
-      try {
-        state.getLog().logSync();
-      } catch (InterruptedException e) {
-        throw new InterruptedIOException("logSync got interrupted");
+      CodeInjectionForTesting.execute(RaftLog.LOG_SYNC, getId(), null);
+      for (CompletableFuture future : futures) {
+        future.join();
       }
+
       nextIndex = entries[entries.length - 1].getIndex() + 1;
     }
     synchronized (this) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
index ecbae2e..3c14d2c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
@@ -17,9 +17,6 @@
  */
 package org.apache.ratis.server.storage;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.RaftServerConstants;
@@ -27,10 +24,14 @@ import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.AutoCloseableLock;
-import org.apache.ratis.util.CodeInjectionForTesting;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
 /**
  * A simple RaftLog implementation in memory. Used only for testing.
  */
@@ -79,7 +80,7 @@ public class MemoryRaftLog extends RaftLog {
   }
 
   @Override
-  void truncate(long index) {
+  CompletableFuture<Long> truncate(long index) {
     checkLogState();
     try(AutoCloseableLock writeLock = writeLock()) {
       Preconditions.assertTrue(index >= 0);
@@ -88,6 +89,7 @@ public class MemoryRaftLog extends RaftLog {
         entries.remove(i);
       }
     }
+    return CompletableFuture.completedFuture(index);
   }
 
   @Override
@@ -100,11 +102,12 @@ public class MemoryRaftLog extends RaftLog {
   }
 
   @Override
-  void appendEntry(LogEntryProto entry) {
+  CompletableFuture<Long> appendEntry(LogEntryProto entry) {
     checkLogState();
     try(AutoCloseableLock writeLock = writeLock()) {
       entries.add(entry);
     }
+    return CompletableFuture.completedFuture(entry.getIndex());
   }
 
   @Override
@@ -126,11 +129,11 @@ public class MemoryRaftLog extends RaftLog {
   }
 
   @Override
-  public void append(LogEntryProto... entries) {
+  public List<CompletableFuture<Long>> append(LogEntryProto... entries) {
     checkLogState();
     try(AutoCloseableLock writeLock = writeLock()) {
       if (entries == null || entries.length == 0) {
-        return;
+        return Collections.emptyList();
       }
       // Before truncating the entries, we first need to check if some
       // entries are duplicated. If the leader sends entry 6, entry 7, then
@@ -151,12 +154,18 @@ public class MemoryRaftLog extends RaftLog {
           break;
         }
       }
+      final List<CompletableFuture<Long>> futures;
       if (toTruncate) {
-        truncate(truncateIndex);
+        futures = new ArrayList<>(entries.length - index + 1);
+        futures.add(truncate(truncateIndex));
+      } else {
+        futures = new ArrayList<>(entries.length - index);
       }
       for (int i = index; i < entries.length; i++) {
         this.entries.add(entries[i]);
+        futures.add(CompletableFuture.completedFuture(entries[i].getIndex()));
       }
+      return futures;
     }
   }
 
@@ -171,12 +180,6 @@ public class MemoryRaftLog extends RaftLog {
   }
 
   @Override
-  public void logSync() {
-    CodeInjectionForTesting.execute(LOG_SYNC, getSelfId(), null);
-    // do nothing
-  }
-
-  @Override
   public long getLatestFlushedIndex() {
     return getNextIndex() - 1;
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index ac86582..8edb8a1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -35,10 +35,11 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
-
 /**
  * Base class of RaftLog. Currently we provide two types of RaftLog
  * implementation:
@@ -207,13 +208,13 @@ public abstract class RaftLog implements Closeable {
    * Truncate the log entries till the given index. The log with the given index
    * will also be truncated (i.e., inclusive).
    */
-  abstract void truncate(long index);
+  abstract CompletableFuture<Long> truncate(long index);
 
   /**
    * Used by the leader when appending a new entry based on client's request
    * or configuration change.
    */
-  abstract void appendEntry(LogEntryProto entry);
+  abstract CompletableFuture<Long> appendEntry(LogEntryProto entry);
 
   /**
    * Append all the given log entries. Used by the followers.
@@ -224,15 +225,9 @@ public abstract class RaftLog implements Closeable {
    * This method, {@link #append(long, TransactionContext, ClientId, long)},
    * {@link #append(long, RaftConfiguration)}, and {@link #truncate(long)},
    * do not guarantee the changes are persisted.
-   * Need to call {@link #logSync()} to persist the changes.
-   */
-  public abstract void append(LogEntryProto... entries);
-
-  /**
-   * Flush and sync the log.
-   * It is triggered by AppendEntries RPC request from the leader.
+   * Need to wait for the returned futures to persist the changes.
    */
-  public abstract void logSync() throws InterruptedException;
+  public abstract List<CompletableFuture<Long>> append(LogEntryProto... entries);
 
   /**
    * @return the index of the latest entry that has been flushed to the local

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
index 2e1177f..81bd22d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -37,7 +37,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 /**
@@ -267,31 +270,23 @@ class RaftLogWorker implements Runnable {
   private class WriteLog extends Task {
     private final LogEntryProto entry;
     private final CompletableFuture<?> stateMachineFuture;
+    private final CompletableFuture<Long> combined;
 
     WriteLog(LogEntryProto entry) {
       this.entry = ProtoUtils.removeStateMachineData(entry);
       if (this.entry == entry || stateMachine == null) {
         this.stateMachineFuture = null;
+        this.combined = super.getFuture();
       } else {
         // this.entry != entry iff the entry has state machine data
         this.stateMachineFuture = stateMachine.writeStateMachineData(entry);
+        this.combined = super.getFuture().thenCombine(stateMachineFuture, (index, stateMachineResult) -> index);
       }
     }
 
     @Override
-    void waitForDone() throws InterruptedException {
-      super.waitForDone();
-      // TODO: It does not work since logSync only wait for the last task L.
-      // TODO: If some task T earlier than L has a writeStateMachineData future, it will not be sync'ed.
-      // TODO: Need RATIS-124
-
-      if (stateMachineFuture != null) {
-        try {
-          stateMachineFuture.get();
-        } catch (ExecutionException e) {
-          // ignore
-        }
-      }
+    CompletableFuture<Long> getFuture() {
+      return combined;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index 5286738..76b2bd7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -29,14 +29,16 @@ import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry;
 import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.AutoCloseableLock;
-import org.apache.ratis.util.CodeInjectionForTesting;
 import org.apache.ratis.util.Preconditions;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
 /**
@@ -73,19 +75,17 @@ public class SegmentedRaftLog extends RaftLog {
    * I/O task definitions.
    */
   static abstract class Task {
-    private boolean done = false;
+    private final CompletableFuture<Long> future = new CompletableFuture<>();
 
-    synchronized void done() {
-      done = true;
-      notifyAll();
+    CompletableFuture<Long> getFuture() {
+      return future;
     }
 
-    synchronized void waitForDone() throws InterruptedException {
-      while (!done) {
-        wait();
-      }
+    void done() {
+      future.complete(getEndIndex());
     }
 
+
     abstract void execute() throws IOException;
 
     abstract long getEndIndex();
@@ -95,7 +95,6 @@ public class SegmentedRaftLog extends RaftLog {
       return getClass().getSimpleName() + ":" + getEndIndex();
     }
   }
-  private static final ThreadLocal<Task> myTask = new ThreadLocal<>();
 
   private final RaftServerImpl server;
   private final RaftStorage storage;
@@ -232,19 +231,21 @@ public class SegmentedRaftLog extends RaftLog {
    * {@link #append(LogEntryProto...)} need protection of RaftServer's lock.
    */
   @Override
-  void truncate(long index) {
+  CompletableFuture<Long> truncate(long index) {
     checkLogState();
     try(AutoCloseableLock writeLock = writeLock()) {
       RaftLogCache.TruncationSegments ts = cache.truncate(index);
       if (ts != null) {
         Task task = fileLogWorker.truncate(ts);
-        myTask.set(task);
+        return task.getFuture();
       }
     }
+    return CompletableFuture.completedFuture(index);
   }
 
   @Override
-  void appendEntry(LogEntryProto entry) {
+  CompletableFuture<Long> appendEntry(LogEntryProto entry) {
+
     checkLogState();
     if (LOG.isTraceEnabled()) {
       LOG.trace("{}: appendEntry {}", server.getId(),
@@ -272,7 +273,7 @@ public class SegmentedRaftLog extends RaftLog {
       }
 
       cache.appendEntry(entry);
-      myTask.set(fileLogWorker.writeLogEntry(entry));
+      return fileLogWorker.writeLogEntry(entry).getFuture();
     }
   }
 
@@ -289,11 +290,13 @@ public class SegmentedRaftLog extends RaftLog {
   }
 
   @Override
-  public void append(LogEntryProto... entries) {
+  public List<CompletableFuture<Long>> append(LogEntryProto... entries) {
+
     checkLogState();
     if (entries == null || entries.length == 0) {
-      return;
+      return Collections.emptyList();
     }
+
     try(AutoCloseableLock writeLock = writeLock()) {
       Iterator<TermIndex> iter = cache.iterator(entries[0].getIndex());
       int index = 0;
@@ -318,25 +321,21 @@ public class SegmentedRaftLog extends RaftLog {
           break;
         }
       }
+
+      final List<CompletableFuture<Long>> futures;
       if (truncateIndex != -1) {
-        // truncate from truncateIndex
-        truncate(truncateIndex);
+        futures = new ArrayList<>(entries.length - index + 1);
+        futures.add(truncate(truncateIndex));
+      } else {
+        futures = new ArrayList<>(entries.length - index);
       }
-      // append from entries[index]
       for (int i = index; i < entries.length; i++) {
-        appendEntry(entries[i]);
+        futures.add(appendEntry(entries[i]));
       }
+      return futures;
     }
   }
 
-  @Override
-  public void logSync() throws InterruptedException {
-    CodeInjectionForTesting.execute(LOG_SYNC, getSelfId(), null);
-    final Task task = myTask.get();
-    if (task != null) {
-      task.waitForDone();
-    }
-  }
 
   @Override
   public long getLatestFlushedIndex() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index ae26cb1..1d47815 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -128,8 +128,7 @@ public interface StateMachine extends Closeable {
   /**
    * Write asynchronously the state machine data to this state machine.
    *
-   * @return a future for the write task
-   *         if {@link RaftLog#logSync()} should also sync writing the state machine data;
+   * @return a future for the write task if the state machine data should be sync'ed;
    *         otherwise, return null.
    */
   default CompletableFuture<?> writeStateMachineData(LogEntryProto entry) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
index 8ce33f5..c94fb73 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
@@ -41,6 +41,7 @@ import org.mockito.Mockito;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 public class TestCacheEviction extends BaseTest {
   private static final CacheInvalidationPolicy policy = new CacheInvalidationPolicyDefault();
@@ -168,8 +169,7 @@ public class TestCacheEviction extends BaseTest {
     raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
     List<SegmentRange> slist = TestSegmentedRaftLog.prepareRanges(maxCachedNum, 7, 0);
     LogEntryProto[] entries = generateEntries(slist);
-    raftLog.append(entries);
-    raftLog.logSync();
+    raftLog.append(entries).forEach(CompletableFuture::join);
 
     // check the current cached segment number: the last segment is still open
     Assert.assertEquals(maxCachedNum - 1,
@@ -179,8 +179,7 @@ public class TestCacheEviction extends BaseTest {
     Mockito.when(state.getLastAppliedIndex()).thenReturn(35L);
     slist = TestSegmentedRaftLog.prepareRanges(2, 7, 7 * maxCachedNum);
     entries = generateEntries(slist);
-    raftLog.append(entries);
-    raftLog.logSync();
+    raftLog.append(entries).forEach(CompletableFuture::join);
 
     // check the cached segment number again. since the slowest follower is on
     // index 21, the eviction should happen and evict 3 segments

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
index 09b31c1..2a0ec60 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
 public class TestSegmentedRaftLog extends BaseTest {
@@ -192,8 +193,7 @@ public class TestSegmentedRaftLog extends BaseTest {
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
       raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
       // append entries to the raftlog
-      entries.forEach(raftLog::appendEntry);
-      raftLog.logSync();
+      entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
     }
 
     try (SegmentedRaftLog raftLog =
@@ -221,8 +221,7 @@ public class TestSegmentedRaftLog extends BaseTest {
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
       raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
       // append entries to the raftlog
-      entries.forEach(raftLog::appendEntry);
-      raftLog.logSync();
+      entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
     }
 
     try (SegmentedRaftLog raftLog =
@@ -244,8 +243,7 @@ public class TestSegmentedRaftLog extends BaseTest {
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
       raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
       // append entries to the raftlog
-      entries.forEach(raftLog::appendEntry);
-      raftLog.logSync();
+      entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
     }
 
     for (long fromIndex = 900; fromIndex >= 0; fromIndex -= 150) {
@@ -259,8 +257,8 @@ public class TestSegmentedRaftLog extends BaseTest {
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
       raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
       // truncate the log
-      raftLog.truncate(fromIndex);
-      raftLog.logSync();
+      raftLog.truncate(fromIndex).join();
+
 
       checkEntries(raftLog, entries, 0, (int) fromIndex);
     }
@@ -317,8 +315,7 @@ public class TestSegmentedRaftLog extends BaseTest {
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
       raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
       // append entries to the raftlog
-      entries.forEach(raftLog::appendEntry);
-      raftLog.logSync();
+      entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
     }
 
     // append entries whose first 100 entries are the same with existing log,
@@ -332,8 +329,7 @@ public class TestSegmentedRaftLog extends BaseTest {
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
       raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
-      raftLog.append(newEntries.toArray(new LogEntryProto[newEntries.size()]));
-      raftLog.logSync();
+      raftLog.append(newEntries.toArray(new LogEntryProto[newEntries.size()])).forEach(CompletableFuture::join);
 
       checkEntries(raftLog, entries, 0, 650);
       checkEntries(raftLog, newEntries, 100, 100);