You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2017/12/12 10:09:12 UTC
incubator-ratis git commit: RATIS-124. RaftLog should be sync'ed on
all the appended entries, not only on the last entry. Contributed by Kit Hui
Repository: incubator-ratis
Updated Branches:
refs/heads/master 9a8daa1d5 -> c1b23fdb0
RATIS-124. RaftLog should be sync'ed on all the appended entries, not only on the last entry. Contributed by Kit Hui
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c1b23fdb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c1b23fdb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c1b23fdb
Branch: refs/heads/master
Commit: c1b23fdb0bfe5d3e9937d60f0baa43772737f002
Parents: 9a8daa1
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Tue Dec 12 18:07:25 2017 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Tue Dec 12 18:07:25 2017 +0800
----------------------------------------------------------------------
.../ratis/server/impl/RaftServerImpl.java | 15 +++---
.../ratis/server/storage/MemoryRaftLog.java | 33 ++++++------
.../apache/ratis/server/storage/RaftLog.java | 17 +++---
.../ratis/server/storage/RaftLogWorker.java | 23 ++++----
.../ratis/server/storage/SegmentedRaftLog.java | 55 ++++++++++----------
.../apache/ratis/statemachine/StateMachine.java | 3 +-
.../ratis/server/storage/TestCacheEviction.java | 7 ++-
.../server/storage/TestSegmentedRaftLog.java | 20 +++----
8 files changed, 81 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index bc45b5f..8b784e6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -25,6 +25,7 @@ import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.statemachine.SnapshotInfo;
@@ -36,7 +37,6 @@ import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -723,6 +723,8 @@ public class RaftServerImpl implements RaftServerProtocol,
throw new IOException(e);
}
+ final List<CompletableFuture<Long>> futures;
+
final long currentTerm;
long nextIndex = state.getLog().getNextIndex();
synchronized (this) {
@@ -764,16 +766,17 @@ public class RaftServerImpl implements RaftServerProtocol,
return reply;
}
- state.getLog().append(entries);
+ futures = state.getLog().append(entries);
+
state.updateConfiguration(entries);
state.updateStatemachine(leaderCommit, currentTerm);
}
if (entries.length > 0) {
- try {
- state.getLog().logSync();
- } catch (InterruptedException e) {
- throw new InterruptedIOException("logSync got interrupted");
+ CodeInjectionForTesting.execute(RaftLog.LOG_SYNC, getId(), null);
+ for (CompletableFuture future : futures) {
+ future.join();
}
+
nextIndex = entries[entries.length - 1].getIndex() + 1;
}
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
index ecbae2e..3c14d2c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
@@ -17,9 +17,6 @@
*/
package org.apache.ratis.server.storage;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.RaftServerConstants;
@@ -27,10 +24,14 @@ import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.AutoCloseableLock;
-import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
/**
* A simple RaftLog implementation in memory. Used only for testing.
*/
@@ -79,7 +80,7 @@ public class MemoryRaftLog extends RaftLog {
}
@Override
- void truncate(long index) {
+ CompletableFuture<Long> truncate(long index) {
checkLogState();
try(AutoCloseableLock writeLock = writeLock()) {
Preconditions.assertTrue(index >= 0);
@@ -88,6 +89,7 @@ public class MemoryRaftLog extends RaftLog {
entries.remove(i);
}
}
+ return CompletableFuture.completedFuture(index);
}
@Override
@@ -100,11 +102,12 @@ public class MemoryRaftLog extends RaftLog {
}
@Override
- void appendEntry(LogEntryProto entry) {
+ CompletableFuture<Long> appendEntry(LogEntryProto entry) {
checkLogState();
try(AutoCloseableLock writeLock = writeLock()) {
entries.add(entry);
}
+ return CompletableFuture.completedFuture(entry.getIndex());
}
@Override
@@ -126,11 +129,11 @@ public class MemoryRaftLog extends RaftLog {
}
@Override
- public void append(LogEntryProto... entries) {
+ public List<CompletableFuture<Long>> append(LogEntryProto... entries) {
checkLogState();
try(AutoCloseableLock writeLock = writeLock()) {
if (entries == null || entries.length == 0) {
- return;
+ return Collections.emptyList();
}
// Before truncating the entries, we first need to check if some
// entries are duplicated. If the leader sends entry 6, entry 7, then
@@ -151,12 +154,18 @@ public class MemoryRaftLog extends RaftLog {
break;
}
}
+ final List<CompletableFuture<Long>> futures;
if (toTruncate) {
- truncate(truncateIndex);
+ futures = new ArrayList<>(entries.length - index + 1);
+ futures.add(truncate(truncateIndex));
+ } else {
+ futures = new ArrayList<>(entries.length - index);
}
for (int i = index; i < entries.length; i++) {
this.entries.add(entries[i]);
+ futures.add(CompletableFuture.completedFuture(entries[i].getIndex()));
}
+ return futures;
}
}
@@ -171,12 +180,6 @@ public class MemoryRaftLog extends RaftLog {
}
@Override
- public void logSync() {
- CodeInjectionForTesting.execute(LOG_SYNC, getSelfId(), null);
- // do nothing
- }
-
- @Override
public long getLatestFlushedIndex() {
return getNextIndex() - 1;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index ac86582..8edb8a1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -35,10 +35,11 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
-
/**
* Base class of RaftLog. Currently we provide two types of RaftLog
* implementation:
@@ -207,13 +208,13 @@ public abstract class RaftLog implements Closeable {
* Truncate the log entries till the given index. The log with the given index
* will also be truncated (i.e., inclusive).
*/
- abstract void truncate(long index);
+ abstract CompletableFuture<Long> truncate(long index);
/**
* Used by the leader when appending a new entry based on client's request
* or configuration change.
*/
- abstract void appendEntry(LogEntryProto entry);
+ abstract CompletableFuture<Long> appendEntry(LogEntryProto entry);
/**
* Append all the given log entries. Used by the followers.
@@ -224,15 +225,9 @@ public abstract class RaftLog implements Closeable {
* This method, {@link #append(long, TransactionContext, ClientId, long)},
* {@link #append(long, RaftConfiguration)}, and {@link #truncate(long)},
* do not guarantee the changes are persisted.
- * Need to call {@link #logSync()} to persist the changes.
- */
- public abstract void append(LogEntryProto... entries);
-
- /**
- * Flush and sync the log.
- * It is triggered by AppendEntries RPC request from the leader.
+ * Need to wait for the returned futures to persist the changes.
*/
- public abstract void logSync() throws InterruptedException;
+ public abstract List<CompletableFuture<Long>> append(LogEntryProto... entries);
/**
* @return the index of the latest entry that has been flushed to the local
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
index 2e1177f..81bd22d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -37,7 +37,10 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
@@ -267,31 +270,23 @@ class RaftLogWorker implements Runnable {
private class WriteLog extends Task {
private final LogEntryProto entry;
private final CompletableFuture<?> stateMachineFuture;
+ private final CompletableFuture<Long> combined;
WriteLog(LogEntryProto entry) {
this.entry = ProtoUtils.removeStateMachineData(entry);
if (this.entry == entry || stateMachine == null) {
this.stateMachineFuture = null;
+ this.combined = super.getFuture();
} else {
// this.entry != entry iff the entry has state machine data
this.stateMachineFuture = stateMachine.writeStateMachineData(entry);
+ this.combined = super.getFuture().thenCombine(stateMachineFuture, (index, stateMachineResult) -> index);
}
}
@Override
- void waitForDone() throws InterruptedException {
- super.waitForDone();
- // TODO: It does not work since logSync only wait for the last task L.
- // TODO: If some task T earlier than L has a writeStateMachineData future, it will not be sync'ed.
- // TODO: Need RATIS-124
-
- if (stateMachineFuture != null) {
- try {
- stateMachineFuture.get();
- } catch (ExecutionException e) {
- // ignore
- }
- }
+ CompletableFuture<Long> getFuture() {
+ return combined;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index 5286738..76b2bd7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -29,14 +29,16 @@ import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry;
import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.AutoCloseableLock;
-import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.Preconditions;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/**
@@ -73,19 +75,17 @@ public class SegmentedRaftLog extends RaftLog {
* I/O task definitions.
*/
static abstract class Task {
- private boolean done = false;
+ private final CompletableFuture<Long> future = new CompletableFuture<>();
- synchronized void done() {
- done = true;
- notifyAll();
+ CompletableFuture<Long> getFuture() {
+ return future;
}
- synchronized void waitForDone() throws InterruptedException {
- while (!done) {
- wait();
- }
+ void done() {
+ future.complete(getEndIndex());
}
+
abstract void execute() throws IOException;
abstract long getEndIndex();
@@ -95,7 +95,6 @@ public class SegmentedRaftLog extends RaftLog {
return getClass().getSimpleName() + ":" + getEndIndex();
}
}
- private static final ThreadLocal<Task> myTask = new ThreadLocal<>();
private final RaftServerImpl server;
private final RaftStorage storage;
@@ -232,19 +231,21 @@ public class SegmentedRaftLog extends RaftLog {
* {@link #append(LogEntryProto...)} need protection of RaftServer's lock.
*/
@Override
- void truncate(long index) {
+ CompletableFuture<Long> truncate(long index) {
checkLogState();
try(AutoCloseableLock writeLock = writeLock()) {
RaftLogCache.TruncationSegments ts = cache.truncate(index);
if (ts != null) {
Task task = fileLogWorker.truncate(ts);
- myTask.set(task);
+ return task.getFuture();
}
}
+ return CompletableFuture.completedFuture(index);
}
@Override
- void appendEntry(LogEntryProto entry) {
+ CompletableFuture<Long> appendEntry(LogEntryProto entry) {
+
checkLogState();
if (LOG.isTraceEnabled()) {
LOG.trace("{}: appendEntry {}", server.getId(),
@@ -272,7 +273,7 @@ public class SegmentedRaftLog extends RaftLog {
}
cache.appendEntry(entry);
- myTask.set(fileLogWorker.writeLogEntry(entry));
+ return fileLogWorker.writeLogEntry(entry).getFuture();
}
}
@@ -289,11 +290,13 @@ public class SegmentedRaftLog extends RaftLog {
}
@Override
- public void append(LogEntryProto... entries) {
+ public List<CompletableFuture<Long>> append(LogEntryProto... entries) {
+
checkLogState();
if (entries == null || entries.length == 0) {
- return;
+ return Collections.emptyList();
}
+
try(AutoCloseableLock writeLock = writeLock()) {
Iterator<TermIndex> iter = cache.iterator(entries[0].getIndex());
int index = 0;
@@ -318,25 +321,21 @@ public class SegmentedRaftLog extends RaftLog {
break;
}
}
+
+ final List<CompletableFuture<Long>> futures;
if (truncateIndex != -1) {
- // truncate from truncateIndex
- truncate(truncateIndex);
+ futures = new ArrayList<>(entries.length - index + 1);
+ futures.add(truncate(truncateIndex));
+ } else {
+ futures = new ArrayList<>(entries.length - index);
}
- // append from entries[index]
for (int i = index; i < entries.length; i++) {
- appendEntry(entries[i]);
+ futures.add(appendEntry(entries[i]));
}
+ return futures;
}
}
- @Override
- public void logSync() throws InterruptedException {
- CodeInjectionForTesting.execute(LOG_SYNC, getSelfId(), null);
- final Task task = myTask.get();
- if (task != null) {
- task.waitForDone();
- }
- }
@Override
public long getLatestFlushedIndex() {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index ae26cb1..1d47815 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -128,8 +128,7 @@ public interface StateMachine extends Closeable {
/**
* Write asynchronously the state machine data to this state machine.
*
- * @return a future for the write task
- * if {@link RaftLog#logSync()} should also sync writing the state machine data;
+ * @return a future for the write task if the state machine data should be sync'ed;
* otherwise, return null.
*/
default CompletableFuture<?> writeStateMachineData(LogEntryProto entry) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
index 8ce33f5..c94fb73 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
@@ -41,6 +41,7 @@ import org.mockito.Mockito;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
public class TestCacheEviction extends BaseTest {
private static final CacheInvalidationPolicy policy = new CacheInvalidationPolicyDefault();
@@ -168,8 +169,7 @@ public class TestCacheEviction extends BaseTest {
raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
List<SegmentRange> slist = TestSegmentedRaftLog.prepareRanges(maxCachedNum, 7, 0);
LogEntryProto[] entries = generateEntries(slist);
- raftLog.append(entries);
- raftLog.logSync();
+ raftLog.append(entries).forEach(CompletableFuture::join);
// check the current cached segment number: the last segment is still open
Assert.assertEquals(maxCachedNum - 1,
@@ -179,8 +179,7 @@ public class TestCacheEviction extends BaseTest {
Mockito.when(state.getLastAppliedIndex()).thenReturn(35L);
slist = TestSegmentedRaftLog.prepareRanges(2, 7, 7 * maxCachedNum);
entries = generateEntries(slist);
- raftLog.append(entries);
- raftLog.logSync();
+ raftLog.append(entries).forEach(CompletableFuture::join);
// check the cached segment number again. since the slowest follower is on
// index 21, the eviction should happen and evict 3 segments
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c1b23fdb/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
index 09b31c1..2a0ec60 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
public class TestSegmentedRaftLog extends BaseTest {
@@ -192,8 +193,7 @@ public class TestSegmentedRaftLog extends BaseTest {
new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
// append entries to the raftlog
- entries.forEach(raftLog::appendEntry);
- raftLog.logSync();
+ entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
}
try (SegmentedRaftLog raftLog =
@@ -221,8 +221,7 @@ public class TestSegmentedRaftLog extends BaseTest {
new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
// append entries to the raftlog
- entries.forEach(raftLog::appendEntry);
- raftLog.logSync();
+ entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
}
try (SegmentedRaftLog raftLog =
@@ -244,8 +243,7 @@ public class TestSegmentedRaftLog extends BaseTest {
new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
// append entries to the raftlog
- entries.forEach(raftLog::appendEntry);
- raftLog.logSync();
+ entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
}
for (long fromIndex = 900; fromIndex >= 0; fromIndex -= 150) {
@@ -259,8 +257,8 @@ public class TestSegmentedRaftLog extends BaseTest {
new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
// truncate the log
- raftLog.truncate(fromIndex);
- raftLog.logSync();
+ raftLog.truncate(fromIndex).join();
+
checkEntries(raftLog, entries, 0, (int) fromIndex);
}
@@ -317,8 +315,7 @@ public class TestSegmentedRaftLog extends BaseTest {
new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
// append entries to the raftlog
- entries.forEach(raftLog::appendEntry);
- raftLog.logSync();
+ entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
}
// append entries whose first 100 entries are the same with existing log,
@@ -332,8 +329,7 @@ public class TestSegmentedRaftLog extends BaseTest {
try (SegmentedRaftLog raftLog =
new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
- raftLog.append(newEntries.toArray(new LogEntryProto[newEntries.size()]));
- raftLog.logSync();
+ raftLog.append(newEntries.toArray(new LogEntryProto[newEntries.size()])).forEach(CompletableFuture::join);
checkEntries(raftLog, entries, 0, 650);
checkEntries(raftLog, newEntries, 100, 100);