You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2014/07/23 19:36:20 UTC

svn commit: r1612885 - in /zookeeper/trunk: CHANGES.txt src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java src/java/test/org/apache/zookeeper/test/CnxManagerTest.java

Author: phunt
Date: Wed Jul 23 17:36:19 2014
New Revision: 1612885

URL: http://svn.apache.org/r1612885
Log:
ZOOKEEPER-1789. 3.4.x observer causes NPE on 3.5.0 (trunk) participants (Alex Shraer via phunt)

Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1612885&r1=1612884&r2=1612885&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Wed Jul 23 17:36:19 2014
@@ -729,6 +729,9 @@ BUGFIXES:
   ZOOKEEPER-1984. testLeaderTimesoutOnNewQuorum is a flakey test
   (Alex Shraer via phunt)
 
+  ZOOKEEPER-1789. 3.4.x observer causes NPE on 3.5.0 (trunk)
+  participants (Alex Shraer via phunt)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1612885&r1=1612884&r2=1612885&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Wed Jul 23 17:36:19 2014
@@ -231,13 +231,12 @@ public class QuorumCnxManager {
      */
     public boolean receiveConnection(Socket sock) {
         Long sid = null, protocolVersion = null;
-        InetSocketAddress electionAddr;
+        InetSocketAddress electionAddr = null;
         try {
             DataInputStream din = new DataInputStream(sock.getInputStream());
             protocolVersion = din.readLong();
             if (protocolVersion >= 0) { // this is a server id and not a protocol version
-               sid = protocolVersion;  
-                electionAddr = self.getVotingView().get(sid).electionAddr;
+                sid = protocolVersion;
             } else {
                 sid = din.readLong();
                 int num_remaining_bytes = din.readInt();
@@ -250,11 +249,9 @@ public class QuorumCnxManager {
                         electionAddr = new InetSocketAddress(host_port[0], Integer.parseInt(host_port[1]));                   
                     } else {
                         LOG.error("Got urecognized protocol version " + protocolVersion + " from " + sid);
-                        electionAddr = null;
                     }
                 } else {
-                   LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
-                   electionAddr = null;                
+                   LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);          
                 }
             } 
             if (sid == QuorumPeer.OBSERVER_ID) {
@@ -289,7 +286,12 @@ public class QuorumCnxManager {
              */
             LOG.debug("Create new connection to server: " + sid);
             closeSocket(sock);
-            connectOne(sid, electionAddr);
+
+            if (electionAddr != null) {
+                connectOne(sid, electionAddr);
+            } else {
+                connectOne(sid);
+            }
 
             // Otherwise start worker threads to receive data.
         } else {

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=1612885&r1=1612884&r2=1612885&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Wed Jul 23 17:36:19 2014
@@ -37,6 +37,7 @@ import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.server.quorum.QuorumCnxManager;
 import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
 import org.junit.Assert;
@@ -274,6 +275,60 @@ public class CnxManagerTest extends ZKTe
         Assert.assertFalse(cnxManager.listener.isAlive());
     }
 
+    /**
+     * Regression test for a bug in QuorumCnxManager that caused an NPE when a
+     * 3.4.6 observer connected to a 3.5.0 server.
+     * See <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-1789">ZOOKEEPER-1789</a>.
+     *
+     * @throws Exception if any step of the test fails unexpectedly
+     */
+    @Test
+    public void testCnxManagerNPE() throws Exception {
+        // mark peer 2 as an observer; the bytes sent below carry its id (2) in the 3.4.6 handshake
+        peers.get(2L).type = LearnerType.OBSERVER;
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1],
+                peerClientPort[1], 3, 1, 1000, 2, 2);
+        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if (listener != null) {
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+        int port = peers.get(peer.getId()).electionAddr.getPort();
+        LOG.info("Election port: " + port);
+
+        Thread.sleep(1000);
+
+        SocketChannel sc = SocketChannel.open();
+        sc.socket().connect(peers.get(1L).electionAddr, 5000);
+
+        /*
+         * Send only the 8-byte server id, with no protocol version in front
+         * (3.4.6 protocol). This previously caused an NPE in QuorumCnxManager.
+         */
+        byte[] msgBytes = new byte[8];
+        ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
+        msgBuffer.putLong(2L);
+        msgBuffer.position(0);
+        sc.write(msgBuffer);
+
+        msgBuffer = ByteBuffer.wrap(new byte[8]);
+        // length prefix: the message that follows is 4 bytes long
+        msgBuffer.putInt(4);
+        // message body: a single int
+        msgBuffer.putInt(5);
+        msgBuffer.position(0);
+        sc.write(msgBuffer);
+
+        Message m = cnxManager.pollRecvQueue(1000, TimeUnit.MILLISECONDS);
+        Assert.assertNotNull(m);
+
+        peer.shutdown();
+        cnxManager.halt();
+        Assert.assertFalse(cnxManager.listener.isAlive());
+    }
+
     /*
      * Test if a receiveConnection is able to timeout on socket errors
      */