You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2016/12/06 00:17:02 UTC

[4/4] zookeeper git commit: ZOOKEEPER-1045: Support Quorum Peer mutual authentication via SASL (rakeshr via phunt)

ZOOKEEPER-1045: Support Quorum Peer mutual authentication via SASL (rakeshr via phunt)

Change-Id: I7ae6bd863d46621bba5b9abc908e1497111e0336


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/8a06bd1c
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/8a06bd1c
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/8a06bd1c

Branch: refs/heads/branch-3.4
Commit: 8a06bd1ccef382461c7b0a63f2012f4aeac90753
Parents: 967c3a7
Author: Rakesh Radhakrishnan <ra...@apache.org>
Authored: Mon Dec 5 16:15:37 2016 -0800
Committer: Patrick Hunt <ph...@apache.org>
Committed: Mon Dec 5 16:15:37 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   6 +
 build.xml                                       |   7 +-
 ivy.xml                                         |  28 +
 src/java/main/org/apache/zookeeper/Login.java   |   7 +-
 .../zookeeper/SaslClientCallbackHandler.java    | 104 +++
 .../zookeeper/client/ZooKeeperSaslClient.java   | 161 +---
 .../zookeeper/server/ZooKeeperSaslServer.java   | 114 +--
 .../server/auth/SaslServerCallbackHandler.java  |  10 +-
 .../server/quorum/FastLeaderElection.java       |   2 +
 .../zookeeper/server/quorum/Follower.java       |   5 +-
 .../apache/zookeeper/server/quorum/Leader.java  |  12 +-
 .../apache/zookeeper/server/quorum/Learner.java |  24 +-
 .../zookeeper/server/quorum/LearnerHandler.java |  27 +-
 .../zookeeper/server/quorum/Observer.java       |   8 +-
 .../server/quorum/QuorumCnxManager.java         | 347 +++++--
 .../zookeeper/server/quorum/QuorumPeer.java     | 181 +++-
 .../server/quorum/QuorumPeerConfig.java         |  48 +-
 .../zookeeper/server/quorum/QuorumPeerMain.java |  39 +-
 .../quorum/auth/NullQuorumAuthLearner.java      |  33 +
 .../quorum/auth/NullQuorumAuthServer.java       |  34 +
 .../server/quorum/auth/QuorumAuth.java          |  96 ++
 .../server/quorum/auth/QuorumAuthLearner.java   |  40 +
 .../server/quorum/auth/QuorumAuthServer.java    |  41 +
 .../quorum/auth/SaslQuorumAuthLearner.java      | 230 +++++
 .../quorum/auth/SaslQuorumAuthServer.java       | 180 ++++
 .../auth/SaslQuorumServerCallbackHandler.java   | 148 +++
 .../apache/zookeeper/util/SecurityUtils.java    | 298 ++++++
 src/java/test/data/kerberos/minikdc-krb5.conf   |  30 +
 src/java/test/data/kerberos/minikdc.ldiff       |  52 ++
 .../zookeeper/server/quorum/CnxManagerTest.java |  15 +-
 .../quorum/FLEBackwardElectionRoundTest.java    |   4 +-
 .../server/quorum/FLECompatibilityTest.java     |   4 +-
 .../server/quorum/FLEDontCareTest.java          |  10 +-
 .../server/quorum/FLELostMessageTest.java       |   2 +-
 .../zookeeper/server/quorum/LearnerTest.java    |   4 +-
 .../server/quorum/QuorumCnxManagerTest.java     | 925 +++++++++++++++++++
 .../server/quorum/QuorumPeerTestBase.java       |  65 +-
 .../zookeeper/server/quorum/Zab1_0Test.java     |  44 +-
 .../quorum/auth/KerberosSecurityTestcase.java   | 120 +++
 .../server/quorum/auth/KerberosTestUtils.java   |  76 ++
 .../zookeeper/server/quorum/auth/MiniKdc.java   | 418 +++++++++
 .../server/quorum/auth/MiniKdcTest.java         | 184 ++++
 .../server/quorum/auth/QuorumAuthTestBase.java  | 146 +++
 .../quorum/auth/QuorumAuthUpgradeTest.java      | 239 +++++
 .../quorum/auth/QuorumDigestAuthTest.java       | 221 +++++
 .../quorum/auth/QuorumKerberosAuthTest.java     | 110 +++
 .../auth/QuorumKerberosHostBasedAuthTest.java   | 184 ++++
 .../apache/zookeeper/test/FLEPredicateTest.java |   2 +-
 src/zookeeper.jute                              |   5 +
 49 files changed, 4655 insertions(+), 435 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2b69758..eb9ef86 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -31,6 +31,12 @@ IMPROVEMENTS:
   ZOOKEEPER-2606: SaslServerCallbackHandler#handleAuthorizeCallback() should
   log the exception (Ted Yu via fpj)
 
+NEW FEATURE:
+
+  ZOOKEEPER-1045: Support Quorum Peer mutual authentication via SASL
+  (rakeshr via phunt)
+
+
 Release 3.4.9 - 2016-08-23
 
 Backward compatible changes:

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 505597a..5c4fab2 100644
--- a/build.xml
+++ b/build.xml
@@ -75,6 +75,7 @@ xmlns:maven="antlib:org.apache.maven.artifact.ant">
     <property name="test.data.upgrade.dir" value="${test.data.dir}/upgrade" />
     <property name="test.data.invalid.dir" value="${test.data.dir}/invalidsnap" />
     <property name="test.data.buffersize.dir" value="${test.data.dir}/buffersize" />
+    <property name="test.data.kerberos.dir" value="${test.data.dir}/kerberos" />
     <property name="test.cppunit.dir" value="${test.java.build.dir}/test-cppunit"/>
     <property name="test.tmp.dir" value="${test.java.build.dir}/tmp" />
     <property name="test.output" value="no" />
@@ -1242,6 +1243,7 @@ xmlns:maven="antlib:org.apache.maven.artifact.ant">
         <delete dir="${test.data.upgrade.dir}" />
         <delete dir="${test.data.invalid.dir}" />
         <delete dir="${test.data.buffersize.dir}" />
+        <delete dir="${test.data.kerberos.dir}" />
         <delete dir="${test.data.dir}" />
         <mkdir dir="${test.log.dir}" />
         <mkdir dir="${test.tmp.dir}" />
@@ -1258,7 +1260,10 @@ xmlns:maven="antlib:org.apache.maven.artifact.ant">
         <copy todir="${test.data.buffersize.dir}">
             <fileset dir="${basedir}/src/java/test/data/buffersize"/>
         </copy>
-       
+        <mkdir dir="${test.data.kerberos.dir}" />
+        <copy todir="${test.data.kerberos.dir}">
+            <fileset dir="${basedir}/src/java/test/data/kerberos"/>
+        </copy>
     </target>
 
     <condition property="quicktest">

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/ivy.xml
----------------------------------------------------------------------
diff --git a/ivy.xml b/ivy.xml
index 95b0e5a..437b86b 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -74,6 +74,34 @@
                 rev="2.4" conf="releaseaudit->default"/>
     <dependency org="commons-collections" name="commons-collections" 
                 rev="3.2.2" conf="releaseaudit->default"/>
+
+    <dependency org="commons-io" name="commons-io" rev="2.4"
+                conf="test->default"/>
+
+    <dependency org="org.apache.kerby" name="kerb-simplekdc" rev="1.0.0-RC2"
+                conf="test->default"/>
+    <dependency org="org.apache.kerby" name="kerby-config" rev="1.0.0-RC2"
+                conf="test->default"/>
+    <dependency org="org.apache.kerby" name="kerb-core" rev="1.0.0-RC2"
+                conf="test->default"/>
+    <dependency org="org.apache.kerby" name="kerb-server" rev="1.0.0-RC2"
+                conf="test->default"/>
+    <dependency org="org.apache.kerby" name="kerb-common" rev="1.0.0-RC2"
+                conf="test->default"/>
+    <dependency org="org.apache.kerby" name="kerb-admin" rev="1.0.0-RC2"
+                conf="test->default"/>
+    <dependency org="org.apache.kerby" name="kerb-identity" rev="1.0.0-RC2"
+                conf="test->default"/>
+    <dependency org="org.apache.kerby" name="kerb-client" rev="1.0.0-RC2"
+                conf="test->default"/>
+    <dependency org="org.apache.kerby" name="kerb-util" rev="1.0.0-RC2"
+                conf="test->default"/>
+    <dependency org="org.apache.kerby" name="kerb-crypto" rev="1.0.0-RC2"
+                conf="test->default"/>
+    <dependency org="org.apache.kerby" name="kerby-util" rev="1.0.0-RC2"
+                conf="test->default"/>
+    <dependency org="org.apache.kerby" name="kerby-asn1" rev="1.0.0-RC2"
+                conf="test->default"/>
   </dependencies>
 
 </ivy-module>

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/Login.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/Login.java b/src/java/main/org/apache/zookeeper/Login.java
index aaa220c..3e21aae 100644
--- a/src/java/main/org/apache/zookeeper/Login.java
+++ b/src/java/main/org/apache/zookeeper/Login.java
@@ -32,8 +32,9 @@ import javax.security.auth.login.LoginContext;
 import javax.security.auth.login.LoginException;
 import javax.security.auth.callback.CallbackHandler;
 
-import org.apache.log4j.Logger;
 import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import javax.security.auth.kerberos.KerberosTicket;
 import javax.security.auth.Subject;
 import java.util.Date;
@@ -41,7 +42,7 @@ import java.util.Random;
 import java.util.Set;
 
 public class Login {
-    Logger LOG = Logger.getLogger(Login.class);
+    private static final Logger LOG = LoggerFactory.getLogger(Login.class);
     public CallbackHandler callbackHandler;
 
     // LoginThread will sleep until 80% of time from last refresh to
@@ -291,7 +292,7 @@ public class Login {
         }
         LoginContext loginContext = new LoginContext(loginContextName,callbackHandler);
         loginContext.login();
-        LOG.info("successfully logged in.");
+        LOG.info("{} successfully logged in.", loginContextName);
         return loginContext;
     }
 

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/SaslClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/SaslClientCallbackHandler.java b/src/java/main/org/apache/zookeeper/SaslClientCallbackHandler.java
new file mode 100644
index 0000000..d6f5549
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/SaslClientCallbackHandler.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SASL client-side javax.security.auth.callback.CallbackHandler. Name and Realm
+ * callbacks get their default values; a Password callback gets the configured
+ * password, or only a logged warning when none was supplied; an Authorize
+ * callback is granted only if the authentication and authorization ids are
+ * equal. Any other callback raises UnsupportedCallbackException. Distinct from
+ * the server-side org.apache.zookeeper.server.auth.SaslServerCallbackHandler.
+ */
+public class SaslClientCallbackHandler implements CallbackHandler {
+    private String password = null;
+    private static final Logger LOG = LoggerFactory.getLogger(SaslClientCallbackHandler.class);
+    private final String entity;
+    public SaslClientCallbackHandler(String password, String client) {
+        this.password = password;
+        this.entity = client;
+    }
+
+    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+        for (Callback callback : callbacks) {
+            if (callback instanceof NameCallback) {
+                NameCallback nc = (NameCallback) callback;
+                nc.setName(nc.getDefaultName());
+            }
+            else {
+                if (callback instanceof PasswordCallback) {
+                    PasswordCallback pc = (PasswordCallback)callback;
+                    if (password != null) {
+                        pc.setPassword(this.password.toCharArray());
+                    } else {
+                        LOG.warn("Could not login: the {} is being asked for a password, but the ZooKeeper {}" +
+                          " code does not currently support obtaining a password from the user." +
+                          " Make sure that the {} is configured to use a ticket cache (using" +
+                          " the JAAS configuration setting 'useTicketCache=true)' and restart the {}. If" +
+                          " you still get this message after that, the TGT in the ticket cache has expired and must" +
+                          " be manually refreshed. To do so, first determine if you are using a password or a" +
+                          " keytab. If the former, run kinit in a Unix shell in the environment of the user who" +
+                          " is running this Zookeeper {} using the command" +
+                          " 'kinit <princ>' (where <princ> is the name of the {}'s Kerberos principal)." +
+                          " If the latter, do" +
+                          " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and" +
+                          " <keytab> is the location of the keytab file). After manually refreshing your cache," +
+                          " restart this {}. If you continue to see this message after manually refreshing" +
+                          " your cache, ensure that your KDC host's clock is in sync with this host's clock.",
+                          new Object[]{entity, entity, entity, entity, entity, entity, entity});
+                    }
+                }
+                else {
+                    if (callback instanceof RealmCallback) {
+                        RealmCallback rc = (RealmCallback) callback;
+                        rc.setText(rc.getDefaultText());
+                    }
+                    else {
+                        if (callback instanceof AuthorizeCallback) {
+                            AuthorizeCallback ac = (AuthorizeCallback) callback;
+                            String authid = ac.getAuthenticationID();
+                            String authzid = ac.getAuthorizationID();
+                            if (authid.equals(authzid)) {
+                                ac.setAuthorized(true);
+                            } else {
+                                ac.setAuthorized(false);
+                            }
+                            if (ac.isAuthorized()) {
+                                ac.setAuthorizedID(authzid);
+                            }
+                        }
+                        else {
+                            throw new UnsupportedCallbackException(callback, "Unrecognized SASL " + entity + "Callback");
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java b/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
index 21ef0fa..f3c9d3c 100644
--- a/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
+++ b/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
@@ -19,22 +19,13 @@
 package org.apache.zookeeper.client;
 
 import java.io.IOException;
-import java.security.Principal;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 
 import javax.security.auth.Subject;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.login.Configuration;
 import javax.security.auth.login.LoginException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.RealmCallback;
-import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 
@@ -42,17 +33,13 @@ import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.ClientCnxn;
 import org.apache.zookeeper.Environment;
 import org.apache.zookeeper.Login;
+import org.apache.zookeeper.SaslClientCallbackHandler;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.proto.GetSASLRequest;
 import org.apache.zookeeper.proto.SetSASLResponse;
-import org.apache.zookeeper.server.auth.KerberosName;
-import org.ietf.jgss.GSSContext;
-import org.ietf.jgss.GSSCredential;
-import org.ietf.jgss.GSSException;
-import org.ietf.jgss.GSSManager;
-import org.ietf.jgss.Oid;
+import org.apache.zookeeper.util.SecurityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -226,83 +213,14 @@ public class ZooKeeperSaslClient {
                         }
                         // note that the login object is static: it's shared amongst all zookeeper-related connections.
                         // in order to ensure the login is initialized only once, it must be synchronized the code snippet.
-                        login = new Login(loginContext, new ClientCallbackHandler(null));
+                        login = new Login(loginContext, new SaslClientCallbackHandler(null, "Client"));
                         login.startThreadIfNeeded();
                         initializedLogin = true;
                     }
                 }
             }
-            Subject subject = login.getSubject();
-            SaslClient saslClient;
-            // Use subject.getPrincipals().isEmpty() as an indication of which SASL mechanism to use:
-            // if empty, use DIGEST-MD5; otherwise, use GSSAPI.
-            if (subject.getPrincipals().isEmpty()) {
-                // no principals: must not be GSSAPI: use DIGEST-MD5 mechanism instead.
-                LOG.info("Client will use DIGEST-MD5 as SASL mechanism.");
-                String[] mechs = {"DIGEST-MD5"};
-                String username = (String)(subject.getPublicCredentials().toArray()[0]);
-                String password = (String)(subject.getPrivateCredentials().toArray()[0]);
-                // "zk-sasl-md5" is a hard-wired 'domain' parameter shared with zookeeper server code (see ServerCnxnFactory.java)
-                saslClient = Sasl.createSaslClient(mechs, username, "zookeeper", "zk-sasl-md5", null, new ClientCallbackHandler(password));
-                return saslClient;
-            }
-            else { // GSSAPI.
-            	boolean usingNativeJgss =
-            			Boolean.getBoolean("sun.security.jgss.native");
-            	if (usingNativeJgss) {
-            		// http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/jgss-features.html
-            		// """
-            		// In addition, when performing operations as a particular
-            		// Subject, e.g. Subject.doAs(...) or Subject.doAsPrivileged(...),
-            		// the to-be-used GSSCredential should be added to Subject's
-            		// private credential set. Otherwise, the GSS operations will
-            		// fail since no credential is found.
-            		// """
-            		try {
-            			GSSManager manager = GSSManager.getInstance();
-            			Oid krb5Mechanism = new Oid("1.2.840.113554.1.2.2");
-            			GSSCredential cred = manager.createCredential(null,
-            					GSSContext.DEFAULT_LIFETIME,
-            					krb5Mechanism,
-            					GSSCredential.INITIATE_ONLY);
-            			subject.getPrivateCredentials().add(cred);
-            			if (LOG.isDebugEnabled()) {
-            				LOG.debug("Added private credential to subject: " + cred);
-            			}
-            		} catch (GSSException ex) {
-            			LOG.warn("Cannot add private credential to subject; " +
-            					"authentication at the server may fail", ex);
-            		}
-            	}
-                final Object[] principals = subject.getPrincipals().toArray();
-                // determine client principal from subject.
-                final Principal clientPrincipal = (Principal)principals[0];
-                final KerberosName clientKerberosName = new KerberosName(clientPrincipal.getName());
-                // assume that server and client are in the same realm (by default; unless the system property
-                // "zookeeper.server.realm" is set).
-                String serverRealm = System.getProperty("zookeeper.server.realm",clientKerberosName.getRealm());
-                KerberosName serviceKerberosName = new KerberosName(servicePrincipal+"@"+serverRealm);
-                final String serviceName = serviceKerberosName.getServiceName();
-                final String serviceHostname = serviceKerberosName.getHostName();
-                final String clientPrincipalName = clientKerberosName.toString();
-                try {
-                    saslClient = Subject.doAs(subject,new PrivilegedExceptionAction<SaslClient>() {
-                        public SaslClient run() throws SaslException {
-                            LOG.info("Client will use GSSAPI as SASL mechanism.");
-                            String[] mechs = {"GSSAPI"};
-                            LOG.debug("creating sasl client: client="+clientPrincipalName+";service="+serviceName+";serviceHostname="+serviceHostname);
-                            SaslClient saslClient = Sasl.createSaslClient(mechs,clientPrincipalName,serviceName,serviceHostname,null,new ClientCallbackHandler(null));
-                            return saslClient;
-                        }
-                    });
-                    return saslClient;
-                }
-                catch (Exception e) {
-                	LOG.error("Exception while trying to create SASL client", e);
-                    e.printStackTrace();
-                    return null;
-                }
-            }
+            return SecurityUtils.createSaslClient(login.getSubject(),
+                    servicePrincipal, "zookeeper", "zk-sasl-md5", LOG, "Client");
         } catch (LoginException e) {
             // We throw LoginExceptions...
             throw e;
@@ -471,75 +389,6 @@ public class ZooKeeperSaslClient {
         }
     }
 
-    // The CallbackHandler interface here refers to
-    // javax.security.auth.callback.CallbackHandler.
-    // It should not be confused with Zookeeper packet callbacks like
-    //  org.apache.zookeeper.server.auth.SaslServerCallbackHandler.
-    public static class ClientCallbackHandler implements CallbackHandler {
-        private String password = null;
-
-        public ClientCallbackHandler(String password) {
-            this.password = password;
-        }
-
-        public void handle(Callback[] callbacks) throws
-          UnsupportedCallbackException {
-            for (Callback callback : callbacks) {
-                if (callback instanceof NameCallback) {
-                    NameCallback nc = (NameCallback) callback;
-                    nc.setName(nc.getDefaultName());
-                }
-                else {
-                    if (callback instanceof PasswordCallback) {
-                        PasswordCallback pc = (PasswordCallback)callback;
-                        if (password != null) {
-                            pc.setPassword(this.password.toCharArray());
-                        } else {
-                            LOG.warn("Could not login: the client is being asked for a password, but the Zookeeper" +
-                              " client code does not currently support obtaining a password from the user." +
-                              " Make sure that the client is configured to use a ticket cache (using" +
-                              " the JAAS configuration setting 'useTicketCache=true)' and restart the client. If" +
-                              " you still get this message after that, the TGT in the ticket cache has expired and must" +
-                              " be manually refreshed. To do so, first determine if you are using a password or a" +
-                              " keytab. If the former, run kinit in a Unix shell in the environment of the user who" +
-                              " is running this Zookeeper client using the command" +
-                              " 'kinit <princ>' (where <princ> is the name of the client's Kerberos principal)." +
-                              " If the latter, do" +
-                              " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and" +
-                              " <keytab> is the location of the keytab file). After manually refreshing your cache," +
-                              " restart this client. If you continue to see this message after manually refreshing" +
-                              " your cache, ensure that your KDC host's clock is in sync with this host's clock.");
-                        }
-                    }
-                    else {
-                        if (callback instanceof RealmCallback) {
-                            RealmCallback rc = (RealmCallback) callback;
-                            rc.setText(rc.getDefaultText());
-                        }
-                        else {
-                            if (callback instanceof AuthorizeCallback) {
-                                AuthorizeCallback ac = (AuthorizeCallback) callback;
-                                String authid = ac.getAuthenticationID();
-                                String authzid = ac.getAuthorizationID();
-                                if (authid.equals(authzid)) {
-                                    ac.setAuthorized(true);
-                                } else {
-                                    ac.setAuthorized(false);
-                                }
-                                if (ac.isAuthorized()) {
-                                    ac.setAuthorizedID(authzid);
-                                }
-                            }
-                            else {
-                                throw new UnsupportedCallbackException(callback,"Unrecognized SASL ClientCallback");
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-
     public boolean clientTunneledAuthenticationInProgress() {
     	if (!isSASLConfigured) {
     	    return false;

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/ZooKeeperSaslServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperSaslServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperSaslServer.java
index 71870ce..dd6ee8f 100644
--- a/src/java/main/org/apache/zookeeper/server/ZooKeeperSaslServer.java
+++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperSaslServer.java
@@ -18,22 +18,12 @@
 
 package org.apache.zookeeper.server;
 
-import java.security.Principal;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-
 import javax.security.auth.Subject;
-import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
 import org.apache.zookeeper.Login;
-import org.ietf.jgss.GSSContext;
-import org.ietf.jgss.GSSCredential;
-import org.ietf.jgss.GSSException;
-import org.ietf.jgss.GSSManager;
-import org.ietf.jgss.GSSName;
-import org.ietf.jgss.Oid;
+import org.apache.zookeeper.util.SecurityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,107 +41,9 @@ public class ZooKeeperSaslServer {
     private SaslServer createSaslServer(final Login login) {
         synchronized (login) {
             Subject subject = login.getSubject();
-            if (subject != null) {
-                // server is using a JAAS-authenticated subject: determine service principal name and hostname from zk server's subject.
-                if (subject.getPrincipals().size() > 0) {
-                    try {
-                        final Object[] principals = subject.getPrincipals().toArray();
-                        final Principal servicePrincipal = (Principal)principals[0];
-
-                        // e.g. servicePrincipalNameAndHostname := "zookeeper/myhost.foo.com@FOO.COM"
-                        final String servicePrincipalNameAndHostname = servicePrincipal.getName();
-
-                        int indexOf = servicePrincipalNameAndHostname.indexOf("/");
-
-                        // e.g. servicePrincipalName := "zookeeper"
-                        final String servicePrincipalName = servicePrincipalNameAndHostname.substring(0, indexOf);
-
-                        // e.g. serviceHostnameAndKerbDomain := "myhost.foo.com@FOO.COM"
-                        final String serviceHostnameAndKerbDomain = servicePrincipalNameAndHostname.substring(indexOf+1,servicePrincipalNameAndHostname.length());
-
-                        indexOf = serviceHostnameAndKerbDomain.indexOf("@");
-                        // e.g. serviceHostname := "myhost.foo.com"
-                        final String serviceHostname = serviceHostnameAndKerbDomain.substring(0,indexOf);
-
-                        final String mech = "GSSAPI";   // TODO: should depend on zoo.cfg specified mechs, but if subject is non-null, it can be assumed to be GSSAPI.
-
-                        LOG.debug("serviceHostname is '"+ serviceHostname + "'");
-                        LOG.debug("servicePrincipalName is '"+ servicePrincipalName + "'");
-                        LOG.debug("SASL mechanism(mech) is '"+ mech +"'");
-
-                        boolean usingNativeJgss =
-                        		Boolean.getBoolean("sun.security.jgss.native");
-                        if (usingNativeJgss) {
-                        	// http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/jgss-features.html
-                        	// """
-                        	// In addition, when performing operations as a particular
-                        	// Subject, e.g. Subject.doAs(...) or
-                        	// Subject.doAsPrivileged(...), the to-be-used
-                        	// GSSCredential should be added to Subject's
-                        	// private credential set. Otherwise, the GSS operations
-                        	// will fail since no credential is found.
-                        	// """
-                        	try {
-                        		GSSManager manager = GSSManager.getInstance();
-                        		Oid krb5Mechanism = new Oid("1.2.840.113554.1.2.2");
-                        		GSSName gssName = manager.createName(
-                        				servicePrincipalName + "@" + serviceHostname,
-                        				GSSName.NT_HOSTBASED_SERVICE);
-                        		GSSCredential cred = manager.createCredential(gssName,
-                        				GSSContext.DEFAULT_LIFETIME,
-                        				krb5Mechanism,
-                        				GSSCredential.ACCEPT_ONLY);
-                        		subject.getPrivateCredentials().add(cred);
-                        		if (LOG.isDebugEnabled()) {
-                        			LOG.debug("Added private credential to subject: " + cred);
-                        		}
-                        	} catch (GSSException ex) {
-                        		LOG.warn("Cannot add private credential to subject; " +
-                        				"clients authentication may fail", ex);
-                        	}
-                        }
-                        try {
-                            return Subject.doAs(subject,new PrivilegedExceptionAction<SaslServer>() {
-                                public SaslServer run() {
-                                    try {
-                                        SaslServer saslServer;
-                                        saslServer = Sasl.createSaslServer(mech, servicePrincipalName, serviceHostname, null, login.callbackHandler);
-                                        return saslServer;
-                                    }
-                                    catch (SaslException e) {
-                                        LOG.error("Zookeeper Server failed to create a SaslServer to interact with a client during session initiation: " + e);
-                                        e.printStackTrace();
-                                        return null;
-                                    }
-                                }
-                            }
-                            );
-                        }
-                        catch (PrivilegedActionException e) {
-                            // TODO: exit server at this point(?)
-                            LOG.error("Zookeeper Quorum member experienced a PrivilegedActionException exception while creating a SaslServer using a JAAS principal context:" + e);
-                            e.printStackTrace();
-                        }
-                    }
-                    catch (IndexOutOfBoundsException e) {
-                        LOG.error("server principal name/hostname determination error: ", e);
-                    }
-                }
-                else {
-                    // JAAS non-GSSAPI authentication: assuming and supporting only DIGEST-MD5 mechanism for now.
-                    // TODO: use 'authMech=' value in zoo.cfg.
-                    try {
-                        SaslServer saslServer = Sasl.createSaslServer("DIGEST-MD5","zookeeper","zk-sasl-md5",null, login.callbackHandler);
-                        return saslServer;
-                    }
-                    catch (SaslException e) {
-                        LOG.error("Zookeeper Quorum member failed to create a SaslServer to interact with a client during session initiation", e);
-                    }
-                }
-            }
+            return SecurityUtils.createSaslServer(subject, "zookeeper",
+                    "zk-sasl-md5", login.callbackHandler, LOG);
         }
-        LOG.error("failed to create saslServer object.");
-        return null;
     }
 
     public byte[] evaluateResponse(byte[] response) throws SaslException {

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/auth/SaslServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/auth/SaslServerCallbackHandler.java b/src/java/main/org/apache/zookeeper/server/auth/SaslServerCallbackHandler.java
index 7fdffde..9f53a4d 100644
--- a/src/java/main/org/apache/zookeeper/server/auth/SaslServerCallbackHandler.java
+++ b/src/java/main/org/apache/zookeeper/server/auth/SaslServerCallbackHandler.java
@@ -46,13 +46,15 @@ public class SaslServerCallbackHandler implements CallbackHandler {
     private String userName;
     private final Map<String,String> credentials = new HashMap<String,String>();
 
-    public SaslServerCallbackHandler(Configuration configuration) throws IOException {
-        String serverSection = System.getProperty(ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY,
-                                                  ZooKeeperSaslServer.DEFAULT_LOGIN_CONTEXT_NAME);
+    public SaslServerCallbackHandler(Configuration configuration)
+            throws IOException {
+        String serverSection = System.getProperty(
+                ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY,
+                ZooKeeperSaslServer.DEFAULT_LOGIN_CONTEXT_NAME);
         AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(serverSection);
 
         if (configurationEntries == null) {
-            String errorMessage = "Could not find a 'Server' entry in this configuration: Server cannot start.";
+            String errorMessage = "Could not find a '" + serverSection + "' entry in this configuration: Server cannot start.";
             LOG.error(errorMessage);
             throw new IOException(errorMessage);
         }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java b/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
index 78f3aa6..2a3d4fd 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
@@ -997,6 +997,8 @@ public class FastLeaderElection implements Election {
                 LOG.warn("Failed to unregister with JMX", e);
             }
             self.jmxLeaderElectionBean = null;
+            LOG.debug("Number of connection processing threads: {}",
+                    manager.getConnectionThreadCount());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Follower.java b/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
index 043a522..9aa0d0b 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import org.apache.jute.Record;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.TxnHeader;
@@ -64,9 +65,9 @@ public class Follower extends Learner{
         self.end_fle = 0;
         fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
         try {
-            InetSocketAddress addr = findLeader();            
+            QuorumServer leaderServer = findLeader();            
             try {
-                connectToLeader(addr);
+                connectToLeader(leaderServer.addr, leaderServer.hostname);
                 long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
 
                 //check to see if the leader zxid is lower than ours

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
index c83d352..710745d 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
@@ -19,6 +19,7 @@
 package org.apache.zookeeper.server.quorum;
 
 import java.io.ByteArrayOutputStream;
+import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.net.BindException;
 import java.net.ServerSocket;
@@ -32,10 +33,12 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.security.sasl.SaslException;
 
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.zookeeper.server.FinalRequestProcessor;
@@ -318,7 +321,10 @@ public class Leader {
                         // in LearnerHandler switch to the syncLimit
                         s.setSoTimeout(self.tickTime * self.initLimit);
                         s.setTcpNoDelay(nodelay);
-                        LearnerHandler fh = new LearnerHandler(s, Leader.this);
+
+                        BufferedInputStream is = new BufferedInputStream(
+                                s.getInputStream());
+                        LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                         fh.start();
                     } catch (SocketException e) {
                         if (stop) {
@@ -332,6 +338,8 @@ public class Leader {
                         } else {
                             throw e;
                         }
+                    } catch (SaslException e){
+                        LOG.error("Exception while connecting to quorum learner", e);
                     }
                 }
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
index 647b8a2..749b274 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
@@ -39,8 +39,6 @@ import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ZooTrace;
@@ -48,6 +46,8 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class is the superclass of two of the three main actors in a ZK
@@ -191,8 +191,8 @@ public class Learner {
     /**
      * Returns the address of the node we think is the leader.
      */
-    protected InetSocketAddress findLeader() {
-        InetSocketAddress addr = null;
+    protected QuorumServer findLeader() {
+        QuorumServer leaderServer = null;
         // Find the leader by id
         Vote current = self.getCurrentVote();
         for (QuorumServer s : self.getView().values()) {
@@ -200,27 +200,28 @@ public class Learner {
                 // Ensure we have the leader's correct IP address before
                 // attempting to connect.
                 s.recreateSocketAddresses();
-                addr = s.addr;
+                leaderServer = s;
                 break;
             }
         }
-        if (addr == null) {
+        if (leaderServer == null) {
             LOG.warn("Couldn't find the leader with id = "
                     + current.getId());
         }
-        return addr;
+        return leaderServer;
     }
     
     /**
      * Establish a connection with the Leader found by findLeader. Retries
      * 5 times before giving up. 
      * @param addr - the address of the Leader to connect to.
-     * @throws IOException - if the socket connection fails on the 5th attempt
+     * @throws IOException <li>if the socket connection fails on the 5th attempt</li>
+     * <li>if there is an authentication failure while connecting to leader</li>
      * @throws ConnectException
      * @throws InterruptedException
      */
-    protected void connectToLeader(InetSocketAddress addr) 
-    throws IOException, ConnectException, InterruptedException {
+    protected void connectToLeader(InetSocketAddress addr, String hostname)
+            throws IOException, ConnectException, InterruptedException {
         sock = new Socket();        
         sock.setSoTimeout(self.tickTime * self.initLimit);
         for (int tries = 0; tries < 5; tries++) {
@@ -241,6 +242,9 @@ public class Learner {
             }
             Thread.sleep(1000);
         }
+
+        self.authLearner.authenticate(sock, hostname);
+
         leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
                 sock.getInputStream()));
         bufferedOutput = new BufferedOutputStream(sock.getOutputStream());

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java b/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
index 8a748c7..51ed7e7 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -32,6 +32,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 
+import javax.security.sasl.SaslException;
+
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
@@ -153,15 +155,30 @@ public class LearnerHandler extends ZooKeeperThread {
 
     private BinaryOutputArchive oa;
 
+    private final BufferedInputStream bufferedInput;
     private BufferedOutputStream bufferedOutput;
 
-    LearnerHandler(Socket sock, Leader leader) throws IOException {
+    LearnerHandler(Socket sock, BufferedInputStream bufferedInput,
+                   Leader leader) throws IOException {
         super("LearnerHandler-" + sock.getRemoteSocketAddress());
         this.sock = sock;
         this.leader = leader;
-        leader.addLearnerHandler(this);
+        this.bufferedInput = bufferedInput;
+        try { // authenticate the learner before this handler is ever started
+            leader.self.authServer.authenticate(sock,
+                    new DataInputStream(bufferedInput));
+        } catch (IOException e) {
+            LOG.error("Server failed to authenticate quorum learner, addr: {}, closing connection",
+                    sock.getRemoteSocketAddress(), e);
+            try {
+                sock.close();
+            } catch (IOException ie) {
+                LOG.error("Exception while closing socket", ie);
+            }
+            throw new SaslException("Authentication failure: " + e.getMessage(), e); // keep the root cause chained
+        }
     }
-    
+
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
@@ -296,11 +313,11 @@ public class LearnerHandler extends ZooKeeperThread {
     @Override
     public void run() {
         try {
+            leader.addLearnerHandler(this);
             tickOfNextAckDeadline = leader.self.tick
                     + leader.self.initLimit + leader.self.syncLimit;
 
-            ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock
-                    .getInputStream()));
+            ia = BinaryInputArchive.getArchive(bufferedInput);
             bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
             oa = BinaryOutputArchive.getArchive(bufferedOutput);
 

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Observer.java b/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
index e53f6f2..53f516f 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
@@ -19,11 +19,11 @@
 package org.apache.zookeeper.server.quorum;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 
 import org.apache.jute.Record;
 import org.apache.zookeeper.server.ObserverBean;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.txn.TxnHeader;
 
@@ -61,10 +61,10 @@ public class Observer extends Learner{
         zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
 
         try {
-            InetSocketAddress addr = findLeader();
-            LOG.info("Observing " + addr);
+            QuorumServer leaderServer = findLeader();
+            LOG.info("Observing " + leaderServer.addr);
             try {
-                connectToLeader(addr);
+                connectToLeader(leaderServer.addr, leaderServer.hostname);
                 long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
 
                 syncWithLeader(newLeaderZxid);

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
index 20e5f16..74d1c1e 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -28,20 +29,28 @@ import java.net.SocketException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.UnresolvedAddressException;
+import java.util.Collections;
 import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.zookeeper.server.ZooKeeperThread;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.ZooKeeperThread;
-
 /**
  * This class implements a connection manager for leader election using TCP. It
  * maintains one connection for every pair of servers. The tricky part is to
@@ -89,7 +98,7 @@ public class QuorumCnxManager {
      * Negative counter for observer server ids.
      */
     
-    private long observerCounter = -1;
+    private AtomicLong observerCounter = new AtomicLong(-1);
     
     /*
      * Connection time out value in milliseconds 
@@ -100,7 +109,20 @@ public class QuorumCnxManager {
     /*
      * Local IP address
      */
-    final QuorumPeer self;
+    final long mySid;
+    final int socketTimeout;
+    final Map<Long, QuorumPeer.QuorumServer> view;
+    final boolean listenOnAllIPs;
+    private ThreadPoolExecutor connectionExecutor;
+    private final Set<Long> inprogressConnections = Collections
+            .synchronizedSet(new HashSet<Long>());
+    private QuorumAuthServer authServer;
+    private QuorumAuthLearner authLearner;
+    private boolean quorumSaslAuthEnabled;
+    /*
+     * Counter to count connection processing threads.
+     */
+    private AtomicInteger connectionThreadCnt = new AtomicInteger(0);
 
     /*
      * Mapping from Peer to Thread number
@@ -145,7 +167,14 @@ public class QuorumCnxManager {
         long sid;
     }
 
-    public QuorumCnxManager(QuorumPeer self) {
+    public QuorumCnxManager(final long mySid,
+                            Map<Long,QuorumPeer.QuorumServer> view,
+                            QuorumAuthServer authServer,
+                            QuorumAuthLearner authLearner,
+                            int socketTimeout,
+                            boolean listenOnAllIPs,
+                            int quorumCnxnThreadsSize,
+                            boolean quorumSaslAuthEnabled) {
         this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
         this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
         this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
@@ -155,13 +184,53 @@ public class QuorumCnxManager {
         if(cnxToValue != null){
             this.cnxTO = new Integer(cnxToValue); 
         }
-        
-        this.self = self;
+
+        this.mySid = mySid;
+        this.socketTimeout = socketTimeout;
+        this.view = view;
+        this.listenOnAllIPs = listenOnAllIPs;
+
+        initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
+                quorumSaslAuthEnabled);
 
         // Starts listener thread that waits for connection requests 
         listener = new Listener();
     }
 
+    private void initializeAuth(final long mySid, // stores the auth handlers and, if SASL is enabled, builds connectionExecutor
+            final QuorumAuthServer authServer,
+            final QuorumAuthLearner authLearner,
+            final int quorumCnxnThreadsSize,
+            final boolean quorumSaslAuthEnabled) {
+        this.authServer = authServer;
+        this.authLearner = authLearner;
+        this.quorumSaslAuthEnabled = quorumSaslAuthEnabled;
+        if (!this.quorumSaslAuthEnabled) { // connectionExecutor is not created in this case
+            LOG.debug("Not initializing connection executor as quorum sasl auth is disabled");
+            return;
+        }
+
+        // init connection executors
+        final AtomicInteger threadIndex = new AtomicInteger(1); // suffix for generated thread names, starts at 1
+        SecurityManager s = System.getSecurityManager();
+        final ThreadGroup group = (s != null) ? s.getThreadGroup()
+                : Thread.currentThread().getThreadGroup();
+        ThreadFactory daemonThFactory = new ThreadFactory() { // NOTE(review): despite the name, setDaemon(true) is never called below - confirm intent
+
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread t = new Thread(group, r, "QuorumConnectionThread-"
+                        + "[myid=" + mySid + "]-"
+                        + threadIndex.getAndIncrement());
+                return t;
+            }
+        };
+        this.connectionExecutor = new ThreadPoolExecutor(3, // core pool size 3, maximum quorumCnxnThreadsSize, 60s keep-alive
+                quorumCnxnThreadsSize, 60, TimeUnit.SECONDS,
+                new SynchronousQueue<Runnable>(), daemonThFactory); // SynchronousQueue: tasks are handed directly to a thread, never queued
+        this.connectionExecutor.allowCoreThreadTimeOut(true); // idle core threads are also subject to the keep-alive timeout
+    }
+
     /**
      * Invokes initiateConnection for testing purposes
      * 
@@ -173,7 +242,8 @@ public class QuorumCnxManager {
         }
         Socket sock = new Socket();
         setSockOpts(sock);
-        sock.connect(self.getVotingView().get(sid).electionAddr, cnxTO);
+        sock.connect(QuorumPeer.viewToVotingView(view).get(sid).electionAddr,
+                     cnxTO);
         initiateConnection(sock, sid);
     }
     
@@ -181,28 +251,96 @@ public class QuorumCnxManager {
      * If this server has initiated the connection, then it gives up on the
      * connection if it loses challenge. Otherwise, it keeps the connection.
      */
-    public boolean initiateConnection(Socket sock, Long sid) {
+    public void initiateConnection(final Socket sock, final Long sid) {
+        try {
+            startConnection(sock, sid);
+        } catch (IOException e) { // log the failure and release the socket
+            LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection",
+                     new Object[] { sid, sock.getRemoteSocketAddress(), e }); // throwable last in the array so SLF4J logs its stack trace
+            closeSocket(sock);
+            return;
+        }
+    }
+
+    /**
+     * Submits the connection request for peer 'sid' to connectionExecutor; if a
+     * request for that sid is already in progress, the given socket is closed instead.
+     */
+    public void initiateConnectionAsync(final Socket sock, final Long sid) {
+        if(!inprogressConnections.add(sid)){
+            // simply return as there is a connection request to
+            // server 'sid' already in progress.
+            LOG.debug("Connection request to server id: {} is already in progress, so skipping this request",
+                    sid);
+            closeSocket(sock);
+            return;
+        }
+        try {
+            connectionExecutor.execute(
+                    new QuorumConnectionReqThread(sock, sid));
+            connectionThreadCnt.incrementAndGet(); // incremented only after execute() returned without throwing
+        } catch (Throwable e) {
+            // Imp: Safer side catching all type of exceptions and remove 'sid'
+            // from inprogress connections. This is to avoid blocking further
+            // connection requests from this 'sid' in case of errors.
+            inprogressConnections.remove(sid);
+            LOG.error("Exception while submitting quorum connection request", e);
+            closeSocket(sock);
+        }
+    }
+
+    /**
+     * Task that sends the connection request to peer 'sid' via initiateConnection().
+     */
+    private class QuorumConnectionReqThread extends ZooKeeperThread {
+        final Socket sock;
+        final Long sid;
+        QuorumConnectionReqThread(final Socket sock, final Long sid) {
+            super("QuorumConnectionReqThread-" + sid);
+            this.sock = sock;
+            this.sid = sid;
+        }
+
+        @Override
+        public void run() {
+            try{
+                initiateConnection(sock, sid);
+            } finally {
+                inprogressConnections.remove(sid); // always clear the in-progress marker, even if the attempt threw
+            }
+        }
+    }
+
+    private boolean startConnection(Socket sock, Long sid)
+            throws IOException {
         DataOutputStream dout = null;
+        DataInputStream din = null;
         try {
             // Sending id and challenge
             dout = new DataOutputStream(sock.getOutputStream());
-            dout.writeLong(self.getId());
+            dout.writeLong(this.mySid);
             dout.flush();
+
+            din = new DataInputStream(
+                    new BufferedInputStream(sock.getInputStream()));
         } catch (IOException e) {
             LOG.warn("Ignoring exception reading or writing challenge: ", e);
             closeSocket(sock);
             return false;
         }
-        
+
+        // authenticate learner
+        authLearner.authenticate(sock, view.get(sid).hostname);
+
         // If lost the challenge, then drop the new connection
-        if (sid > self.getId()) {
+        if (sid > this.mySid) {
             LOG.info("Have smaller server identifier, so dropping the " +
-                     "connection: (" + sid + ", " + self.getId() + ")");
+                     "connection: (" + sid + ", " + this.mySid + ")");
             closeSocket(sock);
             // Otherwise proceed with the connection
         } else {
             SendWorker sw = new SendWorker(sock, sid);
-            RecvWorker rw = new RecvWorker(sock, sid, sw);
+            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
             sw.setRecv(rw);
 
             SendWorker vsw = senderWorkerMap.get(sid);
@@ -225,8 +363,6 @@ public class QuorumCnxManager {
         return false;
     }
 
-    
-    
     /**
      * If this server receives a connection request, then it gives up on the new
      * connection if it wins. Notice that it checks whether it has a connection
@@ -234,12 +370,57 @@ public class QuorumCnxManager {
      * possible long value to lose the challenge.
      * 
      */
-    public void receiveConnection(Socket sock) {
+    public void receiveConnection(final Socket sock) {
+        try {
+            // One buffered stream per socket; handleConnection reuses it.
+            DataInputStream din = new DataInputStream(
+                    new BufferedInputStream(sock.getInputStream()));
+            handleConnection(sock, din);
+        } catch (IOException e) {
+            // Trailing throwable argument makes SLF4J log the stack trace.
+            LOG.error("Exception handling connection, addr: {}, closing server connection",
+                     sock.getRemoteSocketAddress(), e);
+            closeSocket(sock);
+        }
+    }
+
+    /**
+     * Hands the accepted socket to connectionExecutor so that a slow
+     * handshake does not block the caller; closes the socket on failure.
+     */
+    public void receiveConnectionAsync(final Socket sock) {
+        try {
+            connectionExecutor.execute(
+                    new QuorumConnectionReceiverThread(sock));
+            connectionThreadCnt.incrementAndGet();
+        } catch (Throwable e) {
+            LOG.error("Exception handling connection, addr: {}, closing server connection",
+                     sock.getRemoteSocketAddress(), e);
+            closeSocket(sock);
+        }
+    }
+
+    /**
+     * Task handed to connectionExecutor; its run() calls receiveConnection(sock).
+     */
+    private class QuorumConnectionReceiverThread extends ZooKeeperThread {
+        private final Socket sock;
+        QuorumConnectionReceiverThread(final Socket sock) {
+            super("QuorumConnectionReceiverThread-" + sock.getRemoteSocketAddress());
+            this.sock = sock;
+        }
+
+        @Override
+        public void run() {
+            receiveConnection(sock);
+        }
+    }
+
+    private void handleConnection(Socket sock, DataInputStream din)
+            throws IOException {
         Long sid = null;
-        
         try {
             // Read server id
-            DataInputStream din = new DataInputStream(sock.getInputStream());
             sid = din.readLong();
             if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
                 sid = din.readLong();
@@ -265,8 +446,7 @@ public class QuorumCnxManager {
                  * Choose identifier at random. We need a value to identify
                  * the connection.
                  */
-                
-                sid = observerCounter--;
+                sid = observerCounter.getAndDecrement();
                 LOG.info("Setting arbitrary identifier to observer: " + sid);
             }
         } catch (IOException e) {
@@ -274,9 +454,12 @@ public class QuorumCnxManager {
             LOG.warn("Exception reading or writing challenge: " + e.toString());
             return;
         }
-        
+
+        // do authenticating learner
+        authServer.authenticate(sock, din);
+
         //If wins the challenge, then close the new connection.
-        if (sid < self.getId()) {
+        if (sid < this.mySid) {
             /*
              * This replica might still believe that the connection to sid is
              * up, so we have to shut down the workers before trying to open a
@@ -297,7 +480,7 @@ public class QuorumCnxManager {
             // Otherwise start worker threads to receive data.
         } else {
             SendWorker sw = new SendWorker(sock, sid);
-            RecvWorker rw = new RecvWorker(sock, sid, sw);
+            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
             sw.setRecv(rw);
 
             SendWorker vsw = senderWorkerMap.get(sid);
@@ -327,7 +510,7 @@ public class QuorumCnxManager {
         /*
          * If sending message to myself, then simply enqueue it (loopback).
          */
-        if (self.getId() == sid) {
+        if (this.mySid == sid) {
              b.position(0);
              addToRecvQueue(new Message(b.duplicate(), sid));
             /*
@@ -361,28 +544,32 @@ public class QuorumCnxManager {
      * 
      *  @param sid  server id
      */
-    
-    synchronized void connectOne(long sid){
-        if (senderWorkerMap.get(sid) == null){
+    synchronized public void connectOne(long sid){
+        if (!connectedToPeer(sid)){
             InetSocketAddress electionAddr;
-            if (self.quorumPeers.containsKey(sid)) {
-                electionAddr = self.quorumPeers.get(sid).electionAddr;
+            if (view.containsKey(sid)) {
+                electionAddr = view.get(sid).electionAddr;
             } else {
                 LOG.warn("Invalid server id: " + sid);
                 return;
             }
             try {
 
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Opening channel to server " + sid);
-                }
+                LOG.debug("Opening channel to server " + sid);
                 Socket sock = new Socket();
                 setSockOpts(sock);
-                sock.connect(self.getView().get(sid).electionAddr, cnxTO);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Connected to server " + sid);
+                sock.connect(view.get(sid).electionAddr, cnxTO);
+                LOG.debug("Connected to server " + sid);
+
+                // Sends connection request asynchronously if the quorum
+                // sasl authentication is enabled. This is required because
+                // sasl server authentication process may take few seconds to
+                // finish, this may delay next peer connection requests.
+                if (quorumSaslAuthEnabled) {
+                    initiateConnectionAsync(sock, sid);
+                } else {
+                    initiateConnection(sock, sid);
                 }
-                initiateConnection(sock, sid);
             } catch (UnresolvedAddressException e) {
                 // Sun doesn't include the address that causes this
                 // exception to be thrown, also UAE cannot be wrapped cleanly
@@ -392,8 +579,8 @@ public class QuorumCnxManager {
                         + " at election address " + electionAddr, e);
                 // Resolve hostname for this server in case the
                 // underlying ip address has changed.
-                if (self.getView().containsKey(sid)) {
-                    self.getView().get(sid).recreateSocketAddresses();
+                if (view.containsKey(sid)) {
+                    view.get(sid).recreateSocketAddresses();
                 }
                 throw e;
             } catch (IOException e) {
@@ -403,8 +590,8 @@ public class QuorumCnxManager {
                 // We can't really tell if the server is actually down or it failed
                 // to connect to the server because the underlying IP address
                 // changed. Resolve the hostname again just in case.
-                if (self.getView().containsKey(sid)) {
-                    self.getView().get(sid).recreateSocketAddresses();
+                if (view.containsKey(sid)) {
+                    view.get(sid).recreateSocketAddresses();
                 }
             }
         } else {
@@ -451,6 +638,13 @@ public class QuorumCnxManager {
         listener.halt();
         
         softHalt();
+
+        // clear data structures used for auth
+        if (connectionExecutor != null) {
+            connectionExecutor.shutdown();
+        }
+        inprogressConnections.clear();
+        resetConnectionThreadCount();
     }
    
     /**
@@ -471,7 +665,7 @@ public class QuorumCnxManager {
      */
     private void setSockOpts(Socket sock) throws SocketException {
         sock.setTcpNoDelay(true);
-        sock.setSoTimeout(self.tickTime * self.syncLimit);
+        sock.setSoTimeout(socketTimeout);
     }
 
     /**
@@ -494,11 +688,19 @@ public class QuorumCnxManager {
     public long getThreadCount() {
         return threadCnt.get();
     }
+
+    /**
+     * Return number of connection processing threads.
+     */
+    public long getConnectionThreadCount() {
+        return connectionThreadCnt.get();
+    }
+
     /**
-     * Return reference to QuorumPeer
+     * Reset the value of connection processing threads count to zero.
      */
-    public QuorumPeer getQuorumPeer() {
-        return self;
+    private void resetConnectionThreadCount() {
+        connectionThreadCnt.set(0);
     }
 
     /**
@@ -525,22 +727,35 @@ public class QuorumCnxManager {
                 try {
                     ss = new ServerSocket();
                     ss.setReuseAddress(true);
-                    if (self.getQuorumListenOnAllIPs()) {
-                        int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
+                    if (listenOnAllIPs) {
+                        int port = view.get(QuorumCnxManager.this.mySid)
+                            .electionAddr.getPort();
                         addr = new InetSocketAddress(port);
                     } else {
-                        addr = self.quorumPeers.get(self.getId()).electionAddr;
+                        addr = view.get(QuorumCnxManager.this.mySid)
+                            .electionAddr;
                     }
                     LOG.info("My election bind port: " + addr.toString());
-                    setName(self.quorumPeers.get(self.getId()).electionAddr
-                            .toString());
+                    setName(view.get(QuorumCnxManager.this.mySid)
+                            .electionAddr.toString());
                     ss.bind(addr);
                     while (!shutdown) {
                         Socket client = ss.accept();
                         setSockOpts(client);
                         LOG.info("Received connection request "
                                 + client.getRemoteSocketAddress());
-                        receiveConnection(client);
+
+                        // Receive and handle the connection request
+                        // asynchronously if the quorum sasl authentication is
+                        // enabled. This is required because sasl server
+                        // authentication process may take few seconds to finish,
+                        // this may delay next peer connection requests.
+                        if (quorumSaslAuthEnabled) {
+                            receiveConnectionAsync(client);
+                        } else {
+                            receiveConnection(client);
+                        }
+
                         numRetries = 0;
                     }
                 } catch (IOException e) {
@@ -562,7 +777,7 @@ public class QuorumCnxManager {
                 LOG.error("As I'm leaving the listener thread, "
                         + "I won't be able to participate in leader "
                         + "election any longer: "
-                        + self.quorumPeers.get(self.getId()).electionAddr);
+                        + view.get(QuorumCnxManager.this.mySid).electionAddr);
             }
         }
         
@@ -573,7 +788,8 @@ public class QuorumCnxManager {
             try{
                 LOG.debug("Trying to close listener: " + ss);
                 if(ss != null) {
-                    LOG.debug("Closing listener: " + self.getId());
+                    LOG.debug("Closing listener: "
+                              + QuorumCnxManager.this.mySid);
                     ss.close();
                 }
             } catch (IOException e){
@@ -729,8 +945,9 @@ public class QuorumCnxManager {
                     }
                 }
             } catch (Exception e) {
-                LOG.warn("Exception when using channel: for id " + sid + " my id = " + 
-                        self.getId() + " error = " + e);
+                LOG.warn("Exception when using channel: for id " + sid
+                         + " my id = " + QuorumCnxManager.this.mySid
+                         + " error = " + e);
             }
             this.finish();
             LOG.warn("Send worker leaving thread");
@@ -745,16 +962,16 @@ public class QuorumCnxManager {
         Long sid;
         Socket sock;
         volatile boolean running = true;
-        DataInputStream din;
+        final DataInputStream din;
         final SendWorker sw;
 
-        RecvWorker(Socket sock, Long sid, SendWorker sw) {
+        RecvWorker(Socket sock, DataInputStream din, Long sid, SendWorker sw) {
             super("RecvWorker:" + sid);
             this.sid = sid;
             this.sock = sock;
             this.sw = sw;
+            this.din = din;
             try {
-                din = new DataInputStream(sock.getInputStream());
                 // OK to wait until socket disconnects while reading.
                 sock.setSoTimeout(0);
             } catch (IOException e) {
@@ -807,8 +1024,8 @@ public class QuorumCnxManager {
                     addToRecvQueue(new Message(message.duplicate(), sid));
                 }
             } catch (Exception e) {
-                LOG.warn("Connection broken for id " + sid + ", my id = " + 
-                        self.getId() + ", error = " , e);
+                LOG.warn("Connection broken for id " + sid + ", my id = "
+                         + QuorumCnxManager.this.mySid + ", error = " , e);
             } finally {
                 LOG.warn("Interrupting SendWorker");
                 sw.finish();
@@ -930,4 +1147,8 @@ public class QuorumCnxManager {
        throws InterruptedException {
        return recvQueue.poll(timeout, unit);
     }
+
+    public boolean connectedToPeer(long peerSid) {
+        return senderWorkerMap.get(peerSid) != null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
index 2f0f21b..2dbedcf 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -34,8 +34,12 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+
+import javax.security.sasl.SaslException;
 
 import org.apache.zookeeper.common.AtomicFileOutputStream;
 import org.apache.zookeeper.jmx.MBeanRegistry;
@@ -45,6 +49,13 @@ import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooKeeperThread;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuth;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
+import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthLearner;
+import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthServer;
+import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthLearner;
+import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthServer;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.ZxidUtils;
@@ -85,6 +96,8 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
     LocalPeerBean jmxLocalPeerBean;
     LeaderElectionBean jmxLeaderElectionBean;
     QuorumCnxManager qcm;
+    QuorumAuthServer authServer;
+    QuorumAuthLearner authLearner;
 
     /* ZKDatabase is a top level member of quorumpeer 
      * which will be used in all the zookeeperservers
@@ -102,7 +115,8 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
             this.electionAddr = electionAddr;
         }
 
-        private QuorumServer(long id, InetSocketAddress addr) {
+        // VisibleForTesting
+        public QuorumServer(long id, InetSocketAddress addr) {
             this.id = id;
             this.addr = addr;
             this.electionAddr = null;
@@ -338,6 +352,50 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
     protected boolean quorumListenOnAllIPs = false;
 
     /**
+     * Enables/disables quorum authentication using SASL. Defaults to false.
+     */
+    protected boolean quorumSaslEnableAuth;
+
+    /**
+     * If this is false, quorum peer server will accept another quorum peer client
+     * connection even if the authentication did not succeed. This can be used while
+     * upgrading ZooKeeper server. Defaults to false (authentication not required).
+     */
+    protected boolean quorumServerSaslAuthRequired;
+
+    /**
+     * If this is false, quorum peer learner will talk to quorum peer server
+     * without authentication. This can be used while upgrading ZooKeeper
+     * server. Defaults to false (authentication not required).
+     */
+    protected boolean quorumLearnerSaslAuthRequired;
+
+    /**
+     * Kerberos quorum service principal. Defaulting to 'zkquorum/localhost'.
+     */
+    protected String quorumServicePrincipal;
+
+    /**
+     * Quorum learner login context name in jaas-conf file to read the kerberos
+     * security details. Defaulting to 'QuorumLearner'.
+     */
+    protected String quorumLearnerLoginContext;
+
+    /**
+     * Quorum server login context name in jaas-conf file to read the kerberos
+     * security details. Defaulting to 'QuorumServer'.
+     */
+    protected String quorumServerLoginContext;
+
+    // TODO: need to tune the default value of thread size
+    private static final int QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE = 20;
+    /**
+     * The maximum number of threads to allow in the connectionExecutors thread
+     * pool which will be used to initiate quorum server connections.
+     */
+    protected int quorumCnxnThreadsSize = QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE;
+
+    /**
      * @deprecated As of release 3.4.0, this class has been deprecated, since
      * it is used with one of the udp-based versions of leader election, which
      * we are also deprecating. 
@@ -449,10 +507,15 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
     private FileTxnSnapLog logFactory = null;
 
     private final QuorumStats quorumStats;
-    
-    public QuorumPeer() {
+
+    public static QuorumPeer testingQuorumPeer() throws SaslException {
+        return new QuorumPeer();
+    }
+
+    private QuorumPeer() throws SaslException {
         super("QuorumPeer");
         quorumStats = new QuorumStats(this);
+        initialize();
     }
     
    
@@ -490,7 +553,24 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
             this.quorumConfig = new QuorumMaj(countParticipants(quorumPeers));
         else this.quorumConfig = quorumConfig;
     }
-    
+
+    public void initialize() throws SaslException {
+        // Quorum SASL disabled: install the Null auth server and learner.
+        if (!isQuorumSaslAuthEnabled()) {
+            authServer = new NullQuorumAuthServer();
+            authLearner = new NullQuorumAuthLearner();
+            return;
+        }
+        Set<String> peerHostnames = new HashSet<String>();
+        for (QuorumServer peer : getView().values()) {
+            peerHostnames.add(peer.hostname);
+        }
+        authServer = new SaslQuorumAuthServer(isQuorumServerSaslAuthRequired(),
+                quorumServerLoginContext, peerHostnames);
+        authLearner = new SaslQuorumAuthLearner(isQuorumLearnerSaslAuthRequired(),
+                quorumServicePrincipal, quorumLearnerLoginContext);
+    }
+
     QuorumStats quorumStats() {
         return quorumStats;
     }
@@ -686,7 +766,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
             le = new AuthFastLeaderElection(this, true);
             break;
         case 3:
-            qcm = new QuorumCnxManager(this);
+            qcm = createCnxnManager();
             QuorumCnxManager.Listener listener = qcm.listener;
             if(listener != null){
                 listener.start();
@@ -903,33 +983,37 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
             zkDb.close();
         } catch (IOException ie) {
             LOG.warn("Error closing logs ", ie);
-        }     
+        }
     }
 
     /**
      * A 'view' is a node's current opinion of the membership of the entire
-     * ensemble.    
+     * ensemble.
      */
     public Map<Long,QuorumPeer.QuorumServer> getView() {
         return Collections.unmodifiableMap(this.quorumPeers);
     }
-    
+
     /**
      * Observers are not contained in this view, only nodes with 
-     * PeerType=PARTICIPANT.     
+     * PeerType=PARTICIPANT.
      */
     public Map<Long,QuorumPeer.QuorumServer> getVotingView() {
-        Map<Long,QuorumPeer.QuorumServer> ret = 
+        return QuorumPeer.viewToVotingView(getView());
+    }
+
+    static Map<Long,QuorumPeer.QuorumServer> viewToVotingView(
+            Map<Long,QuorumPeer.QuorumServer> view) {
+        Map<Long,QuorumPeer.QuorumServer> ret =
             new HashMap<Long, QuorumPeer.QuorumServer>();
-        Map<Long,QuorumPeer.QuorumServer> view = getView();
-        for (QuorumServer server : view.values()) {            
+        for (QuorumServer server : view.values()) {
             if (server.type == LearnerType.PARTICIPANT) {
                 ret.put(server.id, server);
             }
-        }        
+        }
         return ret;
     }
-    
+
     /**
      * Returns only observers, no followers.
      */
@@ -1306,4 +1390,73 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         }
     }
 
+    void setQuorumServerSaslRequired(boolean serverSaslRequired) {
+        quorumServerSaslAuthRequired = serverSaslRequired;
+        LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED,
+                serverSaslRequired);
+    }
+
+    void setQuorumLearnerSaslRequired(boolean learnerSaslRequired) {
+        quorumLearnerSaslAuthRequired = learnerSaslRequired;
+        LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED,
+                learnerSaslRequired);
+    }
+
+    void setQuorumSaslEnabled(boolean enableAuth) {
+        quorumSaslEnableAuth = enableAuth;
+        // Log the new setting; a disabled flag gets an explicit notice instead.
+        if (enableAuth) {
+            LOG.info("{} set to {}", QuorumAuth.QUORUM_SASL_AUTH_ENABLED, enableAuth);
+        } else {
+            LOG.info("QuorumPeer communication is not secured!");
+        }
+    }
+
+    void setQuorumServicePrincipal(String servicePrincipal) {
+        quorumServicePrincipal = servicePrincipal;
+        LOG.info("{} set to {}",QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL,
+                quorumServicePrincipal);
+    }
+
+    void setQuorumLearnerLoginContext(String learnerContext) {
+        quorumLearnerLoginContext = learnerContext;
+        LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT,
+                quorumLearnerLoginContext);
+    }
+
+    void setQuorumServerLoginContext(String serverContext) {
+        quorumServerLoginContext = serverContext;
+        LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT,
+                quorumServerLoginContext);
+    }
+
+    void setQuorumCnxnThreadsSize(int qCnxnThreadsSize) {
+        if (qCnxnThreadsSize > QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE) {
+            quorumCnxnThreadsSize = qCnxnThreadsSize;
+        }
+        LOG.info("quorum.cnxn.threads.size set to {}", quorumCnxnThreadsSize);
+    }
+
+    boolean isQuorumSaslAuthEnabled() {
+        return quorumSaslEnableAuth;
+    }
+
+    private boolean isQuorumServerSaslAuthRequired() {
+        return quorumServerSaslAuthRequired;
+    }
+
+    private boolean isQuorumLearnerSaslAuthRequired() {
+        return quorumLearnerSaslAuthRequired;
+    }
+
+    public QuorumCnxManager createCnxnManager() {
+        return new QuorumCnxManager(this.getId(),
+                                    this.getView(),
+                                    this.authServer,
+                                    this.authLearner,
+                                    this.tickTime * this.syncLimit,
+                                    this.getQuorumListenOnAllIPs(),
+                                    this.quorumCnxnThreadsSize,
+                                    this.isQuorumSaslAuthEnabled());
+    }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
index 0924ef6..621f830 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
@@ -38,6 +38,7 @@ import org.slf4j.MDC;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuth;
 import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
@@ -75,7 +76,16 @@ public class QuorumPeerConfig {
     protected boolean syncEnabled = true;
 
     protected LearnerType peerType = LearnerType.PARTICIPANT;
-    
+
+    /** Configurations for the quorumpeer-to-quorumpeer sasl authentication */
+    protected boolean quorumServerRequireSasl = false;
+    protected boolean quorumLearnerRequireSasl = false;
+    protected boolean quorumEnableSasl = false;
+    protected String quorumServicePrincipal = QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL_DEFAULT_VALUE;
+    protected String quorumLearnerLoginContext = QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT_DFAULT_VALUE;
+    protected String quorumServerLoginContext = QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT_DFAULT_VALUE;
+    protected int quorumCnxnThreadsSize;
+
     /**
      * Minimum snapshot retain count.
      * @see org.apache.zookeeper.server.PurgeTxnLog#purge(File, File, int)
@@ -246,11 +256,45 @@ public class QuorumPeerConfig {
                 int dot = key.indexOf('.');
                 long sid = Long.parseLong(key.substring(dot + 1));
                 serverWeight.put(sid, Long.parseLong(value));
+            } else if (key.equals(QuorumAuth.QUORUM_SASL_AUTH_ENABLED)) {
+                quorumEnableSasl = Boolean.parseBoolean(value);
+            } else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED)) {
+                quorumServerRequireSasl = Boolean.parseBoolean(value);
+            } else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED)) {
+                quorumLearnerRequireSasl = Boolean.parseBoolean(value);
+            } else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT)) {
+                quorumLearnerLoginContext = value;
+            } else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT)) {
+                quorumServerLoginContext = value;
+            } else if (key.equals(QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL)) {
+                quorumServicePrincipal = value;
+            } else if (key.equals("quorum.cnxn.threads.size")) {
+                quorumCnxnThreadsSize = Integer.parseInt(value);
             } else {
                 System.setProperty("zookeeper." + key, value);
             }
         }
-        
+        if (!quorumEnableSasl && quorumServerRequireSasl) {
+            throw new IllegalArgumentException(
+                    QuorumAuth.QUORUM_SASL_AUTH_ENABLED
+                    + " is disabled, so cannot enable "
+                    + QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED);
+        }
+        if (!quorumEnableSasl && quorumLearnerRequireSasl) {
+            throw new IllegalArgumentException(
+                    QuorumAuth.QUORUM_SASL_AUTH_ENABLED
+                    + " is disabled, so cannot enable "
+                    + QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED);
+        }
+        // If quorumpeer learner is not auth enabled then self won't be able to
+        // join quorum. So this condition is ensuring that the quorumpeer learner
+        // is also auth enabled while enabling quorum server require sasl.
+        if (!quorumLearnerRequireSasl && quorumServerRequireSasl) {
+            throw new IllegalArgumentException(
+                    QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED
+                    + " is disabled, so cannot enable "
+                    + QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED);
+        }
         // Reset to MIN_SNAP_RETAIN_COUNT if invalid (less than 3)
         // PurgeTxnLog.purge(File, File, int) will not allow to purge less
         // than 3.