You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2018/04/30 04:35:04 UTC

zookeeper git commit: ZOOKEEPER-2988: NPE triggered if server receives a vote for a server id not in their voting view

Repository: zookeeper
Updated Branches:
  refs/heads/master 5c9688764 -> 2022766ec


ZOOKEEPER-2988: NPE triggered if server receives a vote for a server id not in their voting view

Author: Brian Nixon <ni...@fb.com>

Reviewers: Abraham Fine <af...@apache.org>, Michael Han <ha...@apache.org>, Edward Ribeiro <ed...@gmail.com>

Closes #476 from enixon/ZOOKEEPER-2988


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/2022766e
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/2022766e
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/2022766e

Branch: refs/heads/master
Commit: 2022766ec907f63b1b43fc5455e9e7761cd332f6
Parents: 5c96887
Author: Brian Nixon <ni...@fb.com>
Authored: Sun Apr 29 21:34:57 2018 -0700
Committer: Michael Han <ha...@apache.org>
Committed: Sun Apr 29 21:34:57 2018 -0700

----------------------------------------------------------------------
 .../server/quorum/FastLeaderElection.java       |  25 +++-
 .../server/quorum/QuorumPeerMainTest.java       | 136 ++++++++++++++++---
 2 files changed, 140 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2022766e/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java b/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
index f1112b7..1bd0fbf 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
@@ -22,7 +22,6 @@ package org.apache.zookeeper.server.quorum;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -322,7 +321,7 @@ public class FastLeaderElection implements Election {
                          * If it is from a non-voting server (such as an observer or
                          * a non-voting follower), respond right away.
                          */
-                        if(!self.getCurrentAndNextConfigVoters().contains(response.sid)) {
+                        if(!validVoter(response.sid)) {
                             Vote current = self.getCurrentVote();
                             QuorumVerifier qv = self.getQuorumVerifier();
                             ToSend notmsg = new ToSend(ToSend.mType.notification,
@@ -927,10 +926,10 @@ public class FastLeaderElection implements Election {
                             tmpTimeOut : maxNotificationInterval);
                     LOG.info("Notification time out: " + notTimeout);
                 } 
-                else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
+                else if (validVoter(n.sid) && validVoter(n.leader)) {
                     /*
                      * Only proceed if the vote comes from a replica in the current or next
-                     * voting view.
+                     * voting view for a replica in the current or next voting view.
                      */
                     switch (n.state) {
                     case LOOKING:
@@ -1064,7 +1063,12 @@ public class FastLeaderElection implements Election {
                         break;
                     }
                 } else {
-                    LOG.warn("Ignoring notification from non-cluster member " + n.sid);
+                    if (!validVoter(n.leader)) {
+                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
+                    }
+                    if (!validVoter(n.sid)) {
+                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
+                    }
                 }
             }
             return null;
@@ -1082,4 +1086,15 @@ public class FastLeaderElection implements Election {
                     manager.getConnectionThreadCount());
         }
     }
+
+    /**
+     * Checks whether the given sid is contained in the collection returned by
+     * self.getCurrentAndNextConfigVoters(), i.e. the current or next voting view.
+     *
+     * @param sid     Server identifier
+     * @return true if sid is in the current or next voting view, false otherwise
+     */
+    private boolean validVoter(long sid) {
+        return self.getCurrentAndNextConfigVoters().contains(sid);
+    }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2022766e/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index 43c341a..9bf3e9f 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -29,9 +29,9 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeoutException;
 import java.util.regex.Pattern;
 
-import org.apache.log4j.Layout;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
@@ -342,9 +342,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
     public void testElectionFraud() throws IOException, InterruptedException {
         // capture QuorumPeer logging
         ByteArrayOutputStream os = new ByteArrayOutputStream();
-        String loggingPattern = ((PatternLayout) Logger.getRootLogger().getAppender("CONSOLE").getLayout()).getConversionPattern();
-        WriterAppender appender = new WriterAppender(new PatternLayout(loggingPattern), os);
-        appender.setThreshold(Level.INFO);
+        WriterAppender appender = getConsoleAppender(os, Level.INFO);
         Logger qlogger = Logger.getLogger(QuorumPeer.class);
         qlogger.addAppender(appender);
 
@@ -540,11 +538,8 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         ClientBase.setupTestEnv();
 
         // setup the logger to capture all logs
-        Layout layout =
-                Logger.getRootLogger().getAppender("CONSOLE").getLayout();
         ByteArrayOutputStream os = new ByteArrayOutputStream();
-        WriterAppender appender = new WriterAppender(layout, os);
-        appender.setThreshold(Level.WARN);
+        WriterAppender appender = getConsoleAppender(os, Level.WARN);
         Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
         qlogger.addAppender(appender);
 
@@ -599,11 +594,8 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         ClientBase.setupTestEnv();
 
         // setup the logger to capture all logs
-        Layout layout =
-                Logger.getRootLogger().getAppender("CONSOLE").getLayout();
         ByteArrayOutputStream os = new ByteArrayOutputStream();
-        WriterAppender appender = new WriterAppender(layout, os);
-        appender.setThreshold(Level.INFO);
+        WriterAppender appender = getConsoleAppender(os, Level.INFO);
         Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
         qlogger.addAppender(appender);
 
@@ -742,12 +734,9 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         ClientBase.setupTestEnv();
 
         // setup the logger to capture all logs
-        Layout layout =
-                Logger.getRootLogger().getAppender("CONSOLE").getLayout();
         ByteArrayOutputStream os = new ByteArrayOutputStream();
-        WriterAppender appender = new WriterAppender(layout, os);
+        WriterAppender appender = getConsoleAppender(os, Level.INFO);
         appender.setImmediateFlush(true);
-        appender.setThreshold(Level.INFO);
         Logger zlogger = Logger.getLogger("org.apache.zookeeper");
         zlogger.addAppender(appender);
 
@@ -1012,4 +1001,119 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
             Assert.assertNull("server " + i + " should not have /zk" + leader, servers.zk[i].exists("/zk" + leader, false));
         }
     }
+
+    /**
+     * Verify that a node without the leader in its config view will not attempt to connect to the leader: it must stay LOOKING, and no captured log line for myid=1 may report LEADING or FOLLOWING.
+     */
+    @Test
+    public void testLeaderOutOfView() throws Exception {
+        ClientBase.setupTestEnv();
+
+        int numServers = 3;
+
+        // set while scanning the captured log below; asserted only after the appender has been removed
+        boolean foundLeading = false;
+        boolean foundFollowing = false;
+
+        // capture DEBUG-and-above logging from the org.apache.zookeeper.server.quorum package into os
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        WriterAppender appender = getConsoleAppender(os, Level.DEBUG);
+        Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
+        qlogger.addAppender(appender);
+
+        try {
+            Servers svrs = new Servers();
+            svrs.clientPorts = new int[numServers];
+            for (int i = 0; i < numServers; i++) {
+                svrs.clientPorts[i] = PortAssignment.unique();
+            }
+
+            String quorumCfgIncomplete = getUniquePortCfgForId(1) + "\n" + getUniquePortCfgForId(2);
+            String quorumCfgComplete = quorumCfgIncomplete + "\n" + getUniquePortCfgForId(3);
+            svrs.mt = new MainThread[3];
+
+            // Node 1 (index 0) is started without the leader (3) in its config view; nodes 2 and 3 get the complete config
+            svrs.mt[0] = new MainThread(1, svrs.clientPorts[0], quorumCfgIncomplete);
+            for (int i = 1; i < numServers; i++) {
+                svrs.mt[i] = new MainThread(i + 1, svrs.clientPorts[i], quorumCfgComplete);
+            }
+
+            // Node 1 must be started first, before quorum is formed, to trigger the attempted invalid connection to 3
+            svrs.mt[0].start();
+            QuorumPeer quorumPeer1 = waitForQuorumPeer(svrs.mt[0], CONNECTION_TIMEOUT);
+            Assert.assertTrue(quorumPeer1.getPeerState() == QuorumPeer.ServerState.LOOKING);
+
+            // Node 3 started second to avoid 1 and 2 forming a quorum before 3 starts up
+            int highestServerIndex = numServers - 1;
+            svrs.mt[highestServerIndex].start();
+            QuorumPeer quorumPeer3 = waitForQuorumPeer(svrs.mt[highestServerIndex], CONNECTION_TIMEOUT);
+            Assert.assertTrue(quorumPeer3.getPeerState() == QuorumPeer.ServerState.LOOKING);
+
+            // Node 2 started last, kicks off leader election (the loop covers every index between the first and the highest)
+            for (int i = 1; i < highestServerIndex; i++) {
+                svrs.mt[i].start();
+            }
+
+            // Nodes 2 and 3 now form quorum and fully start. 1 attempts to vote for 3, fails, returns to LOOKING state
+            for (int i = 1; i < numServers; i++) {
+                Assert.assertTrue("waiting for server to start",
+                        ClientBase.waitForServerUp("127.0.0.1:" + svrs.clientPorts[i], CONNECTION_TIMEOUT));
+            }
+
+            Assert.assertTrue(svrs.mt[0].getQuorumPeer().getPeerState() == QuorumPeer.ServerState.LOOKING);
+            Assert.assertTrue(svrs.mt[highestServerIndex].getQuorumPeer().getPeerState() == QuorumPeer.ServerState.LEADING);
+            for (int i = 1; i < highestServerIndex; i++) {
+                Assert.assertTrue(svrs.mt[i].getQuorumPeer().getPeerState() == QuorumPeer.ServerState.FOLLOWING);
+            }
+
+            // Look through the captured log for output that indicates Node 1 is LEADING or FOLLOWING; scanning stops at the first matching line
+            LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
+            Pattern leading = Pattern.compile(".*myid=1.*QuorumPeer.*LEADING.*");
+            Pattern following = Pattern.compile(".*myid=1.*QuorumPeer.*FOLLOWING.*");
+
+            String line;
+            while ((line = r.readLine()) != null && !foundLeading && !foundFollowing) {
+                foundLeading = leading.matcher(line).matches();
+                foundFollowing = following.matcher(line).matches();
+            }
+
+        } finally {
+            qlogger.removeAppender(appender);
+        }
+
+        Assert.assertFalse("Corrupt peer should never become leader", foundLeading);
+        Assert.assertFalse("Corrupt peer should not attempt connection to out of view leader", foundFollowing);
+    }
+
+    private WriterAppender getConsoleAppender(ByteArrayOutputStream os, Level level) {
+        String loggingPattern = ((PatternLayout) Logger.getRootLogger().getAppender("CONSOLE").getLayout()).getConversionPattern(); // reuse the conversion pattern of the root logger's "CONSOLE" appender
+        WriterAppender appender = new WriterAppender(new PatternLayout(loggingPattern), os); // fresh layout with that pattern, writing into the caller's stream
+        appender.setThreshold(level); // threshold chosen by the caller
+        return appender; // the caller is responsible for attaching it to a logger and removing it afterwards
+    }
+
+    private String getUniquePortCfgForId(int id) {
+        return String.format("server.%d=127.0.0.1:%d:%d", id, PortAssignment.unique(), PortAssignment.unique()); // one "server.<id>=127.0.0.1:<port>:<port>" config line; each port comes from its own PortAssignment.unique() call
+    }
+
+    private QuorumPeer waitForQuorumPeer(MainThread mainThread, int timeout) throws TimeoutException {
+        long start = Time.currentElapsedTime(); // NOTE(review): timeout is presumably in the same unit as Time.currentElapsedTime() (milliseconds?) - confirm
+        while (true) {
+            QuorumPeer quorumPeer = mainThread.isAlive() ? mainThread.getQuorumPeer() : null; // only ask for the peer while the MainThread is alive
+            if (quorumPeer != null) {
+                return quorumPeer;
+            }
+
+            if (Time.currentElapsedTime() > start + timeout) {
+                LOG.error("Timed out while waiting for QuorumPeer");
+                throw new TimeoutException();
+            }
+
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException e) {
+                // interruption is deliberately swallowed: polling continues until the peer appears or the timeout elapses (the interrupt status is not restored)
+            }
+        }
+    }
 }