You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by mi...@apache.org on 2015/05/14 07:10:01 UTC

svn commit: r1679313 - in /zookeeper/trunk: CHANGES.txt src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java src/java/test/org/apache/zookeeper/test/CnxManagerTest.java

Author: michim
Date: Thu May 14 05:10:01 2015
New Revision: 1679313

URL: http://svn.apache.org/r1679313
Log:
ZOOKEEPER-2186 QuorumCnxManager#receiveConnection may crash with random input (rgs via michim)

Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1679313&r1=1679312&r2=1679313&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Thu May 14 05:10:01 2015
@@ -93,6 +93,9 @@ BUGFIXES:
 
   ZOOKEEPER-2182 Several test suites are not running during pre-commit, because their names do not end with "Test". (Chris Nauroth via hdeng)
 
+  ZOOKEEPER-2186 QuorumCnxManager#receiveConnection may crash with random input
+  (rgs via michim)
+
 IMPROVEMENTS:
   ZOOKEEPER-1660 Documentation for Dynamic Reconfiguration (Reed Wanderman-Milne via shralex)  
 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1679313&r1=1679312&r2=1679313&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Thu May 14 05:10:01 2015
@@ -82,7 +82,17 @@ public class QuorumCnxManager {
      */
     
     private long observerCounter = -1;
-    
+
+    /*
+     * Protocol identifier used among peers
+     */
+    public static final long PROTOCOL_VERSION = -65536L;
+
+    /*
+     * Max buffer size to be read from the network.
+     */
+    static public final int maxBuffer = 2048;
+
     /*
      * Connection time out value in milliseconds 
      */
@@ -136,6 +146,72 @@ public class QuorumCnxManager {
         long sid;
     }
 
+    /*
+     * This class parses the initial identification sent out by peers: their
+     * sid and election address (host:port).
+     */
+    static public class InitialMessage {
+        public Long sid;
+        public InetSocketAddress electionAddr;
+
+        InitialMessage(Long sid, InetSocketAddress address) {
+            this.sid = sid;
+            this.electionAddr = address;
+        }
+
+        @SuppressWarnings("serial")
+        public static class InitialMessageException extends Exception {
+            InitialMessageException(String message, Object... args) {
+                super(String.format(message, args));
+            }
+        }
+
+        static public InitialMessage parse(Long protocolVersion, DataInputStream din)
+            throws InitialMessageException, IOException {
+            Long sid;
+
+            if (protocolVersion != PROTOCOL_VERSION) {
+                throw new InitialMessageException(
+                        "Got unrecognized protocol version %s", protocolVersion);
+            }
+
+            sid = din.readLong();
+
+            int remaining = din.readInt();
+            if (remaining <= 0 || remaining > maxBuffer) {
+                throw new InitialMessageException(
+                        "Unreasonable buffer length: %s", remaining);
+            }
+
+            byte[] b = new byte[remaining];
+            int num_read = din.read(b);
+
+            if (num_read != remaining) {
+                throw new InitialMessageException(
+                        "Read only %s bytes out of %s sent by server %s",
+                        num_read, remaining, sid);
+            }
+
+            // FIXME: IPv6 is not supported. Using something like Guava's HostAndPort
+            //        parser would be good.
+            String addr = new String(b);
+            String[] host_port = addr.split(":");
+
+            if (host_port.length != 2) {
+                throw new InitialMessageException("Badly formed address: %s", addr);
+            }
+
+            int port;
+            try {
+                port = Integer.parseInt(host_port[1]);
+            } catch (NumberFormatException e) {
+                throw new InitialMessageException("Bad port number: %s", host_port[1]);
+            }
+
+            return new InitialMessage(sid, new InetSocketAddress(host_port[0], port));
+        }
+    }
+
     public QuorumCnxManager(QuorumPeer self) {
         this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
         this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
@@ -179,7 +255,7 @@ public class QuorumCnxManager {
             // Sending id and challenge
             dout = new DataOutputStream(sock.getOutputStream());
             // represents protocol version (in other words - message type)
-            dout.writeLong(0xffff0000);              
+            dout.writeLong(PROTOCOL_VERSION);
             dout.writeLong(self.getId());
             String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort();
             byte[] addr_bytes = addr.getBytes();
@@ -229,31 +305,28 @@ public class QuorumCnxManager {
      * possible long value to lose the challenge.
      * 
      */
-    public boolean receiveConnection(Socket sock) {
+    public void receiveConnection(Socket sock) {
         Long sid = null, protocolVersion = null;
         InetSocketAddress electionAddr = null;
+
         try {
             DataInputStream din = new DataInputStream(sock.getInputStream());
+
             protocolVersion = din.readLong();
             if (protocolVersion >= 0) { // this is a server id and not a protocol version
                 sid = protocolVersion;
             } else {
-                sid = din.readLong();
-                int num_remaining_bytes = din.readInt();
-                byte[] b = new byte[num_remaining_bytes];
-                int num_read = din.read(b);
-                if (num_read == num_remaining_bytes) {
-                    if (protocolVersion == 0xffff0000) {
-                        String addr = new String(b);
-                        String[] host_port = addr.split(":");
-                        electionAddr = new InetSocketAddress(host_port[0], Integer.parseInt(host_port[1]));                   
-                    } else {
-                        LOG.error("Got urecognized protocol version " + protocolVersion + " from " + sid);
-                    }
-                } else {
-                   LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);          
+                try {
+                    InitialMessage init = InitialMessage.parse(protocolVersion, din);
+                    sid = init.sid;
+                    electionAddr = init.electionAddr;
+                } catch (InitialMessage.InitialMessageException ex) {
+                    LOG.error(ex.toString());
+                    closeSocket(sock);
+                    return;
                 }
-            } 
+            }
+
             if (sid == QuorumPeer.OBSERVER_ID) {
                 /*
                  * Choose identifier at random. We need a value to identify
@@ -261,12 +334,12 @@ public class QuorumCnxManager {
                  */
                 
                 sid = observerCounter--;
-                LOG.info("Setting arbitrary identifier to observer: " + sid);
+                LOG.info("Setting arbitrary identifier to observer: {}", sid);
             }
         } catch (IOException e) {
             closeSocket(sock);
-            LOG.warn("Exception reading or writing challenge: " + e.toString());
-            return false;
+            LOG.warn("Exception reading or writing challenge: {}", e.toString());
+            return;
         }
         
         //If wins the challenge, then close the new connection.
@@ -284,7 +357,7 @@ public class QuorumCnxManager {
             /*
              * Now we start a new connection
              */
-            LOG.debug("Create new connection to server: " + sid);
+            LOG.debug("Create new connection to server: {}", sid);
             closeSocket(sock);
 
             if (electionAddr != null) {
@@ -293,28 +366,25 @@ public class QuorumCnxManager {
                 connectOne(sid);
             }
 
-            // Otherwise start worker threads to receive data.
-        } else {
+        } else { // Otherwise start worker threads to receive data.
             SendWorker sw = new SendWorker(sock, sid);
             RecvWorker rw = new RecvWorker(sock, sid, sw);
             sw.setRecv(rw);
 
             SendWorker vsw = senderWorkerMap.get(sid);
             
-            if(vsw != null)
+            if (vsw != null) {
                 vsw.finish();
-            
+            }
+
             senderWorkerMap.put(sid, sw);
-            
-            queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
-                        SEND_CAPACITY));
+
+            queueSendMap.putIfAbsent(sid,
+                    new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
             
             sw.start();
             rw.start();
-            
-            return true;    
         }
-        return false;
     }
 
     /**

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=1679313&r1=1679312&r2=1679313&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Thu May 14 05:10:01 2015
@@ -18,6 +18,9 @@
 
 package org.apache.zookeeper.test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.net.InetSocketAddress;
@@ -37,6 +40,7 @@ import org.apache.zookeeper.PortAssignme
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.server.quorum.QuorumCnxManager;
 import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager.InitialMessage;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
@@ -242,7 +246,7 @@ public class CnxManagerTest extends ZKTe
 
         InetSocketAddress otherAddr = peers.get(new Long(2)).electionAddr;
         DataOutputStream dout = new DataOutputStream(sc.socket().getOutputStream());
-        dout.writeLong(0xffff0000);
+        dout.writeLong(QuorumCnxManager.PROTOCOL_VERSION);
         dout.writeLong(new Long(2));
         String addr = otherAddr.getHostString()+ ":" + otherAddr.getPort();
         byte[] addr_bytes = addr.getBytes();
@@ -433,4 +437,85 @@ public class CnxManagerTest extends ZKTe
         }
         return null;
     }
+
+    @Test
+    public void testInitialMessage() throws Exception {
+        InitialMessage msg;
+        ByteArrayOutputStream bos;
+        DataInputStream din;
+        DataOutputStream dout;
+        String hostport;
+
+        // message with bad protocol version
+        try {
+
+            // the initial message (without the protocol version)
+            hostport = "10.0.0.2:3888";
+            bos = new ByteArrayOutputStream();
+            dout = new DataOutputStream(bos);
+            dout.writeLong(5L); // sid
+            dout.writeInt(hostport.getBytes().length);
+            dout.writeBytes(hostport);
+
+            // now parse it
+            din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+            msg = InitialMessage.parse(-65530L, din);
+            Assert.fail("bad protocol version accepted");
+        } catch (InitialMessage.InitialMessageException ex) {}
+
+        // message too long
+        try {
+
+            hostport = createLongString(1048576);
+            bos = new ByteArrayOutputStream();
+            dout = new DataOutputStream(bos);
+            dout.writeLong(5L); // sid
+            dout.writeInt(hostport.getBytes().length);
+            dout.writeBytes(hostport);
+
+            din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+            msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din);
+            Assert.fail("long message accepted");
+        } catch (InitialMessage.InitialMessageException ex) {}
+
+        // bad hostport string
+        try {
+
+            hostport = "what's going on here?";
+            bos = new ByteArrayOutputStream();
+            dout = new DataOutputStream(bos);
+            dout.writeLong(5L); // sid
+            dout.writeInt(hostport.getBytes().length);
+            dout.writeBytes(hostport);
+
+            din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+            msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din);
+            Assert.fail("bad hostport accepted");
+        } catch (InitialMessage.InitialMessageException ex) {}
+
+        // good message
+        try {
+
+            hostport = "10.0.0.2:3888";
+            bos = new ByteArrayOutputStream();
+            dout = new DataOutputStream(bos);
+            dout.writeLong(5L); // sid
+            dout.writeInt(hostport.getBytes().length);
+            dout.writeBytes(hostport);
+
+            // now parse it
+            din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+            msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din);
+        } catch (InitialMessage.InitialMessageException ex) {
+            Assert.fail(ex.toString());
+        }
+    }
+
+    private String createLongString(int size) {
+        StringBuilder sb = new StringBuilder(size);
+        for (int i=0; i < size; i++) {
+            sb.append('x');
+        }
+        return sb.toString();
+    }
 }