You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2012/09/09 20:22:06 UTC

svn commit: r1382554 - in /zookeeper/branches/branch-3.4: ./ src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/client/ src/java/test/org/apache/zookeeper/test/

Author: mahadev
Date: Sun Sep  9 18:22:06 2012
New Revision: 1382554

URL: http://svn.apache.org/viewvc?rev=1382554&view=rev
Log:
ZOOKEEPER-1437. Client uses session before SASL authentication complete. (Eugene Koontz via mahadev)

Modified:
    zookeeper/branches/branch-3.4/CHANGES.txt
    zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxn.java
    zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
    zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
    zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthDesignatedClientTest.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthFailDesignatedClientTest.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthFailTest.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthMissingClientConfigTest.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthTest.java

Modified: zookeeper/branches/branch-3.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/CHANGES.txt?rev=1382554&r1=1382553&r2=1382554&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.4/CHANGES.txt Sun Sep  9 18:22:06 2012
@@ -128,6 +128,9 @@ IMPROVEMENTS:
   ZOOKEEPER-1497. Allow server-side SASL login with JAAS configuration
   to be programmatically set (rather than only by reading JAAS
   configuration file) (Matteo Bertozzi via phunt)
+ 
+  ZOOKEEPER-1437. Client uses session before SASL authentication complete.
+  (Eugene Koontz via mahadev)
 
 Release 3.4.3 - 2012-02-06
 

Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=1382554&r1=1382553&r2=1382554&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxn.java Sun Sep  9 18:22:06 2012
@@ -64,11 +64,11 @@ import org.apache.zookeeper.proto.GetACL
 import org.apache.zookeeper.proto.GetChildren2Response;
 import org.apache.zookeeper.proto.GetChildrenResponse;
 import org.apache.zookeeper.proto.GetDataResponse;
+import org.apache.zookeeper.proto.GetSASLRequest;
 import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.proto.RequestHeader;
 import org.apache.zookeeper.proto.SetACLResponse;
 import org.apache.zookeeper.proto.SetDataResponse;
-import org.apache.zookeeper.proto.SetSASLResponse;
 import org.apache.zookeeper.proto.SetWatches;
 import org.apache.zookeeper.proto.WatcherEvent;
 import org.apache.zookeeper.server.ByteBufferInputStream;
@@ -251,6 +251,8 @@ public class ClientCnxn {
 
         WatchRegistration watchRegistration;
 
+        public boolean readOnly;
+
         /** Convenience ctor */
         Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
                Record request, Record response,
@@ -267,7 +269,11 @@ public class ClientCnxn {
             this.replyHeader = replyHeader;
             this.request = request;
             this.response = response;
+            this.readOnly = readOnly;
+            this.watchRegistration = watchRegistration;
+        }
 
+        public void createBB() {
             try {
                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
@@ -289,8 +295,6 @@ public class ClientCnxn {
             } catch (IOException e) {
                 LOG.warn("Ignoring unexpected exception", e);
             }
-
-            this.watchRegistration = watchRegistration;
         }
 
         @Override
@@ -380,12 +384,6 @@ public class ClientCnxn {
 
     }
 
-    // used by ZooKeeperSaslClient.queueSaslPacket().
-    public void queuePacket(RequestHeader h, ReplyHeader r, Record request,
-            Record response, AsyncCallback cb) {
-        queuePacket(h,r,request,response, cb, null, null, this, null);
-    }
-
     /**
      * tests use this to check on reset of watches
      * @return if the auto reset of watches are disabled
@@ -553,17 +551,6 @@ public class ClientCnxn {
                       } else {
                           cb.processResult(rc, clientPath, p.ctx, null);
                       }
-                  } else if (p.cb instanceof ZooKeeperSaslClient.ServerSaslResponseCallback) {
-                      ZooKeeperSaslClient.ServerSaslResponseCallback cb = (ZooKeeperSaslClient.ServerSaslResponseCallback) p.cb;
-                      SetSASLResponse rsp = (SetSASLResponse) p.response;
-                      // TODO : check rc (== 0, etc) as with other packet types.
-                      cb.processResult(rc,null,p.ctx,rsp.getToken(),null);
-                      ClientCnxn clientCnxn = (ClientCnxn)p.ctx;
-                      if ((clientCnxn == null) || (clientCnxn.zooKeeperSaslClient == null) ||
-                              (clientCnxn.zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.FAILED)) {
-                          queueEvent(new WatchedEvent(EventType.None,
-                                  KeeperState.AuthFailed, null));
-                      }
                   } else if (p.response instanceof GetDataResponse) {
                       DataCallback cb = (DataCallback) p.cb;
                       GetDataResponse rsp = (GetDataResponse) p.response;
@@ -777,6 +764,18 @@ public class ClientCnxn {
                 eventThread.queueEvent( we );
                 return;
             }
+
+            // If SASL authentication is currently in progress, construct and
+            // send a response packet immediately, rather than queuing a
+            // response as with other packets.
+            if (clientTunneledAuthenticationInProgress()) {
+                GetSASLRequest request = new GetSASLRequest();
+                request.deserialize(bbia,"token");
+                zooKeeperSaslClient.respondToServer(request.getToken(),
+                  ClientCnxn.this);
+                return;
+            }
+
             Packet packet;
             synchronized (pendingQueue) {
                 if (pendingQueue.size() == 0) {
@@ -923,6 +922,10 @@ public class ClientCnxn {
 
         private int pingRwTimeout = minPingRwTimeout;
 
+        // Set to true if and only if constructor of ZooKeeperSaslClient
+        // throws a LoginException: see startConnect() below.
+        private boolean saslLoginFailed = false;
+
         private void startConnect() throws IOException {
             state = States.CONNECTING;
 
@@ -939,11 +942,16 @@ public class ClientCnxn {
             try {
                 zooKeeperSaslClient = new ZooKeeperSaslClient("zookeeper/"+addr.getHostName());
             } catch (LoginException e) {
+                // An authentication error occurred when the SASL client tried to initialize:
+                // for Kerberos this means that the client failed to authenticate with the KDC.
+                // This is different from an authentication error that occurs during communication
+                // with the Zookeeper server, which is handled below.
                 LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
-                        + "SASL authentication, if Zookeeper server allows it.");
+                  + "SASL authentication, if Zookeeper server allows it.");
                 eventThread.queueEvent(new WatchedEvent(
-                        Watcher.Event.EventType.None,
-                        Watcher.Event.KeeperState.AuthFailed, null));
+                  Watcher.Event.EventType.None,
+                  Watcher.Event.KeeperState.AuthFailed, null));
+                saslLoginFailed = true;
             }
             logStartConnect(addr);
 
@@ -987,21 +995,35 @@ public class ClientCnxn {
                     }
 
                     if (state.isConnected()) {
-                        if ((zooKeeperSaslClient != null) && (zooKeeperSaslClient.isFailed() != true) && (zooKeeperSaslClient.isComplete() != true)) {
-                            try {
-                                zooKeeperSaslClient.initialize(ClientCnxn.this);
+                        // determine whether we need to send an AuthFailed event.
+                        if (zooKeeperSaslClient != null) {
+                            boolean sendAuthEvent = false;
+                            if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
+                                try {
+                                    zooKeeperSaslClient.initialize(ClientCnxn.this);
+                                } catch (SaslException e) {
+                                   LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
+                                    state = States.AUTH_FAILED;
+                                    sendAuthEvent = true;
+                                }
                             }
-                            catch (SaslException e) {
-                                LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
-                                state = States.AUTH_FAILED;
-                                eventThread.queueEvent(new WatchedEvent(
-                                        Watcher.Event.EventType.None,
-                                        KeeperState.AuthFailed,null));
+                            KeeperState authState = zooKeeperSaslClient.getKeeperState();
+                            if (authState != null) {
+                                if (authState == KeeperState.AuthFailed) {
+                                    // An authentication error occurred during authentication with the Zookeeper Server.
+                                    state = States.AUTH_FAILED;
+                                    sendAuthEvent = true;
+                                } else {
+                                    if (authState == KeeperState.SaslAuthenticated) {
+                                        sendAuthEvent = true;
+                                    }
+                                }
                             }
-                            if (zooKeeperSaslClient.readyToSendSaslAuthEvent()) {
+
+                            if (sendAuthEvent == true) {
                                 eventThread.queueEvent(new WatchedEvent(
-                                  Watcher.Event.EventType.None,
-                                  Watcher.Event.KeeperState.SaslAuthenticated, null));
+                                      Watcher.Event.EventType.None,
+                                      authState,null));
                             }
                         }
                         to = readTimeout - clientCnxnSocket.getIdleRecv();
@@ -1022,7 +1044,6 @@ public class ClientCnxn {
                         if (timeToNextPing <= 0) {
                             sendPing();
                             clientCnxnSocket.updateLastSend();
-                            clientCnxnSocket.enableWrite();
                         } else {
                             if (timeToNextPing < to) {
                                 to = timeToNextPing;
@@ -1044,8 +1065,7 @@ public class ClientCnxn {
                         to = Math.min(to, pingRwTimeout - idlePingRwServer);
                     }
 
-                    clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue);
-
+                    clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
                 } catch (Throwable e) {
                     if (closing) {
                         if (LOG.isDebugEnabled()) {
@@ -1204,6 +1224,26 @@ public class ClientCnxn {
         void testableCloseSocket() throws IOException {
             clientCnxnSocket.testableCloseSocket();
         }
+
+        public boolean clientTunneledAuthenticationInProgress() {
+            // A failed SASL login (the ZooKeeperSaslClient constructor threw a
+            // LoginException in startConnect()) means that no tunneled
+            // authentication is in progress.
+            if (saslLoginFailed) {
+                return false;
+            }
+
+            // No authenticating object yet: SendThread has not created it, so
+            // authentication counts as in progress (at its earliest stage).
+            // Once the object exists, it is asked for its own progress.
+            final boolean authenticatorMissing = (zooKeeperSaslClient == null);
+            return authenticatorMissing
+                    || zooKeeperSaslClient.clientTunneledAuthenticationInProgress();
+        }
+
+        public void sendPacket(Packet p) throws IOException {
+            clientCnxnSocket.sendPacket(p); // handed straight to the socket layer; not placed on outgoingQueue
+        }
     }
 
     /**
@@ -1250,7 +1290,11 @@ public class ClientCnxn {
 
     private volatile States state = States.NOT_CONNECTED;
 
-    synchronized private int getXid() {
+    /*
+     * getXid() is called externally by ClientCnxnSocketNIO::doIO() when packets are sent from the
+     * outgoingQueue to the server. Thus, getXid() must be public.
+     */
+    synchronized public int getXid() {
         return xid++;
     }
 
@@ -1268,15 +1312,37 @@ public class ClientCnxn {
         return r;
     }
 
+    public void enableWrite() {
+        sendThread.getClientCnxnSocket().enableWrite(); // called by ZooKeeperSaslClient.respondToServer() once SASL completes
+    }
+
+    public void sendPacket(Record request, Record response, AsyncCallback cb, int opCode)
+    throws IOException {
+        // The packet is sent immediately by sendThread.sendPacket() below
+        // instead of being queued, so its xid has to be generated right here.
+        final int packetXid = getXid();
+        RequestHeader requestHeader = new RequestHeader();
+        requestHeader.setType(opCode);
+        requestHeader.setXid(packetXid);
+
+        ReplyHeader replyHeader = new ReplyHeader();
+        replyHeader.setXid(packetXid);
+
+        Packet packet = new Packet(requestHeader, replyHeader, request, response, null, false);
+        packet.cb = cb;
+        sendThread.sendPacket(packet);
+    }
+
     Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
             Record response, AsyncCallback cb, String clientPath,
             String serverPath, Object ctx, WatchRegistration watchRegistration)
     {
         Packet packet = null;
+
+        // Note that we do not generate the Xid for the packet yet. It is
+        // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
+        // where the packet is actually sent.
         synchronized (outgoingQueue) {
-            if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
-                h.setXid(getXid());
-            }
             packet = new Packet(h, r, request, response, watchRegistration);
             packet.cb = cb;
             packet.ctx = ctx;

Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java?rev=1382554&r1=1382553&r2=1382554&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java Sun Sep  9 18:22:06 2012
@@ -160,11 +160,15 @@ abstract class ClientCnxnSocket {
 
     abstract void enableWrite();
 
+    abstract void disableWrite();
+
     abstract void enableReadWriteOnly();
 
     abstract void doTransport(int waitTimeOut, List<Packet> pendingQueue,
-            LinkedList<Packet> outgoingQueue) throws IOException,
-            InterruptedException;
+            LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
+            throws IOException, InterruptedException;
 
     abstract void testableCloseSocket() throws IOException;
+
+    abstract void sendPacket(Packet p) throws IOException;
 }

Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java?rev=1382554&r1=1382553&r2=1382554&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java Sun Sep  9 18:22:06 2012
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
@@ -57,7 +58,8 @@ public class ClientCnxnSocketNIO extends
      * @throws InterruptedException
      * @throws IOException
      */
-    void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue) throws InterruptedException, IOException {
+    void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
+      throws InterruptedException, IOException {
         SocketChannel sock = (SocketChannel) sockKey.channel();
         if (sock == null) {
             throw new IOException("Socket is null!");
@@ -78,7 +80,10 @@ public class ClientCnxnSocketNIO extends
                 } else if (!initialized) {
                     readConnectResult();
                     enableRead();
-                    if (!outgoingQueue.isEmpty()) {
+                    if (findSendablePacket(outgoingQueue,
+                            cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
+                        // Since SASL authentication has completed (if client is configured to do so),
+                        // outgoing packets waiting in the outgoingQueue can now be sent.
                         enableWrite();
                     }
                     lenBuffer.clear();
@@ -95,26 +100,82 @@ public class ClientCnxnSocketNIO extends
         }
         if (sockKey.isWritable()) {
             LinkedList<Packet> pending = new LinkedList<Packet>();
-            synchronized (outgoingQueue) {
-                if (!outgoingQueue.isEmpty()) {
+            Packet p = null;
+            synchronized(outgoingQueue) {
+                p = findSendablePacket(outgoingQueue,
+                        cnxn.sendThread.clientTunneledAuthenticationInProgress());
+
+                if (p != null) {
+                    outgoingQueue.removeFirstOccurrence(p);
                     updateLastSend();
-                    ByteBuffer pbb = outgoingQueue.getFirst().bb;
+                    if ((p.requestHeader != null) &&
+                            (p.requestHeader.getType() != OpCode.ping) &&
+                            (p.requestHeader.getType() != OpCode.auth)) {
+                        p.requestHeader.setXid(cnxn.getXid());
+                    }
+                    p.createBB();
+                    ByteBuffer pbb = p.bb;
                     sock.write(pbb);
                     if (!pbb.hasRemaining()) {
                         sentCount++;
-                        Packet p = outgoingQueue.removeFirst();
                         if (p.requestHeader != null
                                 && p.requestHeader.getType() != OpCode.ping
                                 && p.requestHeader.getType() != OpCode.auth) {
                             pending.add(p);
                         }
                     }
+                } else {
+                    // No suitable packet to send: turn off write interest flag.
+                    // Will be turned on later by a later call to enableWrite(),
+                    // from within ZooKeeperSaslClient (if client is configured
+                    // to attempt SASL authentication), or in either doIO() or
+                    // in doTransport() if not.
+                    disableWrite();
                 }
             }
             synchronized(pendingQueue) {
                 pendingQueue.addAll(pending);
             }
+
+        }
+    }
+
+    /**
+     * Selects, without removing it, the next packet of outgoingQueue that may
+     * be written to the server.
+     *
+     * @param outgoingQueue packets waiting to be sent; also the monitor held here
+     * @param clientTunneledAuthenticationInProgress whether SASL authentication
+     *        is still under way
+     * @return the queue head when authentication is not in progress; the first
+     *         null-header (priming) packet while it is; null if none qualifies
+     */
+    private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,
+                                      boolean clientTunneledAuthenticationInProgress) {
+        synchronized (outgoingQueue) {
+            if (outgoingQueue.isEmpty()) {
+                return null;
+            }
+            if (!clientTunneledAuthenticationInProgress) {
+                // Tunnelled authentication is not in progress: just
+                // send the first packet in the queue.
+                return outgoingQueue.getFirst();
+            }
+            // Authentication is in progress: only the null-header packet queued
+            // by primeConnection() may be sent, so that the SASL exchange can
+            // proceed. All other packets stay queued until it completes.
+            for (Packet p : outgoingQueue) {
+                if (p.requestHeader == null) {
+                    return p;
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("deferring non-priming packet: " + p +
+                            " until SASL authentication completes.");
+                }
+            }
+            // no sendable packet found.
+            return null;
+        }
+    }
 
     @Override
@@ -200,7 +261,7 @@ public class ClientCnxnSocketNIO extends
     void registerAndConnect(SocketChannel sock, InetSocketAddress addr) 
     throws IOException {
         sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
-        boolean immediateConnect = sock.connect(addr);            
+        boolean immediateConnect = sock.connect(addr);
         if (immediateConnect) {
             sendThread.primeConnection();
         }
@@ -269,7 +330,8 @@ public class ClientCnxnSocketNIO extends
     }
     
     @Override
-    void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue )
+    void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
+                     ClientCnxn cnxn)
             throws IOException, InterruptedException {
         selector.select(waitTimeOut);
         Set<SelectionKey> selected;
@@ -288,15 +350,14 @@ public class ClientCnxnSocketNIO extends
                     sendThread.primeConnection();
                 }
             } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
-                doIO(pendingQueue, outgoingQueue);
+                doIO(pendingQueue, outgoingQueue, cnxn);
             }
         }
         if (sendThread.getZkState().isConnected()) {
             synchronized(outgoingQueue) {
-                if (!outgoingQueue.isEmpty()) {
+                if (findSendablePacket(outgoingQueue,
+                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                     enableWrite();
-                } else {
-                    disableWrite();
                 }
             }
         }
@@ -318,7 +379,8 @@ public class ClientCnxnSocketNIO extends
         }
     }
 
-    private synchronized void disableWrite() {
+    @Override
+    public synchronized void disableWrite() {
         int i = sockKey.interestOps();
         if ((i & SelectionKey.OP_WRITE) != 0) {
             sockKey.interestOps(i & (~SelectionKey.OP_WRITE));
@@ -340,4 +402,17 @@ public class ClientCnxnSocketNIO extends
     Selector getSelector() {
         return selector;
     }
+
+    @Override
+    void sendPacket(Packet p) throws IOException {
+        final SocketChannel channel = (SocketChannel) sockKey.channel();
+        if (channel == null) {
+            throw new IOException("Socket is null!");
+        }
+        // Build p.bb, then issue one write(); NOTE(review): a partial write is not retried.
+        p.createBB();
+        channel.write(p.bb);
+    }
+
+
 }

Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java?rev=1382554&r1=1382553&r2=1382554&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java Sun Sep  9 18:22:06 2012
@@ -21,17 +21,17 @@ package org.apache.zookeeper.client;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.ClientCnxn;
 import org.apache.zookeeper.Login;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.Environment;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.proto.GetSASLRequest;
-import org.apache.zookeeper.proto.ReplyHeader;
-import org.apache.zookeeper.proto.RequestHeader;
 import org.apache.zookeeper.proto.SetSASLResponse;
 import org.apache.zookeeper.server.auth.KerberosName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.security.Principal;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
@@ -68,6 +68,7 @@ public class ZooKeeperSaslClient {
 
     private SaslState saslState = SaslState.INITIAL;
 
+    private boolean gotLastPacket = false;
     /** informational message indicating the current configuration status */
     private final String configStatus;
 
@@ -166,7 +167,7 @@ public class ZooKeeperSaslClient {
         public void processResult(int rc, String path, Object ctx, byte data[], Stat stat) {
             // processResult() is used by ClientCnxn's sendThread to respond to
             // data[] contains the Zookeeper Server's SASL token.
-            // ctx is the ZooKeeperSaslClient object. We use this object's prepareSaslResponseToServer() method
+            // ctx is the ZooKeeperSaslClient object. We use this object's respondToServer() method
             // to reply to the Zookeeper Server's SASL token
             ZooKeeperSaslClient client = ((ClientCnxn)ctx).zooKeeperSaslClient;
             if (client == null) {
@@ -181,7 +182,7 @@ public class ZooKeeperSaslClient {
                 usedata = new byte[0];
                 LOG.debug("ServerSaslResponseCallback(): using empty data[] as server response (length="+usedata.length+")");
             }
-            client.prepareSaslResponseToServer(usedata, (ClientCnxn)ctx);
+            client.respondToServer(usedata, (ClientCnxn)ctx);
         }
     }
 
@@ -242,38 +243,50 @@ public class ZooKeeperSaslClient {
                     return null;
                 }
             }
-        }
-        catch (LoginException e) {
+        } catch (LoginException e) {
+            // We throw LoginExceptions...
             throw e;
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
+            // ..but consume (with a log message) all other types of exceptions.
             LOG.error("Exception while trying to create SASL client: " + e);
             return null;
         }
     }
 
-    private void prepareSaslResponseToServer(byte[] serverToken, ClientCnxn cnxn) {
-        saslToken = serverToken;
-
+    public void respondToServer(byte[] serverToken, ClientCnxn cnxn) {
         if (saslClient == null) {
             LOG.error("saslClient is unexpectedly null. Cannot respond to server's SASL message; ignoring.");
             return;
         }
 
-        LOG.debug("saslToken (server) length: " + saslToken.length);
         if (!(saslClient.isComplete())) {
             try {
-                saslToken = createSaslToken(saslToken);
+                saslToken = createSaslToken(serverToken);
                 if (saslToken != null) {
-                    LOG.debug("saslToken (client) length: " + saslToken.length);
-                    queueSaslPacket(saslToken, cnxn);
+                    sendSaslPacket(saslToken, cnxn);
                 }
             } catch (SaslException e) {
                 LOG.error("SASL authentication failed using login context '" +
-                this.getLoginContext() + "'.");
+                        this.getLoginContext() + "'.");
                 saslState = SaslState.FAILED;
+                gotLastPacket = true;
             }
         }
+
+        if (saslClient.isComplete()) {
+            // Compare the mechanism name with equals(): == on Strings tests reference
+            // identity and only works while both sides happen to be interned.
+            final boolean gssapi = "GSSAPI".equals(saslClient.getMechanismName());
+            // GSSAPI: the server sends a final packet after authentication succeeds or fails,
+            // so gotLastPacket is set only once serverToken is null. non-GSSAPI: no final packet.
+            if (!gssapi || serverToken == null) {
+                gotLastPacket = true;
+            }
+            // SASL authentication is completed, successfully or not:
+            // enable the socket's writable flag so that any packets waiting for authentication to complete in
+            // the outgoing queue will be sent to the Zookeeper server.
+            cnxn.enableWrite();
+        }
     }
 
     private byte[] createSaslToken() throws SaslException {
@@ -284,6 +297,7 @@ public class ZooKeeperSaslClient {
     private byte[] createSaslToken(final byte[] saslToken) throws SaslException {
         if (saslToken == null) {
             // TODO: introspect about runtime environment (such as jaas.conf)
+            saslState = SaslState.FAILED;
             throw new SaslException("Error in authenticating with a Zookeeper Quorum member: the quorum member's saslToken is null.");
         }
 
@@ -314,6 +328,7 @@ public class ZooKeeperSaslClient {
                     }
                     error += " Zookeeper Client will go to AUTH_FAILED state.";
                     LOG.error(error);
+                    saslState = SaslState.FAILED;
                     throw new SaslException(error);
                 }
             }
@@ -324,51 +339,73 @@ public class ZooKeeperSaslClient {
         }
     }
 
-    private void queueSaslPacket(byte[] saslToken, ClientCnxn cnxn) {
-        LOG.debug("ClientCnxn:sendSaslPacket:length="+saslToken.length);
-        RequestHeader h = new RequestHeader();
-        h.setType(ZooDefs.OpCode.sasl);
+    private void sendSaslPacket(byte[] saslToken, ClientCnxn cnxn)
+      throws SaslException{
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("ClientCnxn:sendSaslPacket:length="+saslToken.length);
+        }
+        // Wrap the token in a SASL request; response and cb are handed to sendPacket with it.
         GetSASLRequest request = new GetSASLRequest();
         request.setToken(saslToken);
         SetSASLResponse response = new SetSASLResponse();
         ServerSaslResponseCallback cb = new ServerSaslResponseCallback();
-        ReplyHeader r = new ReplyHeader();
-        cnxn.queuePacket(h,r,request,response,cb);
+        // An IOException from sendPacket is rethrown as a SaslException that keeps the cause.
+        try {
+            cnxn.sendPacket(request,response,cb, ZooDefs.OpCode.sasl);
+        } catch (IOException e) {
+            throw new SaslException("Failed to send SASL packet to server.",
+                e);
+        }
     }
-    
-    private void queueSaslPacket(ClientCnxn cnxn) throws SaslException {
-        queueSaslPacket(createSaslToken(), cnxn);
+
+    // Sends the initial SASL token (from createSaslToken()) and logs that token's length.
+    private void sendSaslPacket(ClientCnxn cnxn) throws SaslException {
+        byte[] initialToken = createSaslToken();
+        if (LOG.isDebugEnabled() && initialToken != null) {
+            LOG.debug("ClientCnxn:sendSaslPacket:length="+initialToken.length);
+        }
+        GetSASLRequest request = new GetSASLRequest();
+        request.setToken(initialToken);
+        try {
+            cnxn.sendPacket(request, new SetSASLResponse(),
+                new ServerSaslResponseCallback(), ZooDefs.OpCode.sasl);
+        } catch (IOException e) {
+            throw new SaslException("Failed to send SASL packet to server due to IOException:", e);
+        }
     }
 
-    // used by ClientCnxn to know when to emit SaslAuthenticated event.
-    // transitions internally from INTERMEDIATE to COMPLETE as a side effect if
-    // it's ready to emit this event.
-    public boolean readyToSendSaslAuthEvent() {
+    // Used by ClientCnxn to pick the SASL-related event to emit. With a non-null saslClient: AuthFailed while
+    // saslState is FAILED, SaslAuthenticated on the one INTERMEDIATE -> COMPLETE transition made here; else null.
+    public KeeperState getKeeperState() {
         if (saslClient != null) {
+            if (saslState == SaslState.FAILED) {
+              return KeeperState.AuthFailed;
+            }
             if (saslClient.isComplete()) {
                 if (saslState == SaslState.INTERMEDIATE) {
                     saslState = SaslState.COMPLETE;
-                    return true;
+                    return KeeperState.SaslAuthenticated;
                 }
             }
         }
-        else {
-            LOG.warn("saslClient is null: client could not authenticate properly.");
-        }
-        return false;
+        // No event ready to emit yet (this is also the result when saslClient is null).
+        return null;
     }
 
+    // Initialize the client's communications with the Zookeeper server by sending the server the first
+    // authentication packet.
     public void initialize(ClientCnxn cnxn) throws SaslException {
         if (saslClient == null) {
+            saslState = SaslState.FAILED;
             throw new SaslException("saslClient failed to initialize properly: it's null.");
         }
         if (saslState == SaslState.INITIAL) {
             if (saslClient.hasInitialResponse()) {
-                queueSaslPacket(cnxn);
+                sendSaslPacket(cnxn);
             }
             else {
                 byte[] emptyToken = new byte[0];
-                queueSaslPacket(emptyToken, cnxn);
+                sendSaslPacket(emptyToken, cnxn);
             }
             saslState = SaslState.INTERMEDIATE;
         }
@@ -442,4 +479,44 @@ public class ZooKeeperSaslClient {
             }
         }
     }
+
+    // Reports whether SASL authentication with the server is still in progress.
+    // Returns true while the client is configured for SASL and either the
+    // exchange has neither completed nor failed, or the final server SASL
+    // message has not been received yet. Returns false otherwise, including
+    // when the login configuration cannot be retrieved.
+    public boolean clientTunneledAuthenticationInProgress() {
+        // TODO: Rather than checking a disjunction here, should be a single member
+        // variable or method in this class to determine whether the client is
+        // configured to use SASL. (see also ZOOKEEPER-1455).
+        try {
+            boolean saslConfigured =
+                (System.getProperty(Environment.JAAS_CONF_KEY) != null) ||
+                (javax.security.auth.login.Configuration.getConfiguration() != null);
+            if (!saslConfigured) {
+                // Client is not configured to use a tunnelled authentication scheme.
+                return false;
+            }
+            if (!isComplete() && !isFailed()) {
+                // Authentication hasn't finished yet: we must wait for it to do so.
+                return true;
+            }
+            // SASL authentication has succeeded or failed, but it is still in
+            // progress while there is a final SASL message from the server
+            // which must be received. Otherwise everything has been received.
+            return (isComplete() || isFailed()) && !gotLastPacket;
+        } catch (SecurityException e) {
+            // Thrown if the caller does not have permission to retrieve the Configuration.
+            // In this case, simply returning false is correct.
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Could not retrieve login configuration: " + e);
+            }
+            return false;
+        }
+    }
+
+
 }

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthDesignatedClientTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthDesignatedClientTest.java?rev=1382554&r1=1382553&r2=1382554&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthDesignatedClientTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthDesignatedClientTest.java Sun Sep  9 18:22:06 2012
@@ -85,7 +85,6 @@ public class SaslAuthDesignatedClientTes
     @Test
     public void testAuth() throws Exception {
         ZooKeeper zk = createClient();
-        Thread.sleep(1000);
         try {
             zk.create("/path1", null, Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
             Thread.sleep(1000);

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthFailDesignatedClientTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthFailDesignatedClientTest.java?rev=1382554&r1=1382553&r2=1382554&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthFailDesignatedClientTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthFailDesignatedClientTest.java Sun Sep  9 18:22:06 2012
@@ -85,7 +85,6 @@ public class SaslAuthFailDesignatedClien
     @Test
     public void testAuth() throws Exception {
         ZooKeeper zk = createClient();
-        Thread.sleep(1000);
         try {
             zk.create("/path1", null, Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
             Assert.fail("Should have gotten exception.");

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthFailTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthFailTest.java?rev=1382554&r1=1382553&r2=1382554&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthFailTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthFailTest.java Sun Sep  9 18:22:06 2012
@@ -74,7 +74,10 @@ public class SaslAuthFailTest extends Cl
         @Override
         public synchronized void process(WatchedEvent event) {
             if (event.getState() == KeeperState.AuthFailed) {
-                authFailed.incrementAndGet();
+                synchronized(authFailed) {
+                    authFailed.incrementAndGet();
+                    authFailed.notify();
+                }
             }
             else {
                 super.process(event);
@@ -85,7 +88,10 @@ public class SaslAuthFailTest extends Cl
     @Test
     public void testBadSaslAuthNotifiesWatch() throws Exception {
         ZooKeeper zk = createClient();
-        Thread.sleep(1000);
+        // Wait for the AuthFailed event; looping on the count survives an early notify or a spurious wakeup.
+        synchronized(authFailed) {
+            while (authFailed.get() == 0) { authFailed.wait(); }
+        }
         Assert.assertEquals(authFailed.get(),1);
         zk.close();
     }
@@ -94,7 +100,6 @@ public class SaslAuthFailTest extends Cl
     @Test
     public void testAuthFail() throws Exception {
         ZooKeeper zk = createClient();
-        Thread.sleep(1000);
         try {
             zk.create("/path1", null, Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
             Assert.fail("Should have gotten exception.");

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthMissingClientConfigTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthMissingClientConfigTest.java?rev=1382554&r1=1382553&r2=1382554&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthMissingClientConfigTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthMissingClientConfigTest.java Sun Sep  9 18:22:06 2012
@@ -83,7 +83,6 @@ public class SaslAuthMissingClientConfig
     @Test
     public void testAuth() throws Exception {
         ZooKeeper zk = createClient();
-        Thread.sleep(1000);
         try {
             zk.create("/path1", null, Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
             Assert.fail("Should have gotten exception.");

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthTest.java?rev=1382554&r1=1382553&r2=1382554&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/SaslAuthTest.java Sun Sep  9 18:22:06 2012
@@ -89,7 +89,6 @@ public class SaslAuthTest extends Client
     @Test
     public void testAuth() throws Exception {
         ZooKeeper zk = createClient();
-        Thread.sleep(1000);
         try {
             zk.create("/path1", null, Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
             Thread.sleep(1000);
@@ -101,7 +100,6 @@ public class SaslAuthTest extends Client
     @Test
     public void testValidSaslIds() throws Exception {
         ZooKeeper zk = createClient();
-        Thread.sleep(1000);
 
         List<String> validIds = new ArrayList<String>();
         validIds.add("user");
@@ -122,7 +120,6 @@ public class SaslAuthTest extends Client
     @Test
     public void testInvalidSaslIds() throws Exception {
         ZooKeeper zk = createClient();
-        Thread.sleep(1000);
 
         List<String> invalidIds = new ArrayList<String>();
         invalidIds.add("user@KERB.REALM/server.com");