You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/03/01 01:22:32 UTC
[ratis] 16/16: RATIS-1793. Enforce raft.server.log.appender.wait-time.min. (#832)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 8247d3c6b637d6155c76a03dbb8bd9504cacbdb3
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Feb 28 05:00:02 2023 -0800
RATIS-1793. Enforce raft.server.log.appender.wait-time.min. (#832)
(cherry picked from commit fdd7c5c98beaeed6aa33261752554575282e1a43)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 30 +++++++++++-----------
.../apache/ratis/server/leader/FollowerInfo.java | 3 +++
.../apache/ratis/server/leader/LogAppender.java | 4 +--
.../apache/ratis/server/raftlog/RaftLogIndex.java | 30 ++++++++++++++++------
.../apache/ratis/server/impl/FollowerInfoImpl.java | 14 +++++++---
.../ratis/server/leader/LogAppenderBase.java | 11 ++++++--
6 files changed, 62 insertions(+), 30 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 7c742d91d..b76da8ecb 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -48,7 +48,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import com.codahale.metrics.Timer;
@@ -74,9 +73,6 @@ public class GrpcLogAppender extends LogAppenderBase {
private final TimeDuration requestTimeoutDuration;
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
- private final long waitTimeMinMs;
- private final AtomicReference<Timestamp> lastAppendEntries;
-
private volatile StreamObservers appendLogRequestObserver;
private final boolean useSeparateHBChannel;
@@ -96,10 +92,6 @@ public class GrpcLogAppender extends LogAppenderBase {
this.installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
this.useSeparateHBChannel = GrpcConfigKeys.Server.heartbeatChannel(properties);
- final TimeDuration waitTimeMin = RaftServerConfigKeys.Log.Appender.waitTimeMin(properties);
- this.waitTimeMinMs = waitTimeMin.toLong(TimeUnit.MILLISECONDS);
- this.lastAppendEntries = new AtomicReference<>(Timestamp.currentTime().addTime(waitTimeMin.negate()));
-
grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString());
grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(), pendingRequests::logRequestsSize);
@@ -182,10 +174,9 @@ public class GrpcLogAppender extends LogAppenderBase {
// For normal nodes, new entries should be sent ASAP
// however for slow followers (especially when the follower is down),
// keep sending without any wait time only ends up in high CPU load
- final long min = waitTimeMinMs - lastAppendEntries.get().elapsedTimeMs();
- return Math.max(0L, min);
+ return Math.max(getMinWaitTimeMs(), 0L);
}
- return Math.min(waitTimeMinMs, getHeartbeatWaitTimeMs());
+ return Math.min(getMinWaitTimeMs(), getHeartbeatWaitTimeMs());
}
private boolean isSlowFollower() {
@@ -263,13 +254,13 @@ public class GrpcLogAppender extends LogAppenderBase {
return CALL_ID_COMPARATOR;
}
- private void appendLog(boolean excludeLogEntries) throws IOException {
+ private void appendLog(boolean heartbeat) throws IOException {
final AppendEntriesRequestProto pending;
final AppendEntriesRequest request;
try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
// Prepare and send the append request.
// Note changes on follower's nextIndex and ops on pendingRequests should always be done under the write-lock
- pending = newAppendEntriesRequest(callId.getAndIncrement(), excludeLogEntries);
+ pending = newAppendEntriesRequest(callId.getAndIncrement(), heartbeat);
if (pending == null) {
return;
}
@@ -282,6 +273,16 @@ public class GrpcLogAppender extends LogAppenderBase {
}
}
+ final long waitMs = getMinWaitTimeMs();
+ if (waitMs > 0) {
+ try {
+ Thread.sleep(waitMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw IOUtils.toInterruptedIOException(
+ "Interrupted appendLog, heartbeat? " + heartbeat, e);
+ }
+ }
if (isRunning()) {
sendRequest(request, pending);
}
@@ -295,15 +296,14 @@ public class GrpcLogAppender extends LogAppenderBase {
final boolean sent = Optional.ofNullable(appendLogRequestObserver)
.map(observer -> {
observer.onNext(proto);
- lastAppendEntries.set(Timestamp.currentTime());
return true;
}).isPresent();
if (sent) {
+ getFollower().updateLastRpcSendTime(request.isHeartbeat());
scheduler.onTimeout(requestTimeoutDuration,
() -> timeoutAppendRequest(request.getCallId(), request.isHeartbeat()),
LOG, () -> "Timeout check failed for append entry request: " + request);
- getFollower().updateLastRpcSendTime(request.isHeartbeat());
} else {
request.stopRequestTimer();
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
index b4ae8458c..fb63068a5 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
@@ -78,6 +78,9 @@ public interface FollowerInfo {
/** @return the lastRpcResponseTime . */
Timestamp getLastRpcResponseTime();
+ /** @return the lastRpcSendTime . */
+ Timestamp getLastRpcSendTime();
+
/** Update lastRpcResponseTime to the current time. */
void updateLastRpcResponseTime();
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index f0ff28690..49a1a12fa 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -166,8 +166,8 @@ public interface LogAppender {
return getFollower().getNextIndex() < getRaftLog().getNextIndex();
}
- /** send a heartbeat AppendEntries immediately */
- void triggerHeartbeat() throws IOException;
+ /** Trigger to send a heartbeat AppendEntries. */
+ void triggerHeartbeat();
/** @return the wait time in milliseconds to send the next heartbeat. */
default long getHeartbeatWaitTimeMs() {
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
index ed545e291..290a58835 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
@@ -44,30 +44,44 @@ public class RaftLogIndex {
public boolean setUnconditionally(long newIndex, Consumer<Object> log) {
final long old = index.getAndSet(newIndex);
- log.accept(StringUtils.stringSupplierAsObject(() -> name + ": setUnconditionally " + old + " -> " + newIndex));
- return old != newIndex;
+ final boolean updated = old != newIndex;
+ if (updated) {
+ log.accept(StringUtils.stringSupplierAsObject(
+ () -> name + ": setUnconditionally " + old + " -> " + newIndex));
+ }
+ return updated;
}
public boolean updateUnconditionally(LongUnaryOperator update, Consumer<Object> log) {
final long old = index.getAndUpdate(update);
final long newIndex = update.applyAsLong(old);
- log.accept(StringUtils.stringSupplierAsObject(() -> name + ": updateUnconditionally " + old + " -> " + newIndex));
- return old != newIndex;
+ final boolean updated = old != newIndex;
+ if (updated) {
+ log.accept(StringUtils.stringSupplierAsObject(
+ () -> name + ": updateUnconditionally " + old + " -> " + newIndex));
+ }
+ return updated;
}
public boolean updateIncreasingly(long newIndex, Consumer<Object> log) {
final long old = index.getAndSet(newIndex);
Preconditions.assertTrue(old <= newIndex,
() -> "Failed to updateIncreasingly for " + name + ": " + old + " -> " + newIndex);
- log.accept(StringUtils.stringSupplierAsObject(() -> name + ": updateIncreasingly " + old + " -> " + newIndex));
- return old != newIndex;
+ final boolean updated = old != newIndex;
+ if (updated) {
+ log.accept(StringUtils.stringSupplierAsObject(
+ () -> name + ": updateIncreasingly " + old + " -> " + newIndex));
+ }
+ return updated;
}
public boolean updateToMax(long newIndex, Consumer<Object> log) {
final long old = index.getAndUpdate(oldIndex -> Math.max(oldIndex, newIndex));
final boolean updated = old < newIndex;
- log.accept(StringUtils.stringSupplierAsObject(
- () -> name + ": updateToMax old=" + old + ", new=" + newIndex + ", updated? " + updated));
+ if (updated) {
+ log.accept(StringUtils.stringSupplierAsObject(
+ () -> name + ": updateToMax old=" + old + ", new=" + newIndex + ", updated? " + updated));
+ }
return updated;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
index 0d7fe2075..67af642fd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
@@ -93,17 +93,20 @@ class FollowerInfoImpl implements FollowerInfo {
@Override
public void decreaseNextIndex(long newNextIndex) {
- nextIndex.updateUnconditionally(old -> old <= 0L? old: Math.min(old - 1, newNextIndex), infoIndexChange);
+ nextIndex.updateUnconditionally(old -> old <= 0L? old: Math.min(old - 1, newNextIndex),
+ message -> infoIndexChange.accept("decreaseNextIndex " + message));
}
@Override
public void setNextIndex(long newNextIndex) {
- nextIndex.updateUnconditionally(old -> newNextIndex >= 0 ? newNextIndex : old, infoIndexChange);
+ nextIndex.updateUnconditionally(old -> newNextIndex >= 0 ? newNextIndex : old,
+ message -> infoIndexChange.accept("setNextIndex " + message));
}
@Override
public void updateNextIndex(long newNextIndex) {
- nextIndex.updateToMax(newNextIndex, infoIndexChange);
+ nextIndex.updateToMax(newNextIndex,
+ message -> infoIndexChange.accept("updateNextIndex " + message));
}
@Override
@@ -160,6 +163,11 @@ class FollowerInfoImpl implements FollowerInfo {
return lastRpcResponseTime.get();
}
+ @Override
+ public Timestamp getLastRpcSendTime() {
+ return lastRpcSendTime.get();
+ }
+
@Override
public void updateLastRpcSendTime(boolean isHeartbeat) {
final Timestamp currentTime = Timestamp.currentTime();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index bc8a31181..1c0f61836 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -34,10 +34,10 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
-import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -56,6 +56,7 @@ public abstract class LogAppenderBase implements LogAppender {
private final AwaitForSignal eventAwaitForSignal;
private final AtomicBoolean heartbeatTrigger = new AtomicBoolean();
+ private final long waitTimeMinMs;
protected LogAppenderBase(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
this.follower = f;
@@ -71,10 +72,12 @@ public abstract class LogAppenderBase implements LogAppender {
this.buffer = new DataQueue<>(this, bufferByteLimit, bufferElementLimit, EntryWithData::getSerializedSize);
this.daemon = new LogAppenderDaemon(this);
this.eventAwaitForSignal = new AwaitForSignal(name);
+
+ this.waitTimeMinMs = RaftServerConfigKeys.Log.Appender.waitTimeMin(properties).toLong(TimeUnit.MILLISECONDS);
}
@Override
- public void triggerHeartbeat() throws IOException {
+ public void triggerHeartbeat() {
if (heartbeatTrigger.compareAndSet(false, true)) {
notifyLogAppender();
}
@@ -133,6 +136,10 @@ public abstract class LogAppenderBase implements LogAppender {
getLeaderState().restart(this);
}
+ public long getMinWaitTimeMs() {
+ return waitTimeMinMs - getFollower().getLastRpcSendTime().elapsedTimeMs();
+ }
+
@Override
public final FollowerInfo getFollower() {
return follower;