You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/06 11:56:38 UTC
[incubator-ratis] 01/02: rename methods
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch RATIS-1209
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
commit 2b811c1bffd830049eb86d050e2b95e2d3d719ea
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Sun Dec 6 13:41:48 2020 +0800
rename methods
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 34 ++++++++-----
.../apache/ratis/server/impl/LeaderStateImpl.java | 24 ++++-----
.../org/apache/ratis/server/impl/LogAppender.java | 57 ++++++++++------------
.../ratis/server/impl/LogAppenderDaemon.java | 2 +-
4 files changed, 59 insertions(+), 58 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 85d3572..dc8e40f 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -114,10 +114,18 @@ public class GrpcLogAppender extends LogAppender {
getFollower().decreaseNextIndex(nextIndex);
}
+ private boolean haveLogEntriesToSendOut() {
+ return shouldAppendEntries(getFollower().getNextIndex());
+ }
+
+ private boolean isFollowerCommitBehindLastCommitIndex() {
+ return getRaftLog().getLastCommittedIndex() > getFollower().getCommitIndex();
+ }
+
@Override
- protected void runAppenderImpl() throws IOException {
+ protected void run() throws IOException {
boolean installSnapshotRequired;
- for(; isAppenderRunning(); mayWait()) {
+ for(; isRunning(); mayWait()) {
installSnapshotRequired = false;
//HB period is expired OR we have messages OR follower is behind with commit index
@@ -174,9 +182,9 @@ public class GrpcLogAppender extends LogAppender {
}
@Override
- public void stopAppender() {
+ public void stop() {
grpcServerMetrics.unregister();
- super.stopAppender();
+ super.stop();
}
@Override
@@ -203,7 +211,7 @@ public class GrpcLogAppender extends LogAppender {
// prepare and enqueue the append request. note changes on follower's
// nextIndex and ops on pendingRequests should always be associated
// together and protected by the lock
- pending = createRequest(callId++, excludeLogEntries);
+ pending = newAppendEntriesRequest(callId++, excludeLogEntries);
if (pending == null) {
return;
}
@@ -216,7 +224,7 @@ public class GrpcLogAppender extends LogAppender {
s = appendLogRequestObserver;
}
- if (isAppenderRunning()) {
+ if (isRunning()) {
sendRequest(request, pending, s);
}
}
@@ -316,7 +324,7 @@ public class GrpcLogAppender extends LogAppender {
default:
throw new IllegalStateException("Unexpected reply result: " + reply.getResult());
}
- notifyAppend();
+ notifyLogAppender();
}
/**
@@ -324,7 +332,7 @@ public class GrpcLogAppender extends LogAppender {
*/
@Override
public void onError(Throwable t) {
- if (!isAppenderRunning()) {
+ if (!isRunning()) {
LOG.info("{} is stopped", GrpcLogAppender.this);
return;
}
@@ -382,7 +390,7 @@ public class GrpcLogAppender extends LogAppender {
void close() {
done.set(true);
- GrpcLogAppender.this.notifyAppend();
+ notifyLogAppender();
}
synchronized boolean hasAllResponse() {
@@ -439,7 +447,7 @@ public class GrpcLogAppender extends LogAppender {
@Override
public void onError(Throwable t) {
- if (!isAppenderRunning()) {
+ if (!isRunning()) {
LOG.info("{} is stopped", GrpcLogAppender.this);
return;
}
@@ -477,7 +485,7 @@ public class GrpcLogAppender extends LogAppender {
try {
snapshotRequestObserver = getClient().installSnapshot(responseHandler);
for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
- if (isAppenderRunning()) {
+ if (isRunning()) {
snapshotRequestObserver.onNext(request);
getFollower().updateLastRpcSendTime();
responseHandler.addPending(request);
@@ -496,7 +504,7 @@ public class GrpcLogAppender extends LogAppender {
}
synchronized (this) {
- while (isAppenderRunning() && !responseHandler.isDone()) {
+ while (isRunning() && !responseHandler.isDone()) {
try {
wait();
} catch (InterruptedException ignored) {
@@ -541,7 +549,7 @@ public class GrpcLogAppender extends LogAppender {
}
synchronized (this) {
- if (isAppenderRunning() && !responseHandler.isDone()) {
+ if (isRunning() && !responseHandler.isDone()) {
try {
wait();
} catch (InterruptedException ignored) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 3d52e44..ad6d7c37 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -284,7 +284,7 @@ class LeaderStateImpl implements LeaderState {
server.getId().toString(), null);
raftLog.append(placeHolder);
processor.start();
- senders.forEach(LogAppender::startAppender);
+ senders.forEach(LogAppender::start);
return placeHolder;
}
@@ -295,7 +295,7 @@ class LeaderStateImpl implements LeaderState {
void stop() {
this.running = false;
// do not interrupt event processor since it may be in the middle of logSync
- senders.forEach(LogAppender::stopAppender);
+ senders.forEach(LogAppender::stop);
final NotLeaderException nle = server.generateNotLeaderException();
final Collection<CommitInfoProto> commitInfos = server.getCommitInfos();
try {
@@ -313,7 +313,7 @@ class LeaderStateImpl implements LeaderState {
}
void notifySenders() {
- senders.forEach(LogAppender::notifyAppend);
+ senders.forEach(LogAppender::notifyLogAppender);
}
boolean inStagingState() {
@@ -466,7 +466,7 @@ class LeaderStateImpl implements LeaderState {
* Update sender list for setConfiguration request
*/
void addAndStartSenders(Collection<RaftPeer> newPeers) {
- addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::startAppender);
+ addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::start);
}
Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote) {
@@ -486,7 +486,7 @@ class LeaderStateImpl implements LeaderState {
void stopAndRemoveSenders(Predicate<LogAppender> predicate) {
final List<LogAppender> toStop = senders.stream().filter(predicate).collect(Collectors.toList());
- toStop.forEach(LogAppender::stopAppender);
+ toStop.forEach(LogAppender::stop);
senders.removeAll(toStop);
}
@@ -494,7 +494,7 @@ class LeaderStateImpl implements LeaderState {
public void restart(LogAppender sender) {
final FollowerInfo follower = sender.getFollower();
LOG.info("{}: Restarting {} for {}", this, JavaUtils.getClassSimpleName(sender.getClass()), follower.getName());
- sender.stopAppender();
+ sender.stop();
senders.removeAll(Collections.singleton(sender));
addAndStartSenders(Collections.singleton(follower.getPeer()));
}
@@ -504,7 +504,7 @@ class LeaderStateImpl implements LeaderState {
*/
private void updateSenders(RaftConfiguration conf) {
Preconditions.assertTrue(conf.isStable() && !inStagingState());
- stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollower().getPeer().getId()));
+ stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollowerId()));
}
void submitStepDownEvent(StepDownReason reason) {
@@ -845,14 +845,14 @@ class LeaderStateImpl implements LeaderState {
private List<List<RaftPeerId>> divideFollowers(RaftConfiguration conf) {
List<List<RaftPeerId>> lists = new ArrayList<>(2);
List<RaftPeerId> listForNew = senders.stream()
- .filter(sender -> conf.containsInConf(sender.getFollower().getPeer().getId()))
- .map(sender -> sender.getFollower().getPeer().getId())
+ .map(LogAppender::getFollowerId)
+ .filter(conf::containsInConf)
.collect(Collectors.toList());
lists.add(listForNew);
if (conf.isTransitional()) {
List<RaftPeerId> listForOld = senders.stream()
- .filter(sender -> conf.containsInOldConf(sender.getFollower().getPeer().getId()))
- .map(sender -> sender.getFollower().getPeer().getId())
+ .map(LogAppender::getFollowerId)
+ .filter(conf::containsInOldConf)
.collect(Collectors.toList());
lists.add(listForOld);
}
@@ -923,7 +923,7 @@ class LeaderStateImpl implements LeaderState {
.filter(sender -> sender.getFollower()
.getLastRpcResponseTime()
.elapsedTimeMs() <= server.getMaxTimeoutMs())
- .map(sender -> sender.getFollower().getPeer().getId())
+ .map(LogAppender::getFollowerId)
.collect(Collectors.toList());
final RaftConfiguration conf = server.getRaftConf();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index a922e00..ada5424 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -50,7 +50,6 @@ public class LogAppender {
private final String name;
private final RaftServer.Division server;
private final LeaderState leaderState;
- private final RaftLog raftLog;
private final FollowerInfo follower;
private final DataQueue<EntryWithData> buffer;
@@ -64,7 +63,6 @@ public class LogAppender {
this.name = follower.getName() + "-" + JavaUtils.getClassSimpleName(getClass());
this.server = server;
this.leaderState = leaderState;
- this.raftLog = server.getRaftLog();
final RaftProperties properties = server.getRaftServer().getProperties();
this.snapshotChunkMaxSize = RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties).getSizeInt();
@@ -85,7 +83,7 @@ public class LogAppender {
}
public RaftLog getRaftLog() {
- return raftLog;
+ return getServer().getRaftLog();
}
@Override
@@ -93,15 +91,15 @@ public class LogAppender {
return name;
}
- void startAppender() {
+ public void start() {
daemon.tryToStart();
}
- public boolean isAppenderRunning() {
+ public boolean isRunning() {
return daemon.isWorking();
}
- public void stopAppender() {
+ public void stop() {
daemon.tryToClose();
}
@@ -123,7 +121,7 @@ public class LogAppender {
}
final long previousIndex = nextIndex - 1;
- final TermIndex previous = raftLog.getTermIndex(previousIndex);
+ final TermIndex previous = getRaftLog().getTermIndex(previousIndex);
if (previous != null) {
return previous;
}
@@ -139,7 +137,7 @@ public class LogAppender {
return null;
}
- protected AppendEntriesRequestProto createRequest(long callId,
+ protected AppendEntriesRequestProto newAppendEntriesRequest(long callId,
boolean heartbeat) throws RaftLogIOException {
final TermIndex previous = getPrevious(follower.getNextIndex());
final long snapshotIndex = follower.getSnapshotIndex();
@@ -151,11 +149,11 @@ public class LogAppender {
Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " + buffer.getNumElements() + " elements.");
- final long leaderNext = raftLog.getNextIndex();
+ final long leaderNext = getRaftLog().getNextIndex();
final long followerNext = follower.getNextIndex();
final long halfMs = heartbeatRemainingMs/2;
for (long next = followerNext; leaderNext > next && getHeartbeatRemainingTime() - halfMs > 0; ) {
- if (!buffer.offer(raftLog.getEntryWithData(next++))) {
+ if (!buffer.offer(getRaftLog().getEntryWithData(next++))) {
break;
}
}
@@ -195,16 +193,16 @@ public class LogAppender {
throws InterruptedException, InterruptedIOException, RaftLogIOException {
int retry = 0;
AppendEntriesRequestProto request = null;
- while (isAppenderRunning()) { // keep retrying for IOException
+ while (isRunning()) { // keep retrying for IOException
try {
if (request == null || request.getEntriesCount() == 0) {
- request = createRequest(DEFAULT_CALLID, false);
+ request = newAppendEntriesRequest(DEFAULT_CALLID, false);
}
if (request == null) {
LOG.trace("{} no entries to send now, wait ...", this);
return null;
- } else if (!isAppenderRunning()) {
+ } else if (!isRunning()) {
LOG.info("{} is stopped. Skip appendEntries.", this);
return null;
}
@@ -224,7 +222,7 @@ public class LogAppender {
}
handleException(ioe);
}
- if (isAppenderRunning()) {
+ if (isRunning()) {
server.properties().rpcSleepTime().sleep();
}
}
@@ -275,11 +273,11 @@ public class LogAppender {
}
protected SnapshotInfo shouldInstallSnapshot() {
- final long logStartIndex = raftLog.getStartIndex();
+ final long logStartIndex = getRaftLog().getStartIndex();
// we should install snapshot if the follower needs to catch up and:
// 1. there is no local log entry but there is snapshot
// 2. or the follower's next index is smaller than the log start index
- if (follower.getNextIndex() < raftLog.getNextIndex()) {
+ if (follower.getNextIndex() < getRaftLog().getNextIndex()) {
final SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
if (follower.getNextIndex() < logStartIndex ||
(logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) {
@@ -290,13 +288,13 @@ public class LogAppender {
}
/** Check and send appendEntries RPC */
- protected void runAppenderImpl() throws InterruptedException, IOException {
- while (isAppenderRunning()) {
+ protected void run() throws InterruptedException, IOException {
+ while (isRunning()) {
if (shouldSendRequest()) {
SnapshotInfo snapshot = shouldInstallSnapshot();
if (snapshot != null) {
LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, send snapshot {} to follower",
- this, follower.getNextIndex(), raftLog.getStartIndex(), snapshot);
+ this, follower.getNextIndex(), getRaftLog().getStartIndex(), snapshot);
final InstallSnapshotReplyProto r = installSnapshot(snapshot);
if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) {
@@ -309,7 +307,7 @@ public class LogAppender {
}
}
}
- if (isAppenderRunning() && !shouldAppendEntries(follower.getNextIndex())) {
+ if (isRunning() && !shouldAppendEntries(follower.getNextIndex())) {
final long waitTime = getHeartbeatRemainingTime();
if (waitTime > 0) {
synchronized (this) {
@@ -359,8 +357,10 @@ public class LogAppender {
getServerRpc().handleException(getFollowerId(), e, false);
}
- public synchronized void notifyAppend() {
- this.notify();
+ public void notifyLogAppender() {
+ synchronized (this) {
+ this.notify();
+ }
}
/** Should the leader send appendEntries RPC to this follower? */
@@ -368,16 +368,9 @@ public class LogAppender {
return shouldAppendEntries(follower.getNextIndex()) || heartbeatTimeout();
}
- protected boolean haveLogEntriesToSendOut() {
- return shouldAppendEntries(follower.getNextIndex());
- }
-
- protected boolean isFollowerCommitBehindLastCommitIndex() {
- return raftLog.getLastCommittedIndex() > follower.getCommitIndex();
- }
- private boolean shouldAppendEntries(long followerIndex) {
- return followerIndex < raftLog.getNextIndex();
+ public boolean shouldAppendEntries(long followerIndex) {
+ return followerIndex < getRaftLog().getNextIndex();
}
protected boolean heartbeatTimeout() {
@@ -393,7 +386,7 @@ public class LogAppender {
protected boolean checkResponseTerm(long responseTerm) {
synchronized (server) {
- return isAppenderRunning() && leaderState.onFollowerTerm(follower, responseTerm);
+ return isRunning() && leaderState.onFollowerTerm(follower, responseTerm);
}
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
index 4d0a662..4c375a8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
@@ -74,7 +74,7 @@ class LogAppenderDaemon {
private void run() {
try {
if (lifeCycle.transition(TRY_TO_RUN) == RUNNING) {
- logAppender.runAppenderImpl();
+ logAppender.run();
}
lifeCycle.compareAndTransition(RUNNING, CLOSING);
} catch (InterruptedException e) {