You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2012/10/31 18:24:55 UTC

svn commit: r1404260 - /zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java

Author: phunt
Date: Wed Oct 31 17:24:54 2012
New Revision: 1404260

URL: http://svn.apache.org/viewvc?rev=1404260&view=rev
Log:
ZOOKEEPER-1560 Zookeeper client hangs on creation of large nodes (Skye Wanderman-Milne via phunt)

Modified:
    zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java?rev=1404260&r1=1404259&r2=1404260&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java Wed Oct 31 17:24:54 2012
@@ -26,16 +26,16 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
 import org.apache.zookeeper.ClientCnxn.Packet;
 import org.apache.zookeeper.ZooDefs.OpCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ClientCnxnSocketNIO extends ClientCnxnSocket {
     private static final Logger LOG = LoggerFactory
@@ -104,83 +104,85 @@ public class ClientCnxnSocketNIO extends
             }
         }
         if (sockKey.isWritable()) {
-            LinkedList<Packet> pending = new LinkedList<Packet>();
-            Packet p = null;
             synchronized(outgoingQueue) {
-                p = findSendablePacket(outgoingQueue,
+                Packet p = findSendablePacket(outgoingQueue,
                         cnxn.sendThread.clientTunneledAuthenticationInProgress());
 
                 if (p != null) {
-                    outgoingQueue.removeFirstOccurrence(p);
                     updateLastSend();
-                    if ((p.requestHeader != null) &&
-                            (p.requestHeader.getType() != OpCode.ping) &&
-                            (p.requestHeader.getType() != OpCode.auth)) {
-                        p.requestHeader.setXid(cnxn.getXid());
+                    // If we already started writing p, p.bb will already exist
+                    if (p.bb == null) {
+                        if ((p.requestHeader != null) &&
+                                (p.requestHeader.getType() != OpCode.ping) &&
+                                (p.requestHeader.getType() != OpCode.auth)) {
+                            p.requestHeader.setXid(cnxn.getXid());
+                        }
+                        p.createBB();
                     }
-                    p.createBB();
-                    ByteBuffer pbb = p.bb;
-                    sock.write(pbb);
-                    if (!pbb.hasRemaining()) {
+                    sock.write(p.bb);
+                    if (!p.bb.hasRemaining()) {
                         sentCount++;
+                        outgoingQueue.removeFirstOccurrence(p);
                         if (p.requestHeader != null
                                 && p.requestHeader.getType() != OpCode.ping
                                 && p.requestHeader.getType() != OpCode.auth) {
-                            pending.add(p);
+                            synchronized (pendingQueue) {
+                                pendingQueue.add(p);
+                            }
                         }
                     }
-                } else {
-                    // No suitable packet to send: turn off write interest flag.
+                }
+                if (outgoingQueue.isEmpty()) {
+                    // No more packets to send: turn off write interest flag.
                     // Will be turned on later by a later call to enableWrite(),
                     // from within ZooKeeperSaslClient (if client is configured
                     // to attempt SASL authentication), or in either doIO() or
                     // in doTransport() if not.
                     disableWrite();
+                } else {
+                    // Just in case
+                    enableWrite();
                 }
             }
-            synchronized(pendingQueue) {
-                pendingQueue.addAll(pending);
-            }
-
         }
     }
 
     private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,
                                       boolean clientTunneledAuthenticationInProgress) {
         synchronized (outgoingQueue) {
-            if (!outgoingQueue.isEmpty()) {
-                if (clientTunneledAuthenticationInProgress) {
-                    Packet p = null;
-                    // Since client's authentication with server is in progress,
-                    // send only the null-header packet queued by primeConnection().
-                    // This packet must be sent so that the SASL authentication process
-                    // can proceed, but all other packets should wait until
-                    // SASL authentication completes.
-                    Iterator<Packet> iter = outgoingQueue.listIterator();
-                    while(iter.hasNext()) {
-                        p = iter.next();
-                        if (p.requestHeader == null) {
-                            // We've found the priming-packet.
-                            return p;
-                        } else {
-                            // Non-priming packet: defer it until later, leaving it in the queue
-                            // until authentication completes.
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("deferring non-priming packet: " + p +
-                                        "until SASL authentication completes.");
-                            }
-                        }
-                    }
-                    // no sendable packet found.
-                    return null;
+            if (outgoingQueue.isEmpty()) {
+                return null;
+            }
+            if (outgoingQueue.getFirst().bb != null // If we've already started sending the first packet, we'd better finish
+                || !clientTunneledAuthenticationInProgress) {
+                return outgoingQueue.getFirst();
+            }
+
+            // Since client's authentication with server is in progress,
+            // send only the null-header packet queued by primeConnection().
+            // This packet must be sent so that the SASL authentication process
+            // can proceed, but all other packets should wait until
+            // SASL authentication completes.
+            ListIterator<Packet> iter = outgoingQueue.listIterator();
+            while (iter.hasNext()) {
+                Packet p = iter.next();
+                if (p.requestHeader == null) {
+                    // We've found the priming-packet. Move it to the beginning of the queue.
+                    iter.remove();
+                    outgoingQueue.add(0, p);
+                    return p;
                 } else {
-                    // Tunnelled authentication is not in progress: just
-                    // send the first packet in the queue.
-                    return outgoingQueue.getFirst();
+                    // Non-priming packet: defer it until later, leaving it in the queue
+                    // until authentication completes.
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("deferring non-priming packet: " + p +
+                                "until SASL authentication completes.");
+                    }
                 }
             }
+            // no sendable packet found.
+            return null;
         }
-        return null;
     }
 
     @Override