You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by mi...@apache.org on 2015/05/14 07:11:34 UTC
svn commit: r1679314 - in /zookeeper/branches/branch-3.5: CHANGES.txt
src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
Author: michim
Date: Thu May 14 05:11:33 2015
New Revision: 1679314
URL: http://svn.apache.org/r1679314
Log:
ZOOKEEPER-2186 QuorumCnxManager#receiveConnection may crash with random input (rgs via michim)
Modified:
zookeeper/branches/branch-3.5/CHANGES.txt
zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
Modified: zookeeper/branches/branch-3.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/CHANGES.txt?rev=1679314&r1=1679313&r2=1679314&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.5/CHANGES.txt Thu May 14 05:11:33 2015
@@ -125,6 +125,9 @@ IMPROVEMENTS:
ZOOKEEPER-2153 X509 Authentication Documentation
(Ian Dimayuga via hdeng)
+ ZOOKEEPER-2186 QuorumCnxManager#receiveConnection may crash with random input
+ (rgs via michim)
+
Release 3.5.0 - 8/4/2014
NEW FEATURES:
Modified: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1679314&r1=1679313&r2=1679314&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Thu May 14 05:11:33 2015
@@ -82,7 +82,17 @@ public class QuorumCnxManager {
*/
private long observerCounter = -1;
-
+
+ /*
+ * Protocol identifier used among peers
+ */
+ public static final long PROTOCOL_VERSION = -65536L;
+
+ /*
+ * Max buffer size to be read from the network.
+ */
+ static public final int maxBuffer = 2048;
+
/*
* Connection time out value in milliseconds
*/
@@ -136,6 +146,72 @@ public class QuorumCnxManager {
long sid;
}
+ /*
+ * This class parses the initial identification sent out by peers with their
+ * sid & hostname.
+ */
+ static public class InitialMessage {
+ public Long sid;
+ public InetSocketAddress electionAddr;
+
+ InitialMessage(Long sid, InetSocketAddress address) {
+ this.sid = sid;
+ this.electionAddr = address;
+ }
+
+ @SuppressWarnings("serial")
+ public static class InitialMessageException extends Exception {
+ InitialMessageException(String message, Object... args) {
+ super(String.format(message, args));
+ }
+ }
+
+ static public InitialMessage parse(Long protocolVersion, DataInputStream din)
+ throws InitialMessageException, IOException {
+ Long sid;
+
+ if (protocolVersion != PROTOCOL_VERSION) {
+ throw new InitialMessageException(
+ "Got unrecognized protocol version %s", protocolVersion);
+ }
+
+ sid = din.readLong();
+
+ int remaining = din.readInt();
+ if (remaining <= 0 || remaining > maxBuffer) {
+ throw new InitialMessageException(
+ "Unreasonable buffer length: %s", remaining);
+ }
+
+ byte[] b = new byte[remaining];
+ int num_read = din.read(b);
+
+ if (num_read != remaining) {
+ throw new InitialMessageException(
+ "Read only %s bytes out of %s sent by server %s",
+ num_read, remaining, sid);
+ }
+
+ // FIXME: IPv6 is not supported. Using something like Guava's HostAndPort
+ // parser would be good.
+ String addr = new String(b);
+ String[] host_port = addr.split(":");
+
+ if (host_port.length != 2) {
+ throw new InitialMessageException("Badly formed address: %s", addr);
+ }
+
+ int port;
+ try {
+ port = Integer.parseInt(host_port[1]);
+ } catch (NumberFormatException e) {
+ throw new InitialMessageException("Bad port number: %s", host_port[1]);
+ }
+
+ return new InitialMessage(sid, new InetSocketAddress(host_port[0], port));
+ }
+ }
+
public QuorumCnxManager(QuorumPeer self) {
this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
@@ -179,7 +255,7 @@ public class QuorumCnxManager {
// Sending id and challenge
dout = new DataOutputStream(sock.getOutputStream());
// represents protocol version (in other words - message type)
- dout.writeLong(0xffff0000);
+ dout.writeLong(PROTOCOL_VERSION);
dout.writeLong(self.getId());
String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort();
byte[] addr_bytes = addr.getBytes();
@@ -229,31 +305,28 @@ public class QuorumCnxManager {
* possible long value to lose the challenge.
*
*/
- public boolean receiveConnection(Socket sock) {
+ public void receiveConnection(Socket sock) {
Long sid = null, protocolVersion = null;
InetSocketAddress electionAddr = null;
+
try {
DataInputStream din = new DataInputStream(sock.getInputStream());
+
protocolVersion = din.readLong();
if (protocolVersion >= 0) { // this is a server id and not a protocol version
sid = protocolVersion;
} else {
- sid = din.readLong();
- int num_remaining_bytes = din.readInt();
- byte[] b = new byte[num_remaining_bytes];
- int num_read = din.read(b);
- if (num_read == num_remaining_bytes) {
- if (protocolVersion == 0xffff0000) {
- String addr = new String(b);
- String[] host_port = addr.split(":");
- electionAddr = new InetSocketAddress(host_port[0], Integer.parseInt(host_port[1]));
- } else {
- LOG.error("Got urecognized protocol version " + protocolVersion + " from " + sid);
- }
- } else {
- LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
+ try {
+ InitialMessage init = InitialMessage.parse(protocolVersion, din);
+ sid = init.sid;
+ electionAddr = init.electionAddr;
+ } catch (InitialMessage.InitialMessageException ex) {
+ LOG.error(ex.toString());
+ closeSocket(sock);
+ return;
}
- }
+ }
+
if (sid == QuorumPeer.OBSERVER_ID) {
/*
* Choose identifier at random. We need a value to identify
@@ -261,12 +334,12 @@ public class QuorumCnxManager {
*/
sid = observerCounter--;
- LOG.info("Setting arbitrary identifier to observer: " + sid);
+ LOG.info("Setting arbitrary identifier to observer: {}", sid);
}
} catch (IOException e) {
closeSocket(sock);
- LOG.warn("Exception reading or writing challenge: " + e.toString());
- return false;
+ LOG.warn("Exception reading or writing challenge: {}", e.toString());
+ return;
}
//If wins the challenge, then close the new connection.
@@ -284,7 +357,7 @@ public class QuorumCnxManager {
/*
* Now we start a new connection
*/
- LOG.debug("Create new connection to server: " + sid);
+ LOG.debug("Create new connection to server: {}", sid);
closeSocket(sock);
if (electionAddr != null) {
@@ -293,28 +366,25 @@ public class QuorumCnxManager {
connectOne(sid);
}
- // Otherwise start worker threads to receive data.
- } else {
+ } else { // Otherwise start worker threads to receive data.
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
- if(vsw != null)
+ if (vsw != null) {
vsw.finish();
-
+ }
+
senderWorkerMap.put(sid, sw);
-
- queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
- SEND_CAPACITY));
+
+ queueSendMap.putIfAbsent(sid,
+ new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
sw.start();
rw.start();
-
- return true;
}
- return false;
}
/**
Modified: zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=1679314&r1=1679313&r2=1679314&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java (original)
+++ zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Thu May 14 05:11:33 2015
@@ -18,6 +18,9 @@
package org.apache.zookeeper.test;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.net.InetSocketAddress;
@@ -37,6 +40,7 @@ import org.apache.zookeeper.PortAssignme
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.server.quorum.QuorumCnxManager;
import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager.InitialMessage;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
@@ -242,7 +246,7 @@ public class CnxManagerTest extends ZKTe
InetSocketAddress otherAddr = peers.get(new Long(2)).electionAddr;
DataOutputStream dout = new DataOutputStream(sc.socket().getOutputStream());
- dout.writeLong(0xffff0000);
+ dout.writeLong(QuorumCnxManager.PROTOCOL_VERSION);
dout.writeLong(new Long(2));
String addr = otherAddr.getHostString()+ ":" + otherAddr.getPort();
byte[] addr_bytes = addr.getBytes();
@@ -433,4 +437,85 @@ public class CnxManagerTest extends ZKTe
}
return null;
}
+
+ @Test
+ public void testInitialMessage() throws Exception {
+ InitialMessage msg;
+ ByteArrayOutputStream bos;
+ DataInputStream din;
+ DataOutputStream dout;
+ String hostport;
+
+ // message with bad protocol version
+ try {
+
+ // the initial message (without the protocol version)
+ hostport = "10.0.0.2:3888";
+ bos = new ByteArrayOutputStream();
+ dout = new DataOutputStream(bos);
+ dout.writeLong(5L); // sid
+ dout.writeInt(hostport.getBytes().length);
+ dout.writeBytes(hostport);
+
+ // now parse it
+ din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+ msg = InitialMessage.parse(-65530L, din);
+ Assert.fail("bad protocol version accepted");
+ } catch (InitialMessage.InitialMessageException ex) {}
+
+ // message too long
+ try {
+
+ hostport = createLongString(1048576);
+ bos = new ByteArrayOutputStream();
+ dout = new DataOutputStream(bos);
+ dout.writeLong(5L); // sid
+ dout.writeInt(hostport.getBytes().length);
+ dout.writeBytes(hostport);
+
+ din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+ msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din);
+ Assert.fail("long message accepted");
+ } catch (InitialMessage.InitialMessageException ex) {}
+
+ // bad hostport string
+ try {
+
+ hostport = "what's going on here?";
+ bos = new ByteArrayOutputStream();
+ dout = new DataOutputStream(bos);
+ dout.writeLong(5L); // sid
+ dout.writeInt(hostport.getBytes().length);
+ dout.writeBytes(hostport);
+
+ din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+ msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din);
+ Assert.fail("bad hostport accepted");
+ } catch (InitialMessage.InitialMessageException ex) {}
+
+ // good message
+ try {
+
+ hostport = "10.0.0.2:3888";
+ bos = new ByteArrayOutputStream();
+ dout = new DataOutputStream(bos);
+ dout.writeLong(5L); // sid
+ dout.writeInt(hostport.getBytes().length);
+ dout.writeBytes(hostport);
+
+ // now parse it
+ din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+ msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din);
+ } catch (InitialMessage.InitialMessageException ex) {
+ Assert.fail(ex.toString());
+ }
+ }
+
+ private String createLongString(int size) {
+ StringBuilder sb = new StringBuilder(size);
+ for (int i=0; i < size; i++) {
+ sb.append('x');
+ }
+ return sb.toString();
+ }
}