You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2018/04/30 04:35:04 UTC
zookeeper git commit: ZOOKEEPER-2988: NPE triggered if server receives a vote for a server …
Repository: zookeeper
Updated Branches:
refs/heads/master 5c9688764 -> 2022766ec
ZOOKEEPER-2988: NPE triggered if server receives a vote for a server id not in their voting view
Author: Brian Nixon <ni...@fb.com>
Reviewers: Abraham Fine <af...@apache.org>, Michael Han <ha...@apache.org>, Edward Ribeiro <ed...@gmail.com>
Closes #476 from enixon/ZOOKEEPER-2988
Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/2022766e
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/2022766e
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/2022766e
Branch: refs/heads/master
Commit: 2022766ec907f63b1b43fc5455e9e7761cd332f6
Parents: 5c96887
Author: Brian Nixon <ni...@fb.com>
Authored: Sun Apr 29 21:34:57 2018 -0700
Committer: Michael Han <ha...@apache.org>
Committed: Sun Apr 29 21:34:57 2018 -0700
----------------------------------------------------------------------
.../server/quorum/FastLeaderElection.java | 25 +++-
.../server/quorum/QuorumPeerMainTest.java | 136 ++++++++++++++++---
2 files changed, 140 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2022766e/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java b/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
index f1112b7..1bd0fbf 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
@@ -22,7 +22,6 @@ package org.apache.zookeeper.server.quorum;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -322,7 +321,7 @@ public class FastLeaderElection implements Election {
* If it is from a non-voting server (such as an observer or
* a non-voting follower), respond right away.
*/
- if(!self.getCurrentAndNextConfigVoters().contains(response.sid)) {
+ if(!validVoter(response.sid)) {
Vote current = self.getCurrentVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
@@ -927,10 +926,10 @@ public class FastLeaderElection implements Election {
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
- else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
+ else if (validVoter(n.sid) && validVoter(n.leader)) {
/*
* Only proceed if the vote comes from a replica in the current or next
- * voting view.
+ * voting view for a replica in the current or next voting view.
*/
switch (n.state) {
case LOOKING:
@@ -1064,7 +1063,12 @@ public class FastLeaderElection implements Election {
break;
}
} else {
- LOG.warn("Ignoring notification from non-cluster member " + n.sid);
+ if (!validVoter(n.leader)) {
+ LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
+ }
+ if (!validVoter(n.sid)) {
+ LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
+ }
}
}
return null;
@@ -1082,4 +1086,15 @@ public class FastLeaderElection implements Election {
manager.getConnectionThreadCount());
}
}
+
+    /**
+     * Tells whether the given server id appears among the voters of the
+     * current configuration or of the next one, as reported by
+     * {@code self.getCurrentAndNextConfigVoters()}.
+     *
+     * @param sid server identifier to look up
+     * @return {@code true} if {@code sid} is contained in that voter set
+     */
+    private boolean validVoter(final long sid) {
+        return self.getCurrentAndNextConfigVoters().contains(sid);
+    }
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2022766e/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index 43c341a..9bf3e9f 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -29,9 +29,9 @@ import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
-import org.apache.log4j.Layout;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
@@ -342,9 +342,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
public void testElectionFraud() throws IOException, InterruptedException {
// capture QuorumPeer logging
ByteArrayOutputStream os = new ByteArrayOutputStream();
- String loggingPattern = ((PatternLayout) Logger.getRootLogger().getAppender("CONSOLE").getLayout()).getConversionPattern();
- WriterAppender appender = new WriterAppender(new PatternLayout(loggingPattern), os);
- appender.setThreshold(Level.INFO);
+ WriterAppender appender = getConsoleAppender(os, Level.INFO);
Logger qlogger = Logger.getLogger(QuorumPeer.class);
qlogger.addAppender(appender);
@@ -540,11 +538,8 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
ClientBase.setupTestEnv();
// setup the logger to capture all logs
- Layout layout =
- Logger.getRootLogger().getAppender("CONSOLE").getLayout();
ByteArrayOutputStream os = new ByteArrayOutputStream();
- WriterAppender appender = new WriterAppender(layout, os);
- appender.setThreshold(Level.WARN);
+ WriterAppender appender = getConsoleAppender(os, Level.WARN);
Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
qlogger.addAppender(appender);
@@ -599,11 +594,8 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
ClientBase.setupTestEnv();
// setup the logger to capture all logs
- Layout layout =
- Logger.getRootLogger().getAppender("CONSOLE").getLayout();
ByteArrayOutputStream os = new ByteArrayOutputStream();
- WriterAppender appender = new WriterAppender(layout, os);
- appender.setThreshold(Level.INFO);
+ WriterAppender appender = getConsoleAppender(os, Level.INFO);
Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
qlogger.addAppender(appender);
@@ -742,12 +734,9 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
ClientBase.setupTestEnv();
// setup the logger to capture all logs
- Layout layout =
- Logger.getRootLogger().getAppender("CONSOLE").getLayout();
ByteArrayOutputStream os = new ByteArrayOutputStream();
- WriterAppender appender = new WriterAppender(layout, os);
+ WriterAppender appender = getConsoleAppender(os, Level.INFO);
appender.setImmediateFlush(true);
- appender.setThreshold(Level.INFO);
Logger zlogger = Logger.getLogger("org.apache.zookeeper");
zlogger.addAppender(appender);
@@ -1012,4 +1001,119 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
Assert.assertNull("server " + i + " should not have /zk" + leader, servers.zk[i].exists("/zk" + leader, false));
}
}
+
+    /**
+     * Verify that a node without the leader in its view will not attempt to connect to the leader.
+     *
+     * <p>Server 1 is started with a config listing only servers 1 and 2, while servers 2 and 3
+     * are started with a config listing all three. The test expects server 3 to end up LEADING,
+     * server 2 FOLLOWING and server 1 still LOOKING, and then scans the captured quorum logs for
+     * any line showing server 1 as LEADING or FOLLOWING.
+     */
+    @Test
+    public void testLeaderOutOfView() throws Exception {
+        ClientBase.setupTestEnv();
+
+        int numServers = 3;
+
+        // used for assertions later
+        boolean foundLeading = false;
+        boolean foundFollowing = false;
+
+        // capture QuorumPeer logging
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        WriterAppender appender = getConsoleAppender(os, Level.DEBUG);
+        Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
+        qlogger.addAppender(appender);
+
+        try {
+            Servers svrs = new Servers();
+            svrs.clientPorts = new int[numServers];
+            for (int i = 0; i < numServers; i++) {
+                svrs.clientPorts[i] = PortAssignment.unique();
+            }
+
+            String quorumCfgIncomplete = getUniquePortCfgForId(1) + "\n" + getUniquePortCfgForId(2);
+            String quorumCfgComplete = quorumCfgIncomplete + "\n" + getUniquePortCfgForId(3);
+            // sized from numServers so it stays in step with clientPorts and the loops below
+            svrs.mt = new MainThread[numServers];
+
+            // Node 1 is started without the leader (3) in its config view
+            svrs.mt[0] = new MainThread(1, svrs.clientPorts[0], quorumCfgIncomplete);
+            for (int i = 1; i < numServers; i++) {
+                svrs.mt[i] = new MainThread(i + 1, svrs.clientPorts[i], quorumCfgComplete);
+            }
+
+            // Node 1 must be started first, before quorum is formed, to trigger the attempted invalid connection to 3
+            svrs.mt[0].start();
+            QuorumPeer quorumPeer1 = waitForQuorumPeer(svrs.mt[0], CONNECTION_TIMEOUT);
+            Assert.assertEquals(QuorumPeer.ServerState.LOOKING, quorumPeer1.getPeerState());
+
+            // Node 3 started second to avoid 1 and 2 forming a quorum before 3 starts up
+            int highestServerIndex = numServers - 1;
+            svrs.mt[highestServerIndex].start();
+            QuorumPeer quorumPeer3 = waitForQuorumPeer(svrs.mt[highestServerIndex], CONNECTION_TIMEOUT);
+            Assert.assertEquals(QuorumPeer.ServerState.LOOKING, quorumPeer3.getPeerState());
+
+            // Node 2 started last, kicks off leader election
+            for (int i = 1; i < highestServerIndex; i++) {
+                svrs.mt[i].start();
+            }
+
+            // Nodes 2 and 3 now form quorum and fully start. 1 attempts to vote for 3, fails, returns to LOOKING state
+            for (int i = 1; i < numServers; i++) {
+                Assert.assertTrue("waiting for server to start",
+                        ClientBase.waitForServerUp("127.0.0.1:" + svrs.clientPorts[i], CONNECTION_TIMEOUT));
+            }
+
+            // assertEquals reports the actual state on failure, unlike assertTrue(a == b)
+            Assert.assertEquals(QuorumPeer.ServerState.LOOKING, svrs.mt[0].getQuorumPeer().getPeerState());
+            Assert.assertEquals(QuorumPeer.ServerState.LEADING,
+                    svrs.mt[highestServerIndex].getQuorumPeer().getPeerState());
+            for (int i = 1; i < highestServerIndex; i++) {
+                Assert.assertEquals(QuorumPeer.ServerState.FOLLOWING, svrs.mt[i].getQuorumPeer().getPeerState());
+            }
+
+            // Look through the logs for output that indicates Node 1 is LEADING or FOLLOWING
+            LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
+            Pattern leading = Pattern.compile(".*myid=1.*QuorumPeer.*LEADING.*");
+            Pattern following = Pattern.compile(".*myid=1.*QuorumPeer.*FOLLOWING.*");
+
+            // stop scanning at the first line that matches either pattern
+            String line;
+            while ((line = r.readLine()) != null && !foundLeading && !foundFollowing) {
+                foundLeading = leading.matcher(line).matches();
+                foundFollowing = following.matcher(line).matches();
+            }
+
+        } finally {
+            qlogger.removeAppender(appender);
+        }
+
+        Assert.assertFalse("Corrupt peer should never become leader", foundLeading);
+        Assert.assertFalse("Corrupt peer should not attempt connection to out of view leader", foundFollowing);
+    }
+
+    /**
+     * Builds a {@link WriterAppender} that writes into the given stream, reusing the
+     * conversion pattern of the root logger's "CONSOLE" appender.
+     *
+     * The "CONSOLE" appender's layout is cast to {@link PatternLayout}, so this relies on
+     * the root logger being configured that way.
+     *
+     * @param os stream that receives the formatted log output
+     * @param level threshold set on the returned appender
+     * @return the newly created appender (not yet attached to any logger)
+     */
+    private WriterAppender getConsoleAppender(ByteArrayOutputStream os, Level level) {
+        PatternLayout consoleLayout =
+                (PatternLayout) Logger.getRootLogger().getAppender("CONSOLE").getLayout();
+        PatternLayout captureLayout = new PatternLayout(consoleLayout.getConversionPattern());
+        WriterAppender captureAppender = new WriterAppender(captureLayout, os);
+        captureAppender.setThreshold(level);
+        return captureAppender;
+    }
+
+    /**
+     * Formats a {@code server.<id>=127.0.0.1:<port>:<port>} config line using two ports
+     * taken from {@code PortAssignment.unique()}.
+     *
+     * NOTE(review): the two ports are presumably the quorum and election ports of the
+     * server entry - confirm against the server config format.
+     *
+     * @param id server id placed after {@code server.}
+     * @return the formatted config line, without a trailing newline
+     */
+    private String getUniquePortCfgForId(int id) {
+        // allocate in the same left-to-right order as the format arguments
+        int firstPort = PortAssignment.unique();
+        int secondPort = PortAssignment.unique();
+        return String.format("server.%d=127.0.0.1:%d:%d", id, firstPort, secondPort);
+    }
+
+    /**
+     * Polls the given thread every 250 ms until it is alive and exposes a non-null
+     * {@link QuorumPeer}, or until the timeout has elapsed.
+     *
+     * @param mainThread thread expected to create the peer
+     * @param timeout maximum wait, in the units of {@code Time.currentElapsedTime()}
+     * @return the peer once it becomes available
+     * @throws TimeoutException if no peer is available before the timeout elapses
+     */
+    private QuorumPeer waitForQuorumPeer(MainThread mainThread, int timeout) throws TimeoutException {
+        final long deadline = Time.currentElapsedTime() + timeout;
+        for (;;) {
+            // only ask for the peer while the thread is running
+            if (mainThread.isAlive()) {
+                QuorumPeer candidate = mainThread.getQuorumPeer();
+                if (candidate != null) {
+                    return candidate;
+                }
+            }
+
+            if (Time.currentElapsedTime() > deadline) {
+                LOG.error("Timed out while waiting for QuorumPeer");
+                throw new TimeoutException();
+            }
+
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
}