You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2012/10/31 19:42:19 UTC
svn commit: r1404289 - in /zookeeper/branches/branch-3.4/src/java:
main/org/apache/zookeeper/ClientCnxnSocketNIO.java
test/org/apache/zookeeper/test/ClientTest.java
Author: phunt
Date: Wed Oct 31 18:42:18 2012
New Revision: 1404289
URL: http://svn.apache.org/viewvc?rev=1404289&view=rev
Log:
ZOOKEEPER-1560 Zookeeper client hangs on creation of large nodes (Skye Wanderman-Milne via phunt)
Modified:
zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/ClientTest.java
Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java?rev=1404289&r1=1404288&r2=1404289&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java Wed Oct 31 18:42:18 2012
@@ -25,16 +25,16 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.ListIterator;
import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
import org.apache.zookeeper.ClientCnxn.Packet;
import org.apache.zookeeper.ZooDefs.OpCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
private static final Logger LOG = LoggerFactory
@@ -99,83 +99,85 @@ public class ClientCnxnSocketNIO extends
}
}
if (sockKey.isWritable()) {
- LinkedList<Packet> pending = new LinkedList<Packet>();
- Packet p = null;
synchronized(outgoingQueue) {
- p = findSendablePacket(outgoingQueue,
+ Packet p = findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
- outgoingQueue.removeFirstOccurrence(p);
updateLastSend();
- if ((p.requestHeader != null) &&
- (p.requestHeader.getType() != OpCode.ping) &&
- (p.requestHeader.getType() != OpCode.auth)) {
- p.requestHeader.setXid(cnxn.getXid());
+ // If we already started writing p, p.bb will already exist
+ if (p.bb == null) {
+ if ((p.requestHeader != null) &&
+ (p.requestHeader.getType() != OpCode.ping) &&
+ (p.requestHeader.getType() != OpCode.auth)) {
+ p.requestHeader.setXid(cnxn.getXid());
+ }
+ p.createBB();
}
- p.createBB();
- ByteBuffer pbb = p.bb;
- sock.write(pbb);
- if (!pbb.hasRemaining()) {
+ sock.write(p.bb);
+ if (!p.bb.hasRemaining()) {
sentCount++;
+ outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
- pending.add(p);
+ synchronized (pendingQueue) {
+ pendingQueue.add(p);
+ }
}
}
- } else {
- // No suitable packet to send: turn off write interest flag.
+ }
+ if (outgoingQueue.isEmpty()) {
+ // No more packets to send: turn off write interest flag.
// Will be turned on later by a later call to enableWrite(),
// from within ZooKeeperSaslClient (if client is configured
// to attempt SASL authentication), or in either doIO() or
// in doTransport() if not.
disableWrite();
+ } else {
+ // Just in case
+ enableWrite();
}
}
- synchronized(pendingQueue) {
- pendingQueue.addAll(pending);
- }
-
}
}
private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,
boolean clientTunneledAuthenticationInProgress) {
synchronized (outgoingQueue) {
- if (!outgoingQueue.isEmpty()) {
- if (clientTunneledAuthenticationInProgress) {
- Packet p = null;
- // Since client's authentication with server is in progress,
- // send only the null-header packet queued by primeConnection().
- // This packet must be sent so that the SASL authentication process
- // can proceed, but all other packets should wait until
- // SASL authentication completes.
- Iterator<Packet> iter = outgoingQueue.listIterator();
- while(iter.hasNext()) {
- p = iter.next();
- if (p.requestHeader == null) {
- // We've found the priming-packet.
- return p;
- } else {
- // Non-priming packet: defer it until later, leaving it in the queue
- // until authentication completes.
- if (LOG.isDebugEnabled()) {
- LOG.debug("deferring non-priming packet: " + p +
- "until SASL authentication completes.");
- }
- }
- }
- // no sendable packet found.
- return null;
+ if (outgoingQueue.isEmpty()) {
+ return null;
+ }
+ if (outgoingQueue.getFirst().bb != null // If we've already started sending the first packet, we had better finish
+ || !clientTunneledAuthenticationInProgress) {
+ return outgoingQueue.getFirst();
+ }
+
+ // Since client's authentication with server is in progress,
+ // send only the null-header packet queued by primeConnection().
+ // This packet must be sent so that the SASL authentication process
+ // can proceed, but all other packets should wait until
+ // SASL authentication completes.
+ ListIterator<Packet> iter = outgoingQueue.listIterator();
+ while (iter.hasNext()) {
+ Packet p = iter.next();
+ if (p.requestHeader == null) {
+ // We've found the priming-packet. Move it to the beginning of the queue.
+ iter.remove();
+ outgoingQueue.add(0, p);
+ return p;
} else {
- // Tunnelled authentication is not in progress: just
- // send the first packet in the queue.
- return outgoingQueue.getFirst();
+ // Non-priming packet: defer it until later, leaving it in the queue
+ // until authentication completes.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("deferring non-priming packet: " + p +
+ "until SASL authentication completes.");
+ }
}
}
+ // no sendable packet found.
+ return null;
}
- return null;
}
@Override
Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/ClientTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/ClientTest.java?rev=1404289&r1=1404288&r2=1404289&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/ClientTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/ClientTest.java Wed Oct 31 18:42:18 2012
@@ -523,6 +523,22 @@ public class ClientTest extends ClientBa
}
+ @Test
+ public void testLargeNodeData() throws Exception {
+ ZooKeeper zk= null;
+ String queue_handle = "/large";
+ try {
+ zk = createClient();
+
+ zk.create(queue_handle, new byte[500000], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } finally {
+ if (zk != null) {
+ zk.close();
+ }
+ }
+
+ }
private void verifyCreateFails(String path, ZooKeeper zk) throws Exception {
try {