You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2017/08/12 22:29:11 UTC
incubator-ratis git commit: RATIS-103. LeaderState.updateSenders may
throw UnsupportedOperationException.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 0235de0ec -> 3e0ad68a0
RATIS-103. LeaderState.updateSenders may throw UnsupportedOperationException.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/3e0ad68a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/3e0ad68a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/3e0ad68a
Branch: refs/heads/master
Commit: 3e0ad68a0897c622b00e85af0b7c06ed1a80fc12
Parents: 0235de0
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Sat Aug 12 15:28:54 2017 -0700
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Sat Aug 12 15:28:54 2017 -0700
----------------------------------------------------------------------
.../apache/ratis/server/impl/LeaderState.java | 85 ++++++++++++--------
.../apache/ratis/server/impl/LogAppender.java | 3 +-
2 files changed, 54 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e0ad68a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index de88382..313a3bb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -34,7 +34,10 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.ratis.server.impl.LeaderState.StateUpdateEventType.*;
@@ -68,6 +71,35 @@ public class LeaderState {
}
}
+ /**
+ * Use {@link CopyOnWriteArrayList} to implement a thread-safe list.
+ * Since each mutation induces a copy of the list, only bulk operations
+ * (addAll and removeAll) are supported.
+ */
+ static class SenderList {
+ private final List<LogAppender> senders;
+
+ SenderList(LogAppender[] senders) {
+ this.senders = new CopyOnWriteArrayList<>(senders);
+ }
+
+ Stream<LogAppender> stream() {
+ return senders.stream();
+ }
+
+ void forEach(Consumer<LogAppender> action) {
+ senders.forEach(action);
+ }
+
+ boolean addAll(Collection<LogAppender> c) {
+ return senders.addAll(c);
+ }
+
+ boolean removeAll(Collection<LogAppender> c) {
+ return senders.removeAll(c);
+ }
+ }
+
static final StateUpdateEvent UPDATE_COMMIT_EVENT =
new StateUpdateEvent(StateUpdateEventType.UPDATECOMMIT, -1);
static final StateUpdateEvent STAGING_PROGRESS_EVENT =
@@ -83,7 +115,7 @@ public class LeaderState {
* The list of threads appending entries to followers.
* The list is protected by the RaftServer's lock.
*/
- private final List<LogAppender> senders;
+ private final SenderList senders;
private final BlockingQueue<StateUpdateEvent> eventQ;
private final EventProcessor processor;
private final PendingRequests pendingRequests;
@@ -110,11 +142,11 @@ public class LeaderState {
Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId());
final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs());
placeHolderIndex = raftLog.getNextIndex();
- senders = new CopyOnWriteArrayList<LogAppender>();
- for (RaftPeer p : others) {
- senders.add(server.newLogAppender(this, p, t, placeHolderIndex, true));
- }
+ senders = new SenderList(others.stream().map(
+ p -> server.newLogAppender(this, p, t, placeHolderIndex, true))
+ .toArray(LogAppender[]::new));
+
voterLists = divideFollowers(conf);
}
@@ -146,10 +178,7 @@ public class LeaderState {
void stop() {
this.running = false;
// do not interrupt event processor since it may be in the middle of logSync
- for (LogAppender sender : senders) {
- sender.stopSender();
- sender.interrupt();
- }
+ senders.forEach(sender -> sender.stopSender().interrupt());
try {
pendingRequests.sendNotLeaderResponses();
} catch (IOException e) {
@@ -235,11 +264,18 @@ public class LeaderState {
void addSenders(Collection<RaftPeer> newMembers) {
final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs());
final long nextIndex = raftLog.getNextIndex();
- for (RaftPeer peer : newMembers) {
+
+ senders.addAll(newMembers.stream().map(peer -> {
LogAppender sender = server.newLogAppender(this, peer, t, nextIndex, false);
- senders.add(sender);
sender.start();
- }
+ return sender;
+ }).collect(Collectors.toList()));
+ }
+
+ void stopAndRemoveSenders(Predicate<LogAppender> predicate) {
+ final List<LogAppender> toStop = senders.stream().filter(predicate).collect(Collectors.toList());
+ toStop.forEach(s -> s.stopSender().interrupt());
+ senders.removeAll(toStop);
}
/**
@@ -247,15 +283,7 @@ public class LeaderState {
*/
private void updateSenders(RaftConfiguration conf) {
Preconditions.assertTrue(conf.isStable() && !inStagingState());
- Iterator<LogAppender> iterator = senders.iterator();
- while (iterator.hasNext()) {
- LogAppender sender = iterator.next();
- if (!conf.containsInConf(sender.getFollower().getPeer().getId())) {
- iterator.remove();
- sender.stopSender();
- sender.interrupt();
- }
- }
+ stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollower().getPeer().getId()));
}
void submitUpdateStateEvent(StateUpdateEvent event) {
@@ -384,9 +412,7 @@ public class LeaderState {
} else if (!reports.contains(BootStrapProgress.PROGRESSING)) {
// all caught up!
applyOldNewConf();
- for (LogAppender sender : senders) {
- sender.getFollower().startAttendVote();
- }
+ senders.forEach(s -> s.getFollower().startAttendVote());
}
}
}
@@ -573,15 +599,8 @@ public class LeaderState {
}
void fail() {
- Iterator<LogAppender> iterator = senders.iterator();
- while (iterator.hasNext()) {
- LogAppender sender = iterator.next();
- if (!sender.getFollower().isAttendingVote()) {
- iterator.remove();
- sender.stopSender();
- sender.interrupt();
- }
- }
+ stopAndRemoveSenders(s -> !s.getFollower().isAttendingVote());
+
LeaderState.this.stagingState = null;
// send back failure response to client's request
pendingRequests.failSetConfiguration(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e0ad68a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 36c6171..a5b0791 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -98,8 +98,9 @@ public class LogAppender extends Daemon {
return sending;
}
- public void stopSender() {
+ public LogAppender stopSender() {
this.sending = false;
+ return this;
}
public FollowerInfo getFollower() {