You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2017/08/12 22:29:11 UTC

incubator-ratis git commit: RATIS-103. LeaderState.updateSenders may throw UnsupportedOperationException.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 0235de0ec -> 3e0ad68a0


RATIS-103. LeaderState.updateSenders may throw UnsupportedOperationException.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/3e0ad68a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/3e0ad68a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/3e0ad68a

Branch: refs/heads/master
Commit: 3e0ad68a0897c622b00e85af0b7c06ed1a80fc12
Parents: 0235de0
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Sat Aug 12 15:28:54 2017 -0700
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Sat Aug 12 15:28:54 2017 -0700

----------------------------------------------------------------------
 .../apache/ratis/server/impl/LeaderState.java   | 85 ++++++++++++--------
 .../apache/ratis/server/impl/LogAppender.java   |  3 +-
 2 files changed, 54 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e0ad68a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index de88382..313a3bb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -34,7 +34,10 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.ratis.server.impl.LeaderState.StateUpdateEventType.*;
 
@@ -68,6 +71,35 @@ public class LeaderState {
     }
   }
 
+  /**
+   * Use {@link CopyOnWriteArrayList} to implement a thread-safe list.
+   * Since each mutation induces a copy of the list, only bulk operations
+   * (addAll and removeAll) are supported.
+   */
+  static class SenderList {
+    private final List<LogAppender> senders;
+
+    SenderList(LogAppender[] senders) {
+      this.senders = new CopyOnWriteArrayList<>(senders);
+    }
+
+    Stream<LogAppender> stream() {
+      return senders.stream();
+    }
+
+    void forEach(Consumer<LogAppender> action) {
+      senders.forEach(action);
+    }
+
+    boolean addAll(Collection<LogAppender> c) {
+      return senders.addAll(c);
+    }
+
+    boolean removeAll(Collection<LogAppender> c) {
+      return senders.removeAll(c);
+    }
+  }
+
   static final StateUpdateEvent UPDATE_COMMIT_EVENT =
       new StateUpdateEvent(StateUpdateEventType.UPDATECOMMIT, -1);
   static final StateUpdateEvent STAGING_PROGRESS_EVENT =
@@ -83,7 +115,7 @@ public class LeaderState {
    * The list of threads appending entries to followers.
    * The list is protected by the RaftServer's lock.
    */
-  private final List<LogAppender> senders;
+  private final SenderList senders;
   private final BlockingQueue<StateUpdateEvent> eventQ;
   private final EventProcessor processor;
   private final PendingRequests pendingRequests;
@@ -110,11 +142,11 @@ public class LeaderState {
     Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId());
     final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs());
     placeHolderIndex = raftLog.getNextIndex();
-    senders = new CopyOnWriteArrayList<LogAppender>();
 
-    for (RaftPeer p : others) {
-      senders.add(server.newLogAppender(this, p, t, placeHolderIndex, true));
-    }
+    senders = new SenderList(others.stream().map(
+        p -> server.newLogAppender(this, p, t, placeHolderIndex, true))
+        .toArray(LogAppender[]::new));
+
     voterLists = divideFollowers(conf);
   }
 
@@ -146,10 +178,7 @@ public class LeaderState {
   void stop() {
     this.running = false;
     // do not interrupt event processor since it may be in the middle of logSync
-    for (LogAppender sender : senders) {
-      sender.stopSender();
-      sender.interrupt();
-    }
+    senders.forEach(sender -> sender.stopSender().interrupt());
     try {
       pendingRequests.sendNotLeaderResponses();
     } catch (IOException e) {
@@ -235,11 +264,18 @@ public class LeaderState {
   void addSenders(Collection<RaftPeer> newMembers) {
     final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs());
     final long nextIndex = raftLog.getNextIndex();
-    for (RaftPeer peer : newMembers) {
+
+    senders.addAll(newMembers.stream().map(peer -> {
       LogAppender sender = server.newLogAppender(this, peer, t, nextIndex, false);
-      senders.add(sender);
       sender.start();
-    }
+      return sender;
+    }).collect(Collectors.toList()));
+  }
+
+  void stopAndRemoveSenders(Predicate<LogAppender> predicate) {
+    final List<LogAppender> toStop = senders.stream().filter(predicate).collect(Collectors.toList());
+    toStop.forEach(s -> s.stopSender().interrupt());
+    senders.removeAll(toStop);
   }
 
   /**
@@ -247,15 +283,7 @@ public class LeaderState {
    */
   private void updateSenders(RaftConfiguration conf) {
     Preconditions.assertTrue(conf.isStable() && !inStagingState());
-    Iterator<LogAppender> iterator = senders.iterator();
-    while (iterator.hasNext()) {
-      LogAppender sender = iterator.next();
-      if (!conf.containsInConf(sender.getFollower().getPeer().getId())) {
-        iterator.remove();
-        sender.stopSender();
-        sender.interrupt();
-      }
-    }
+    stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollower().getPeer().getId()));
   }
 
   void submitUpdateStateEvent(StateUpdateEvent event) {
@@ -384,9 +412,7 @@ public class LeaderState {
       } else if (!reports.contains(BootStrapProgress.PROGRESSING)) {
         // all caught up!
         applyOldNewConf();
-        for (LogAppender sender : senders) {
-          sender.getFollower().startAttendVote();
-        }
+        senders.forEach(s -> s.getFollower().startAttendVote());
       }
     }
   }
@@ -573,15 +599,8 @@ public class LeaderState {
     }
 
     void fail() {
-      Iterator<LogAppender> iterator = senders.iterator();
-      while (iterator.hasNext()) {
-        LogAppender sender = iterator.next();
-        if (!sender.getFollower().isAttendingVote()) {
-          iterator.remove();
-          sender.stopSender();
-          sender.interrupt();
-        }
-      }
+      stopAndRemoveSenders(s -> !s.getFollower().isAttendingVote());
+
       LeaderState.this.stagingState = null;
       // send back failure response to client's request
       pendingRequests.failSetConfiguration(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e0ad68a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 36c6171..a5b0791 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -98,8 +98,9 @@ public class LogAppender extends Daemon {
     return sending;
   }
 
-  public void stopSender() {
+  public LogAppender stopSender() {
     this.sending = false;
+    return this;
   }
 
   public FollowerInfo getFollower() {