You are viewing a plain text version of this content. A canonical link exists for it, but the hyperlink was not preserved in this plain-text copy.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/06 11:56:38 UTC

[incubator-ratis] 01/02: rename methods

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch RATIS-1209
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git

commit 2b811c1bffd830049eb86d050e2b95e2d3d719ea
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Sun Dec 6 13:41:48 2020 +0800

    rename methods
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 34 ++++++++-----
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 24 ++++-----
 .../org/apache/ratis/server/impl/LogAppender.java  | 57 ++++++++++------------
 .../ratis/server/impl/LogAppenderDaemon.java       |  2 +-
 4 files changed, 59 insertions(+), 58 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 85d3572..dc8e40f 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -114,10 +114,18 @@ public class GrpcLogAppender extends LogAppender {
     getFollower().decreaseNextIndex(nextIndex);
   }
 
+  private boolean haveLogEntriesToSendOut() {
+    return shouldAppendEntries(getFollower().getNextIndex());
+  }
+
+  private boolean isFollowerCommitBehindLastCommitIndex() {
+    return getRaftLog().getLastCommittedIndex() > getFollower().getCommitIndex();
+  }
+
   @Override
-  protected void runAppenderImpl() throws IOException {
+  protected void run() throws IOException {
     boolean installSnapshotRequired;
-    for(; isAppenderRunning(); mayWait()) {
+    for(; isRunning(); mayWait()) {
       installSnapshotRequired = false;
 
       //HB period is expired OR we have messages OR follower is behind with commit index
@@ -174,9 +182,9 @@ public class GrpcLogAppender extends LogAppender {
   }
 
   @Override
-  public void stopAppender() {
+  public void stop() {
     grpcServerMetrics.unregister();
-    super.stopAppender();
+    super.stop();
   }
 
   @Override
@@ -203,7 +211,7 @@ public class GrpcLogAppender extends LogAppender {
       // prepare and enqueue the append request. note changes on follower's
       // nextIndex and ops on pendingRequests should always be associated
       // together and protected by the lock
-      pending = createRequest(callId++, excludeLogEntries);
+      pending = newAppendEntriesRequest(callId++, excludeLogEntries);
       if (pending == null) {
         return;
       }
@@ -216,7 +224,7 @@ public class GrpcLogAppender extends LogAppender {
       s = appendLogRequestObserver;
     }
 
-    if (isAppenderRunning()) {
+    if (isRunning()) {
       sendRequest(request, pending, s);
     }
   }
@@ -316,7 +324,7 @@ public class GrpcLogAppender extends LogAppender {
         default:
           throw new IllegalStateException("Unexpected reply result: " + reply.getResult());
       }
-      notifyAppend();
+      notifyLogAppender();
     }
 
     /**
@@ -324,7 +332,7 @@ public class GrpcLogAppender extends LogAppender {
      */
     @Override
     public void onError(Throwable t) {
-      if (!isAppenderRunning()) {
+      if (!isRunning()) {
         LOG.info("{} is stopped", GrpcLogAppender.this);
         return;
       }
@@ -382,7 +390,7 @@ public class GrpcLogAppender extends LogAppender {
 
     void close() {
       done.set(true);
-      GrpcLogAppender.this.notifyAppend();
+      notifyLogAppender();
     }
 
     synchronized boolean hasAllResponse() {
@@ -439,7 +447,7 @@ public class GrpcLogAppender extends LogAppender {
 
     @Override
     public void onError(Throwable t) {
-      if (!isAppenderRunning()) {
+      if (!isRunning()) {
         LOG.info("{} is stopped", GrpcLogAppender.this);
         return;
       }
@@ -477,7 +485,7 @@ public class GrpcLogAppender extends LogAppender {
     try {
       snapshotRequestObserver = getClient().installSnapshot(responseHandler);
       for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
-        if (isAppenderRunning()) {
+        if (isRunning()) {
           snapshotRequestObserver.onNext(request);
           getFollower().updateLastRpcSendTime();
           responseHandler.addPending(request);
@@ -496,7 +504,7 @@ public class GrpcLogAppender extends LogAppender {
     }
 
     synchronized (this) {
-      while (isAppenderRunning() && !responseHandler.isDone()) {
+      while (isRunning() && !responseHandler.isDone()) {
         try {
           wait();
         } catch (InterruptedException ignored) {
@@ -541,7 +549,7 @@ public class GrpcLogAppender extends LogAppender {
     }
 
     synchronized (this) {
-      if (isAppenderRunning() && !responseHandler.isDone()) {
+      if (isRunning() && !responseHandler.isDone()) {
         try {
           wait();
         } catch (InterruptedException ignored) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 3d52e44..ad6d7c37 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -284,7 +284,7 @@ class LeaderStateImpl implements LeaderState {
         server.getId().toString(), null);
     raftLog.append(placeHolder);
     processor.start();
-    senders.forEach(LogAppender::startAppender);
+    senders.forEach(LogAppender::start);
     return placeHolder;
   }
 
@@ -295,7 +295,7 @@ class LeaderStateImpl implements LeaderState {
   void stop() {
     this.running = false;
     // do not interrupt event processor since it may be in the middle of logSync
-    senders.forEach(LogAppender::stopAppender);
+    senders.forEach(LogAppender::stop);
     final NotLeaderException nle = server.generateNotLeaderException();
     final Collection<CommitInfoProto> commitInfos = server.getCommitInfos();
     try {
@@ -313,7 +313,7 @@ class LeaderStateImpl implements LeaderState {
   }
 
   void notifySenders() {
-    senders.forEach(LogAppender::notifyAppend);
+    senders.forEach(LogAppender::notifyLogAppender);
   }
 
   boolean inStagingState() {
@@ -466,7 +466,7 @@ class LeaderStateImpl implements LeaderState {
    * Update sender list for setConfiguration request
    */
   void addAndStartSenders(Collection<RaftPeer> newPeers) {
-    addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::startAppender);
+    addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::start);
   }
 
   Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote) {
@@ -486,7 +486,7 @@ class LeaderStateImpl implements LeaderState {
 
   void stopAndRemoveSenders(Predicate<LogAppender> predicate) {
     final List<LogAppender> toStop = senders.stream().filter(predicate).collect(Collectors.toList());
-    toStop.forEach(LogAppender::stopAppender);
+    toStop.forEach(LogAppender::stop);
     senders.removeAll(toStop);
   }
 
@@ -494,7 +494,7 @@ class LeaderStateImpl implements LeaderState {
   public void restart(LogAppender sender) {
     final FollowerInfo follower = sender.getFollower();
     LOG.info("{}: Restarting {} for {}", this, JavaUtils.getClassSimpleName(sender.getClass()), follower.getName());
-    sender.stopAppender();
+    sender.stop();
     senders.removeAll(Collections.singleton(sender));
     addAndStartSenders(Collections.singleton(follower.getPeer()));
   }
@@ -504,7 +504,7 @@ class LeaderStateImpl implements LeaderState {
    */
   private void updateSenders(RaftConfiguration conf) {
     Preconditions.assertTrue(conf.isStable() && !inStagingState());
-    stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollower().getPeer().getId()));
+    stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollowerId()));
   }
 
   void submitStepDownEvent(StepDownReason reason) {
@@ -845,14 +845,14 @@ class LeaderStateImpl implements LeaderState {
   private List<List<RaftPeerId>> divideFollowers(RaftConfiguration conf) {
     List<List<RaftPeerId>> lists = new ArrayList<>(2);
     List<RaftPeerId> listForNew = senders.stream()
-        .filter(sender -> conf.containsInConf(sender.getFollower().getPeer().getId()))
-        .map(sender -> sender.getFollower().getPeer().getId())
+        .map(LogAppender::getFollowerId)
+        .filter(conf::containsInConf)
         .collect(Collectors.toList());
     lists.add(listForNew);
     if (conf.isTransitional()) {
       List<RaftPeerId> listForOld = senders.stream()
-          .filter(sender -> conf.containsInOldConf(sender.getFollower().getPeer().getId()))
-          .map(sender -> sender.getFollower().getPeer().getId())
+          .map(LogAppender::getFollowerId)
+          .filter(conf::containsInOldConf)
           .collect(Collectors.toList());
       lists.add(listForOld);
     }
@@ -923,7 +923,7 @@ class LeaderStateImpl implements LeaderState {
         .filter(sender -> sender.getFollower()
                                 .getLastRpcResponseTime()
                                 .elapsedTimeMs() <= server.getMaxTimeoutMs())
-        .map(sender -> sender.getFollower().getPeer().getId())
+        .map(LogAppender::getFollowerId)
         .collect(Collectors.toList());
 
     final RaftConfiguration conf = server.getRaftConf();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index a922e00..ada5424 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -50,7 +50,6 @@ public class LogAppender {
   private final String name;
   private final RaftServer.Division server;
   private final LeaderState leaderState;
-  private final RaftLog raftLog;
   private final FollowerInfo follower;
 
   private final DataQueue<EntryWithData> buffer;
@@ -64,7 +63,6 @@ public class LogAppender {
     this.name = follower.getName() + "-" + JavaUtils.getClassSimpleName(getClass());
     this.server = server;
     this.leaderState = leaderState;
-    this.raftLog = server.getRaftLog();
 
     final RaftProperties properties = server.getRaftServer().getProperties();
     this.snapshotChunkMaxSize = RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties).getSizeInt();
@@ -85,7 +83,7 @@ public class LogAppender {
   }
 
   public RaftLog getRaftLog() {
-    return raftLog;
+    return getServer().getRaftLog();
   }
 
   @Override
@@ -93,15 +91,15 @@ public class LogAppender {
     return name;
   }
 
-  void startAppender() {
+  public void start() {
     daemon.tryToStart();
   }
 
-  public boolean isAppenderRunning() {
+  public boolean isRunning() {
     return daemon.isWorking();
   }
 
-  public void stopAppender() {
+  public void stop() {
     daemon.tryToClose();
   }
 
@@ -123,7 +121,7 @@ public class LogAppender {
     }
 
     final long previousIndex = nextIndex - 1;
-    final TermIndex previous = raftLog.getTermIndex(previousIndex);
+    final TermIndex previous = getRaftLog().getTermIndex(previousIndex);
     if (previous != null) {
       return previous;
     }
@@ -139,7 +137,7 @@ public class LogAppender {
     return null;
   }
 
-  protected AppendEntriesRequestProto createRequest(long callId,
+  protected AppendEntriesRequestProto newAppendEntriesRequest(long callId,
       boolean heartbeat) throws RaftLogIOException {
     final TermIndex previous = getPrevious(follower.getNextIndex());
     final long snapshotIndex = follower.getSnapshotIndex();
@@ -151,11 +149,11 @@ public class LogAppender {
 
     Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " + buffer.getNumElements() + " elements.");
 
-    final long leaderNext = raftLog.getNextIndex();
+    final long leaderNext = getRaftLog().getNextIndex();
     final long followerNext = follower.getNextIndex();
     final long halfMs = heartbeatRemainingMs/2;
     for (long next = followerNext; leaderNext > next && getHeartbeatRemainingTime() - halfMs > 0; ) {
-      if (!buffer.offer(raftLog.getEntryWithData(next++))) {
+      if (!buffer.offer(getRaftLog().getEntryWithData(next++))) {
         break;
       }
     }
@@ -195,16 +193,16 @@ public class LogAppender {
       throws InterruptedException, InterruptedIOException, RaftLogIOException {
     int retry = 0;
     AppendEntriesRequestProto request = null;
-    while (isAppenderRunning()) { // keep retrying for IOException
+    while (isRunning()) { // keep retrying for IOException
       try {
         if (request == null || request.getEntriesCount() == 0) {
-          request = createRequest(DEFAULT_CALLID, false);
+          request = newAppendEntriesRequest(DEFAULT_CALLID, false);
         }
 
         if (request == null) {
           LOG.trace("{} no entries to send now, wait ...", this);
           return null;
-        } else if (!isAppenderRunning()) {
+        } else if (!isRunning()) {
           LOG.info("{} is stopped. Skip appendEntries.", this);
           return null;
         }
@@ -224,7 +222,7 @@ public class LogAppender {
         }
         handleException(ioe);
       }
-      if (isAppenderRunning()) {
+      if (isRunning()) {
         server.properties().rpcSleepTime().sleep();
       }
     }
@@ -275,11 +273,11 @@ public class LogAppender {
   }
 
   protected SnapshotInfo shouldInstallSnapshot() {
-    final long logStartIndex = raftLog.getStartIndex();
+    final long logStartIndex = getRaftLog().getStartIndex();
     // we should install snapshot if the follower needs to catch up and:
     // 1. there is no local log entry but there is snapshot
     // 2. or the follower's next index is smaller than the log start index
-    if (follower.getNextIndex() < raftLog.getNextIndex()) {
+    if (follower.getNextIndex() < getRaftLog().getNextIndex()) {
       final SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
       if (follower.getNextIndex() < logStartIndex ||
           (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) {
@@ -290,13 +288,13 @@ public class LogAppender {
   }
 
   /** Check and send appendEntries RPC */
-  protected void runAppenderImpl() throws InterruptedException, IOException {
-    while (isAppenderRunning()) {
+  protected void run() throws InterruptedException, IOException {
+    while (isRunning()) {
       if (shouldSendRequest()) {
         SnapshotInfo snapshot = shouldInstallSnapshot();
         if (snapshot != null) {
           LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, send snapshot {} to follower",
-              this, follower.getNextIndex(), raftLog.getStartIndex(), snapshot);
+              this, follower.getNextIndex(), getRaftLog().getStartIndex(), snapshot);
 
           final InstallSnapshotReplyProto r = installSnapshot(snapshot);
           if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) {
@@ -309,7 +307,7 @@ public class LogAppender {
           }
         }
       }
-      if (isAppenderRunning() && !shouldAppendEntries(follower.getNextIndex())) {
+      if (isRunning() && !shouldAppendEntries(follower.getNextIndex())) {
         final long waitTime = getHeartbeatRemainingTime();
         if (waitTime > 0) {
           synchronized (this) {
@@ -359,8 +357,10 @@ public class LogAppender {
     getServerRpc().handleException(getFollowerId(), e, false);
   }
 
-  public synchronized void notifyAppend() {
-    this.notify();
+  public void notifyLogAppender() {
+    synchronized (this) {
+      this.notify();
+    }
   }
 
   /** Should the leader send appendEntries RPC to this follower? */
@@ -368,16 +368,9 @@ public class LogAppender {
     return shouldAppendEntries(follower.getNextIndex()) || heartbeatTimeout();
   }
 
-  protected boolean haveLogEntriesToSendOut() {
-    return shouldAppendEntries(follower.getNextIndex());
-  }
-
-  protected boolean isFollowerCommitBehindLastCommitIndex() {
-    return raftLog.getLastCommittedIndex() > follower.getCommitIndex();
-  }
 
-  private boolean shouldAppendEntries(long followerIndex) {
-    return followerIndex < raftLog.getNextIndex();
+  public boolean shouldAppendEntries(long followerIndex) {
+    return followerIndex < getRaftLog().getNextIndex();
   }
 
   protected boolean heartbeatTimeout() {
@@ -393,7 +386,7 @@ public class LogAppender {
 
   protected boolean checkResponseTerm(long responseTerm) {
     synchronized (server) {
-      return isAppenderRunning() && leaderState.onFollowerTerm(follower, responseTerm);
+      return isRunning() && leaderState.onFollowerTerm(follower, responseTerm);
     }
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
index 4d0a662..4c375a8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
@@ -74,7 +74,7 @@ class LogAppenderDaemon {
   private void run() {
     try {
       if (lifeCycle.transition(TRY_TO_RUN) == RUNNING) {
-        logAppender.runAppenderImpl();
+        logAppender.run();
       }
       lifeCycle.compareAndTransition(RUNNING, CLOSING);
     } catch (InterruptedException e) {