You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2016/12/06 00:17:01 UTC

[3/4] zookeeper git commit: ZOOKEEPER-1045: Support Quorum Peer mutual authentication via SASL (rakeshr via phunt)

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
index e9c8007..4ea7e54 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
@@ -130,26 +130,37 @@ public class QuorumPeerMain {
           cnxnFactory.configure(config.getClientPortAddress(),
                                 config.getMaxClientCnxns());
   
-          quorumPeer = new QuorumPeer();
+          quorumPeer = new QuorumPeer(config.getServers(),
+                                      new File(config.getDataDir()),
+                                      new File(config.getDataLogDir()),
+                                      config.getElectionAlg(),
+                                      config.getServerId(),
+                                      config.getTickTime(),
+                                      config.getInitLimit(),
+                                      config.getSyncLimit(),
+                                      config.getQuorumListenOnAllIPs(),
+                                      cnxnFactory,
+                                      config.getQuorumVerifier());
           quorumPeer.setClientPortAddress(config.getClientPortAddress());
-          quorumPeer.setTxnFactory(new FileTxnSnapLog(
-                      new File(config.getDataLogDir()),
-                      new File(config.getDataDir())));
-          quorumPeer.setQuorumPeers(config.getServers());
-          quorumPeer.setElectionType(config.getElectionAlg());
-          quorumPeer.setMyid(config.getServerId());
-          quorumPeer.setTickTime(config.getTickTime());
           quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
           quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
-          quorumPeer.setInitLimit(config.getInitLimit());
-          quorumPeer.setSyncLimit(config.getSyncLimit());
-          quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
-          quorumPeer.setCnxnFactory(cnxnFactory);
           quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
           quorumPeer.setLearnerType(config.getPeerType());
           quorumPeer.setSyncEnabled(config.getSyncEnabled());
-          quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
-  
+
+          // sets quorum sasl authentication configurations
+          quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
+          if(quorumPeer.isQuorumSaslAuthEnabled()){
+              quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
+              quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
+              quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
+              quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
+              quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
+          }
+
+          quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
+          quorumPeer.initialize();
+
           quorumPeer.start();
           quorumPeer.join();
       } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthLearner.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthLearner.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthLearner.java
new file mode 100644
index 0000000..0af891c
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthLearner.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.net.Socket;
+
+/**
+ * This class represents no authentication learner, it just return
+ * without performing any authentication.
+ */
+public class NullQuorumAuthLearner implements QuorumAuthLearner {
+
+    @Override
+    public void authenticate(Socket sock, String hostname) {
+        return; // simply return don't require auth
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthServer.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthServer.java
new file mode 100644
index 0000000..b26a54a
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthServer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.DataInputStream;
+import java.net.Socket;
+
+/**
+ * This class represents no authentication server, it just return
+ * without performing any authentication.
+ */
+public class NullQuorumAuthServer implements QuorumAuthServer {
+
+    @Override
+    public void authenticate(final Socket sock, final DataInputStream din) {
+        return; // simply return don't require auth
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java
new file mode 100644
index 0000000..8bfa394
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import org.apache.jute.BinaryInputArchive;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.zookeeper.server.quorum.QuorumAuthPacket;
+
+public class QuorumAuth {
+    private static final Logger LOG = LoggerFactory.getLogger(QuorumAuth.class);
+
+    public static final String QUORUM_SASL_AUTH_ENABLED = "quorum.auth.enableSasl";
+    public static final String QUORUM_SERVER_SASL_AUTH_REQUIRED = "quorum.auth.serverRequireSasl";
+    public static final String QUORUM_LEARNER_SASL_AUTH_REQUIRED = "quorum.auth.learnerRequireSasl";
+
+    public static final String QUORUM_KERBEROS_SERVICE_PRINCIPAL = "quorum.auth.kerberos.servicePrincipal";
+    public static final String QUORUM_KERBEROS_SERVICE_PRINCIPAL_DEFAULT_VALUE = "zkquorum/localhost";
+
+    public static final String QUORUM_LEARNER_SASL_LOGIN_CONTEXT = "quorum.auth.learner.saslLoginContext";
+    public static final String QUORUM_LEARNER_SASL_LOGIN_CONTEXT_DFAULT_VALUE = "QuorumLearner";
+
+    public static final String QUORUM_SERVER_SASL_LOGIN_CONTEXT = "quorum.auth.server.saslLoginContext";
+    public static final String QUORUM_SERVER_SASL_LOGIN_CONTEXT_DFAULT_VALUE = "QuorumServer";
+
+    static final String QUORUM_SERVER_PROTOCOL_NAME = "zookeeper-quorum";
+    static final String QUORUM_SERVER_SASL_DIGEST = "zk-quorum-sasl-md5";
+    static final String QUORUM_AUTH_MESSAGE_TAG = "qpconnect";
+
+    // this is negative, so that if a learner that does auth, connects to a
+    // server, it'll think the received packet is an authentication packet
+    public static final long QUORUM_AUTH_MAGIC_NUMBER = -0xa0dbcafecafe1234L;
+
+    public enum Status {
+         IN_PROGRESS(0), SUCCESS(1), ERROR(-1);
+        private int status;
+
+        Status(int status) {
+            this.status = status;
+        }
+
+        static Status getStatus(int status) {
+            switch (status) {
+            case 0:
+                return IN_PROGRESS;
+            case 1:
+                return SUCCESS;
+            case -1:
+                return ERROR;
+            default:
+                LOG.error("Unknown status:{}!", status);
+                assert false : "Unknown status!";
+                return ERROR;
+            }
+        }
+
+        int status() {
+            return status;
+        }
+    }
+
+    public static QuorumAuthPacket createPacket(Status status, byte[] response) {
+        return new QuorumAuthPacket(QUORUM_AUTH_MAGIC_NUMBER,
+                                    status.status(), response);
+    }
+
+    public static boolean nextPacketIsAuth(DataInputStream din)
+            throws IOException {
+        din.mark(32);
+        BinaryInputArchive bia = new BinaryInputArchive(din);
+        boolean firstIsAuth = (bia.readLong("NO_TAG")
+                               == QuorumAuth.QUORUM_AUTH_MAGIC_NUMBER);
+        din.reset();
+        return firstIsAuth;
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthLearner.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthLearner.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthLearner.java
new file mode 100644
index 0000000..af71257
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthLearner.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.IOException;
+import java.net.Socket;
+
+/**
+ * Interface for quorum learner authentication mechanisms.
+ */
+public interface QuorumAuthLearner {
+
+    /**
+     * Performs an authentication step for the given socket connection.
+     *
+     * @param sock
+     *            socket connection to other quorum peer server
+     * @param hostname
+     *            host name of other quorum peer server
+     * @throws IOException
+     *             if there is an authentication failure
+     */
+    public void authenticate(Socket sock, String hostname) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthServer.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthServer.java
new file mode 100644
index 0000000..e9de8f0
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthServer.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.Socket;
+
+/**
+ * Interface for quorum server authentication mechanisms.
+ */
+public interface QuorumAuthServer {
+
+    /**
+     * Performs an authentication step for the given socket connection.
+     *
+     * @param sock
+     *            socket connection to other quorum peer
+     * @param din
+     *            stream used to read auth data send by the quorum learner
+     * @throws IOException if the server fails to authenticate connecting quorum learner
+     */
+    public void authenticate(Socket sock, DataInputStream din)
+            throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthLearner.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthLearner.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthLearner.java
new file mode 100644
index 0000000..fffb55a
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthLearner.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.Login;
+import org.apache.zookeeper.SaslClientCallbackHandler;
+import org.apache.zookeeper.server.quorum.QuorumAuthPacket;
+import org.apache.zookeeper.util.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SaslQuorumAuthLearner implements QuorumAuthLearner {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(SaslQuorumAuthLearner.class);
+
+    private final Login learnerLogin;
+    private final boolean quorumRequireSasl;
+    private final String quorumServicePrincipal;
+
+    public SaslQuorumAuthLearner(boolean quorumRequireSasl,
+            String quorumServicePrincipal, String loginContext)
+                    throws SaslException {
+        this.quorumRequireSasl = quorumRequireSasl;
+        this.quorumServicePrincipal = quorumServicePrincipal;
+        try {
+            AppConfigurationEntry entries[] = Configuration
+                .getConfiguration()
+                .getAppConfigurationEntry(loginContext);
+            if (entries == null || entries.length == 0) {
+                throw new LoginException("SASL-authentication failed because"
+                                         + " the specified JAAS configuration "
+                                         + "section '" + loginContext
+                                         + "' could not be found.");
+            }
+            this.learnerLogin = new Login(loginContext,
+                                    new SaslClientCallbackHandler(null, "QuorumLearner"));
+            this.learnerLogin.startThreadIfNeeded();
+        } catch (LoginException e) {
+            throw new SaslException("Failed to initialize authentication mechanism using SASL", e);
+        }
+    }
+
+    @Override
+    public void authenticate(Socket sock, String hostName) throws IOException {
+        if (!quorumRequireSasl) { // let it through, we don't require auth
+            LOG.info("Skipping SASL authentication as {}={}",
+                    QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED,
+                    quorumRequireSasl);
+            return;
+        }
+        SaslClient sc = null;
+        String principalConfig = SecurityUtils
+                .getServerPrincipal(quorumServicePrincipal, hostName);
+        try {
+            DataOutputStream dout = new DataOutputStream(
+                    sock.getOutputStream());
+            DataInputStream din = new DataInputStream(sock.getInputStream());
+            byte[] responseToken = new byte[0];
+            sc = SecurityUtils.createSaslClient(learnerLogin.getSubject(),
+                    principalConfig,
+                    QuorumAuth.QUORUM_SERVER_PROTOCOL_NAME,
+                    QuorumAuth.QUORUM_SERVER_SASL_DIGEST, LOG, "QuorumLearner");
+
+            if (sc.hasInitialResponse()) {
+                responseToken = createSaslToken(new byte[0], sc, learnerLogin);
+            }
+            send(dout, responseToken);
+            QuorumAuthPacket authPacket = receive(din);
+            QuorumAuth.Status qpStatus = QuorumAuth.Status
+                    .getStatus(authPacket.getStatus());
+            while (!sc.isComplete()) {
+                switch (qpStatus) {
+                case SUCCESS:
+                    responseToken = createSaslToken(authPacket.getToken(), sc,
+                            learnerLogin);
+                    // we're done; don't expect to send another BIND
+                    if (responseToken != null) {
+                        throw new SaslException("Protocol error: attempting to send response after completion");
+                    }
+                    break;
+                case IN_PROGRESS:
+                    responseToken = createSaslToken(authPacket.getToken(), sc,
+                            learnerLogin);
+                    send(dout, responseToken);
+                    authPacket = receive(din);
+                    qpStatus = QuorumAuth.Status
+                            .getStatus(authPacket.getStatus());
+                    break;
+                case ERROR:
+                    throw new SaslException(
+                            "Authentication failed against server addr: "
+                                    + sock.getRemoteSocketAddress());
+                default:
+                    LOG.warn("Unknown status:{}!", qpStatus);
+                    throw new SaslException(
+                            "Authentication failed against server addr: "
+                                    + sock.getRemoteSocketAddress());
+                }
+            }
+
+            // Validate status code at the end of authentication exchange.
+            checkAuthStatus(sock, qpStatus);
+        } finally {
+            if (sc != null) {
+                try {
+                    sc.dispose();
+                } catch (SaslException e) {
+                    LOG.error("SaslClient dispose() failed", e);
+                }
+            }
+        }
+        return;
+    }
+
+    private void checkAuthStatus(Socket sock, QuorumAuth.Status qpStatus)
+            throws SaslException {
+        if (qpStatus == QuorumAuth.Status.SUCCESS) {
+            LOG.info("Successfully completed the authentication using SASL. server addr: {}, status: {}",
+                    sock.getRemoteSocketAddress(), qpStatus);
+        } else {
+            throw new SaslException("Authentication failed against server addr: "
+                            + sock.getRemoteSocketAddress() + ", qpStatus: "
+                            + qpStatus);
+        }
+    }
+
+    private QuorumAuthPacket receive(DataInputStream din) throws IOException {
+        QuorumAuthPacket authPacket = new QuorumAuthPacket();
+        BinaryInputArchive bia = BinaryInputArchive.getArchive(din);
+        authPacket.deserialize(bia, QuorumAuth.QUORUM_AUTH_MESSAGE_TAG);
+        return authPacket;
+    }
+
+    private void send(DataOutputStream dout, byte[] response)
+            throws IOException {
+        QuorumAuthPacket authPacket;
+        BufferedOutputStream bufferedOutput = new BufferedOutputStream(dout);
+        BinaryOutputArchive boa = BinaryOutputArchive
+                .getArchive(bufferedOutput);
+        if (response != null && response.length < 0) {
+            throw new IOException("Response length < 0");
+        } else if (response == null) {
+            authPacket = QuorumAuth.createPacket(
+                    QuorumAuth.Status.IN_PROGRESS, response);
+        } else {
+            authPacket = QuorumAuth.createPacket(
+                    QuorumAuth.Status.IN_PROGRESS, response);
+        }
+
+        boa.writeRecord(authPacket, QuorumAuth.QUORUM_AUTH_MESSAGE_TAG);
+        bufferedOutput.flush();
+    }
+
+    // TODO: need to consolidate the #createSaslToken() implementation between ZooKeeperSaslClient#createSaslToken().
+    private byte[] createSaslToken(final byte[] saslToken,
+            final SaslClient saslClient, final Login login)
+                    throws SaslException {
+        if (saslToken == null) {
+            throw new SaslException(
+                    "Error in authenticating with a Zookeeper Quorum member: the quorum member's saslToken is null.");
+        }
+        if (login.getSubject() != null) {
+            synchronized (login) {
+                try {
+                    final byte[] retval = Subject.doAs(login.getSubject(),
+                            new PrivilegedExceptionAction<byte[]>() {
+                                public byte[] run() throws SaslException {
+                                    LOG.debug("saslClient.evaluateChallenge(len="
+                                                    + saslToken.length + ")");
+                                    return saslClient.evaluateChallenge(saslToken);
+                                }
+                            });
+                    return retval;
+                } catch (PrivilegedActionException e) {
+                    String error = "An error: (" + e
+                            + ") occurred when evaluating Zookeeper Quorum Member's "
+                            + " received SASL token.";
+                    // Try to provide hints to use about what went wrong so they
+                    // can fix their configuration.
+                    // TODO: introspect about e: look for GSS information.
+                    final String UNKNOWN_SERVER_ERROR_TEXT = "(Mechanism level: Server not found in Kerberos database (7) - UNKNOWN_SERVER)";
+                    if (e.toString().indexOf(UNKNOWN_SERVER_ERROR_TEXT) > -1) {
+                        error += " This may be caused by Java's being unable to resolve the Zookeeper Quorum Member's"
+                                + " hostname correctly. You may want to try to adding"
+                                + " '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your server's JVMFLAGS environment.";
+                    }
+                    LOG.error(error);
+                    throw new SaslException(error);
+                }
+            }
+        } else {
+            throw new SaslException(
+                    "Cannot make SASL token without subject defined. "
+                            + "For diagnosis, please look for WARNs and ERRORs in your log related to the Login class.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java
new file mode 100644
index 0000000..8430da2
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Set;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.Login;
+import org.apache.zookeeper.server.quorum.QuorumAuthPacket;
+import org.apache.zookeeper.util.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SaslQuorumAuthServer implements QuorumAuthServer {
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(SaslQuorumAuthServer.class);
+
+    private final static int MAX_RETRIES = 5;
+    private final Login serverLogin;
+    private final boolean quorumRequireSasl;
+
+    public SaslQuorumAuthServer(boolean quorumRequireSasl, String loginContext, Set<String> authzHosts)
+            throws SaslException {
+        this.quorumRequireSasl = quorumRequireSasl;
+        try {
+            AppConfigurationEntry entries[] = Configuration.getConfiguration()
+                    .getAppConfigurationEntry(loginContext);
+            if (entries == null || entries.length == 0) {
+                throw new LoginException("SASL-authentication failed"
+                        + " because the specified JAAS configuration "
+                        + "section '" + loginContext + "' could not be found.");
+            }
+            SaslQuorumServerCallbackHandler saslServerCallbackHandler = new SaslQuorumServerCallbackHandler(
+                    Configuration.getConfiguration(), loginContext, authzHosts);
+            serverLogin = new Login(loginContext, saslServerCallbackHandler);
+            serverLogin.startThreadIfNeeded();
+        } catch (Throwable e) {
+            throw new SaslException(
+                    "Failed to initialize authentication mechanism using SASL",
+                    e);
+        }
+    }
+
+    @Override
+    public void authenticate(Socket sock, DataInputStream din)
+            throws SaslException {
+        DataOutputStream dout = null;
+        SaslServer ss = null;
+        try {
+            if (!QuorumAuth.nextPacketIsAuth(din)) {
+                if (quorumRequireSasl) {
+                    throw new SaslException("Learner not trying to authenticate"
+                                            + " and authentication is required");
+                } else {
+                    // let it through, we don't require auth
+                    return;
+                }
+            }
+
+            byte[] token = receive(din);
+            int tries = 0;
+            dout = new DataOutputStream(sock.getOutputStream());
+            byte[] challenge = null;
+            ss = SecurityUtils.createSaslServer(serverLogin.getSubject(),
+                    QuorumAuth.QUORUM_SERVER_PROTOCOL_NAME,
+                    QuorumAuth.QUORUM_SERVER_SASL_DIGEST, serverLogin.callbackHandler,
+                    LOG);
+            while (!ss.isComplete()) {
+                challenge = ss.evaluateResponse(token);
+                if (!ss.isComplete()) {
+                    // limited number of retries.
+                    if (++tries > MAX_RETRIES) {
+                        send(dout, challenge, QuorumAuth.Status.ERROR);
+                        LOG.warn("Failed to authenticate using SASL, server addr: {}, retries={} exceeded.",
+                                sock.getRemoteSocketAddress(), tries);
+                        break;
+                    }
+                    send(dout, challenge, QuorumAuth.Status.IN_PROGRESS);
+                    token = receive(din);
+                }
+            }
+            // Authentication exchange has completed
+            if (ss.isComplete()) {
+                send(dout, challenge, QuorumAuth.Status.SUCCESS);
+                LOG.info("Successfully completed the authentication using SASL. learner addr: {}",
+                        sock.getRemoteSocketAddress());
+            }
+        } catch (Exception e) {
+            try {
+                if (dout != null) {
+                    // send error message to the learner
+                    send(dout, new byte[0], QuorumAuth.Status.ERROR);
+                }
+            } catch (IOException ioe) {
+                LOG.warn("Exception while sending failed status", ioe);
+            }
+            // If sasl is not required, when a server initializes a
+            // connection it will try to log in, but it will also
+            // accept connections that do not start with a sasl
+            // handshake.
+            if (quorumRequireSasl) {
+                LOG.error("Failed to authenticate using SASL", e);
+                throw new SaslException(
+                        "Failed to authenticate using SASL: " + e.getMessage());
+            } else {
+                LOG.warn("Failed to authenticate using SASL", e);
+                LOG.warn("Maintaining learner connection despite SASL authentication failure."
+                                + " server addr: {}, {}: {}",
+                        new Object[] { sock.getRemoteSocketAddress(),
+                                QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED,
+                                quorumRequireSasl });
+                return; // let it through, we don't require auth
+            }
+        } finally {
+            if (ss != null) {
+                try {
+                    ss.dispose();
+                } catch (SaslException e) {
+                    LOG.error("SaslServer dispose() failed", e);
+                }
+            }
+        }
+        return;
+    }
+
+    private byte[] receive(DataInputStream din) throws IOException {
+        QuorumAuthPacket authPacket = new QuorumAuthPacket();
+        BinaryInputArchive bia = BinaryInputArchive.getArchive(din);
+        authPacket.deserialize(bia, QuorumAuth.QUORUM_AUTH_MESSAGE_TAG);
+        return authPacket.getToken();
+    }
+
+    private void send(DataOutputStream dout, byte[] challenge,
+            QuorumAuth.Status s) throws IOException {
+        BufferedOutputStream bufferedOutput = new BufferedOutputStream(dout);
+        BinaryOutputArchive boa = BinaryOutputArchive
+                .getArchive(bufferedOutput);
+        QuorumAuthPacket authPacket;
+        if (challenge != null && challenge.length < 0) {
+            throw new IOException("Response length < 0");
+        } else if (challenge == null && s != QuorumAuth.Status.SUCCESS) {
+            authPacket = QuorumAuth.createPacket(
+                    QuorumAuth.Status.IN_PROGRESS, challenge);
+        } else {
+            authPacket = QuorumAuth.createPacket(s, challenge);
+        }
+
+        boa.writeRecord(authPacket, QuorumAuth.QUORUM_AUTH_MESSAGE_TAG);
+        bufferedOutput.flush();
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java
new file mode 100644
index 0000000..3e71bb1
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is used by the SASL mechanisms to get further information to complete
+ * the authentication. For example, a SASL mechanism might use this callback
+ * handler to do verification operation. This is used by the QuorumServer to
+ * perform the mutual quorum peer authentication.
+ */
+public class SaslQuorumServerCallbackHandler implements CallbackHandler {
+    private static final String USER_PREFIX = "user_";
+    private static final Logger LOG = LoggerFactory.getLogger(SaslQuorumServerCallbackHandler.class);
+
+    private String userName;
+    private final Map<String,String> credentials = new HashMap<String,String>();
+    private final Set<String> authzHosts;
+
+    public SaslQuorumServerCallbackHandler(Configuration configuration,
+            String serverSection, Set<String> authzHosts) throws IOException {
+        AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(serverSection);
+
+        if (configurationEntries == null) {
+            String errorMessage = "Could not find a '" + serverSection + "' entry in this configuration: Server cannot start.";
+            LOG.error(errorMessage);
+            throw new IOException(errorMessage);
+        }
+        credentials.clear();
+        for(AppConfigurationEntry entry: configurationEntries) {
+            Map<String,?> options = entry.getOptions();
+            // Populate DIGEST-MD5 user -> password map with JAAS configuration entries from the "QuorumServer" section.
+            // Usernames are distinguished from other options by prefixing the username with a "user_" prefix.
+            for(Map.Entry<String, ?> pair : options.entrySet()) {
+                String key = pair.getKey();
+                if (key.startsWith(USER_PREFIX)) {
+                    String userName = key.substring(USER_PREFIX.length());
+                    credentials.put(userName,(String)pair.getValue());
+                }
+            }
+        }
+
+        // authorized host lists
+        this.authzHosts = authzHosts;
+    }
+
+    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+        for (Callback callback : callbacks) {
+            if (callback instanceof NameCallback) {
+                handleNameCallback((NameCallback) callback);
+            } else if (callback instanceof PasswordCallback) {
+                handlePasswordCallback((PasswordCallback) callback);
+            } else if (callback instanceof RealmCallback) {
+                handleRealmCallback((RealmCallback) callback);
+            } else if (callback instanceof AuthorizeCallback) {
+                handleAuthorizeCallback((AuthorizeCallback) callback);
+            }
+        }
+    }
+
+    private void handleNameCallback(NameCallback nc) {
+        // check to see if this user is in the user password database.
+        if (credentials.get(nc.getDefaultName()) == null) {
+            LOG.warn("User '{}' not found in list of DIGEST-MD5 authenticateable users.",
+                    nc.getDefaultName());
+            return;
+        }
+        nc.setName(nc.getDefaultName());
+        userName = nc.getDefaultName();
+    }
+
+    private void handlePasswordCallback(PasswordCallback pc) {
+        if (credentials.containsKey(userName) ) {
+            pc.setPassword(credentials.get(userName).toCharArray());
+        } else {
+            LOG.warn("No password found for user: {}", userName);
+        }
+    }
+
+    private void handleRealmCallback(RealmCallback rc) {
+        LOG.debug("QuorumLearner supplied realm: {}", rc.getDefaultText());
+        rc.setText(rc.getDefaultText());
+    }
+
+    private void handleAuthorizeCallback(AuthorizeCallback ac) {
+        String authenticationID = ac.getAuthenticationID();
+        String authorizationID = ac.getAuthorizationID();
+
+        boolean authzFlag = false;
+        // 1. Matches authenticationID and authorizationID
+        authzFlag = authenticationID.equals(authorizationID);
+
+        // 2. Verify whether the connecting host is present in authorized hosts.
+        // If not exists, then connecting peer is not authorized to join the
+        // ensemble and will reject it.
+        if (authzFlag) {
+            String[] components = authorizationID.split("[/@]");
+            if (components.length == 3) {
+                authzFlag = authzHosts.contains(components[1]);
+            }
+            if (!authzFlag) {
+                LOG.error("SASL authorization completed, {} is not authorized to connect",
+                        components[1]);
+            }
+        }
+
+        // Sets authorization flag
+        ac.setAuthorized(authzFlag);
+        if (ac.isAuthorized()) {
+            ac.setAuthorizedID(authorizationID);
+            LOG.info("Successfully authenticated learner: authenticationID={};  authorizationID={}.",
+                    authenticationID, authorizationID);
+        }
+        LOG.debug("SASL authorization completed, authorized flag set to {}", ac.isAuthorized());
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/util/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/util/SecurityUtils.java b/src/java/main/org/apache/zookeeper/util/SecurityUtils.java
new file mode 100644
index 0000000..67484e4
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/util/SecurityUtils.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.util;
+
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.zookeeper.SaslClientCallbackHandler;
+import org.apache.zookeeper.server.auth.KerberosName;
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+import org.slf4j.Logger;
+
+public final class SecurityUtils {
+
+    public static final String QUORUM_HOSTNAME_PATTERN = "_HOST";
+
+    /**
+     * Create an instance of a SaslClient. It will return null if there is an exception.
+     *
+     * @param subject subject
+     * @param servicePrincipal principal
+     * @param protocol name of the protocol for which the authentication is being performed
+     * @param serverName name of the server to authenticate to
+     * @param LOG logger
+     * @param entity can be either zookeeper client or quorum learner
+     *
+     * @return saslclient object
+     * @throws SaslException
+     */
+    public static SaslClient createSaslClient(final Subject subject,
+            final String servicePrincipal, final String protocol,
+            final String serverName, final Logger LOG, final String entity) throws SaslException {
+        SaslClient saslClient;
+        // Use subject.getPrincipals().isEmpty() as an indication of which SASL
+        // mechanism to use: if empty, use DIGEST-MD5; otherwise, use GSSAPI.
+        if (subject.getPrincipals().isEmpty()) {
+            // no principals: must not be GSSAPI: use DIGEST-MD5 mechanism
+            // instead.
+            LOG.info("{} will use DIGEST-MD5 as SASL mechanism.", entity);
+            String[] mechs = { "DIGEST-MD5" };
+            String username = (String) (subject.getPublicCredentials()
+                    .toArray()[0]);
+            String password = (String) (subject.getPrivateCredentials()
+                    .toArray()[0]);
+            // 'domain' parameter is hard-wired between the server and client
+            saslClient = Sasl.createSaslClient(mechs, username, protocol,
+                    serverName, null, new SaslClientCallbackHandler(password, entity));
+            return saslClient;
+        } else { // GSSAPI.
+            final Object[] principals = subject.getPrincipals().toArray();
+            // determine client principal from subject.
+            final Principal clientPrincipal = (Principal) principals[0];
+            boolean usingNativeJgss = Boolean
+                    .getBoolean("sun.security.jgss.native");
+            if (usingNativeJgss) {
+                // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/jgss-features.html
+                // """
+                // In addition, when performing operations as a particular
+                // Subject, e.g. Subject.doAs(...) or
+                // Subject.doAsPrivileged(...),
+                // the to-be-used GSSCredential should be added to Subject's
+                // private credential set. Otherwise, the GSS operations will
+                // fail since no credential is found.
+                // """
+                try {
+                    GSSManager manager = GSSManager.getInstance();
+                    Oid krb5Mechanism = new Oid("1.2.840.113554.1.2.2");
+                    GSSCredential cred = manager.createCredential(null,
+                            GSSContext.DEFAULT_LIFETIME, krb5Mechanism,
+                            GSSCredential.INITIATE_ONLY);
+                    subject.getPrivateCredentials().add(cred);
+                    LOG.debug("Added private credential to {} principal name: '{}'",
+                            entity, clientPrincipal);
+                } catch (GSSException ex) {
+                    LOG.warn("Cannot add private credential to subject; "
+                                    + "authentication at the server may fail", ex);
+                }
+            }
+            final KerberosName clientKerberosName = new KerberosName(
+                    clientPrincipal.getName());
+            // assume that server and client are in the same realm (by default;
+            // unless the system property
+            // "zookeeper.server.realm" is set).
+            String serverRealm = System.getProperty("zookeeper.server.realm",
+                    clientKerberosName.getRealm());
+            KerberosName serviceKerberosName = new KerberosName(
+                    servicePrincipal + "@" + serverRealm);
+            final String serviceName = serviceKerberosName.getServiceName();
+            final String serviceHostname = serviceKerberosName.getHostName();
+            final String clientPrincipalName = clientKerberosName.toString();
+            try {
+                saslClient = Subject.doAs(subject,
+                        new PrivilegedExceptionAction<SaslClient>() {
+                            public SaslClient run() throws SaslException {
+                                LOG.info("{} will use GSSAPI as SASL mechanism.", entity);
+                                String[] mechs = { "GSSAPI" };
+                                LOG.debug("creating sasl client: {}={};service={};serviceHostname={}",
+                                        new Object[] { entity, clientPrincipalName, serviceName, serviceHostname });
+                                SaslClient saslClient = Sasl.createSaslClient(
+                                        mechs, clientPrincipalName, serviceName,
+                                        serviceHostname, null,
+                                        new SaslClientCallbackHandler(null, entity));
+                                return saslClient;
+                            }
+                        });
+                return saslClient;
+            } catch (Exception e) {
+                LOG.error("Exception while trying to create SASL client", e);
+                return null;
+            }
+        }
+    }
+
+    /**
+     * Create an instance of a SaslServer. It will return null if there is an exception.
+     *
+     * @param subject subject
+     * @param protocol protocol
+     * @param serverName server name
+     * @param callbackHandler login callback handler
+     * @param LOG logger
+     * @return sasl server object
+     */
+    public static SaslServer createSaslServer(final Subject subject,
+            final String protocol, final String serverName,
+            final CallbackHandler callbackHandler, final Logger LOG) {
+        if (subject != null) {
+            // server is using a JAAS-authenticated subject: determine service
+            // principal name and hostname from zk server's subject.
+            if (subject.getPrincipals().size() > 0) {
+                try {
+                    final Object[] principals = subject.getPrincipals()
+                            .toArray();
+                    final Principal servicePrincipal = (Principal) principals[0];
+
+                    // e.g. servicePrincipalNameAndHostname :=
+                    // "zookeeper/myhost.foo.com@FOO.COM"
+                    final String servicePrincipalNameAndHostname = servicePrincipal
+                            .getName();
+
+                    int indexOf = servicePrincipalNameAndHostname.indexOf("/");
+
+                    // e.g. servicePrincipalName := "zookeeper"
+                    final String servicePrincipalName = servicePrincipalNameAndHostname
+                            .substring(0, indexOf);
+
+                    // e.g. serviceHostnameAndKerbDomain :=
+                    // "myhost.foo.com@FOO.COM"
+                    final String serviceHostnameAndKerbDomain = servicePrincipalNameAndHostname
+                            .substring(indexOf + 1,
+                                    servicePrincipalNameAndHostname.length());
+
+                    indexOf = serviceHostnameAndKerbDomain.indexOf("@");
+                    // e.g. serviceHostname := "myhost.foo.com"
+                    final String serviceHostname = serviceHostnameAndKerbDomain
+                            .substring(0, indexOf);
+
+                    // TODO: should depend on zoo.cfg specified mechs, but if
+                    // subject is non-null, it can be assumed to be GSSAPI.
+                    final String mech = "GSSAPI";
+
+                    LOG.debug("serviceHostname is '" + serviceHostname + "'");
+                    LOG.debug("servicePrincipalName is '" + servicePrincipalName
+                            + "'");
+                    LOG.debug("SASL mechanism(mech) is '" + mech + "'");
+
+                    boolean usingNativeJgss = Boolean
+                            .getBoolean("sun.security.jgss.native");
+                    if (usingNativeJgss) {
+                        // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/jgss-features.html
+                        // """
+                        // In addition, when performing operations as a
+                        // particular
+                        // Subject, e.g. Subject.doAs(...) or
+                        // Subject.doAsPrivileged(...), the to-be-used
+                        // GSSCredential should be added to Subject's
+                        // private credential set. Otherwise, the GSS operations
+                        // will fail since no credential is found.
+                        // """
+                        try {
+                            GSSManager manager = GSSManager.getInstance();
+                            Oid krb5Mechanism = new Oid("1.2.840.113554.1.2.2");
+                            GSSName gssName = manager.createName(
+                                    servicePrincipalName + "@"
+                                            + serviceHostname,
+                                    GSSName.NT_HOSTBASED_SERVICE);
+                            GSSCredential cred = manager.createCredential(
+                                    gssName, GSSContext.DEFAULT_LIFETIME,
+                                    krb5Mechanism, GSSCredential.ACCEPT_ONLY);
+                            subject.getPrivateCredentials().add(cred);
+                            LOG.debug("Added private credential to service principal name: '{}',"
+                                            + " GSSCredential name: {}", servicePrincipalName, cred.getName());
+                        } catch (GSSException ex) {
+                            LOG.warn("Cannot add private credential to subject; "
+                                            + "clients authentication may fail", ex);
+                        }
+                    }
+                    try {
+                        return Subject.doAs(subject,
+                                new PrivilegedExceptionAction<SaslServer>() {
+                                    public SaslServer run() {
+                                        try {
+                                            SaslServer saslServer;
+                                            saslServer = Sasl.createSaslServer(
+                                                    mech, servicePrincipalName,
+                                                    serviceHostname, null,
+                                                    callbackHandler);
+                                            return saslServer;
+                                        } catch (SaslException e) {
+                                            LOG.error("Zookeeper Server failed to create a SaslServer to interact with a client during session initiation: ", e);
+                                            return null;
+                                        }
+                                    }
+                                });
+                    } catch (PrivilegedActionException e) {
+                        // TODO: exit server at this point(?)
+                        LOG.error("Zookeeper Quorum member experienced a PrivilegedActionException exception while creating a SaslServer using a JAAS principal context:", e);
+                    }
+                } catch (IndexOutOfBoundsException e) {
+                    LOG.error("server principal name/hostname determination error: ", e);
+                }
+            } else {
+                // JAAS non-GSSAPI authentication: assuming and supporting only
+                // DIGEST-MD5 mechanism for now.
+                // TODO: use 'authMech=' value in zoo.cfg.
+                try {
+                    SaslServer saslServer = Sasl.createSaslServer("DIGEST-MD5",
+                            protocol, serverName, null, callbackHandler);
+                    return saslServer;
+                } catch (SaslException e) {
+                    LOG.error("Zookeeper Quorum member failed to create a SaslServer to interact with a client during session initiation", e);
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Convert Kerberos principal name pattern to valid Kerberos principal name.
+     * If the principal name contains hostname pattern "_HOST" then it replaces
+     * with the given hostname, which should be fully-qualified domain name.
+     *
+     * @param principalConfig
+     *            the Kerberos principal name conf value to convert
+     * @param hostname
+     *            the fully-qualified domain name used for substitution
+     * @return converted Kerberos principal name
+     */
+    public static String getServerPrincipal(String principalConfig,
+            String hostname) {
+        String[] components = getComponents(principalConfig);
+        if (components == null || components.length != 2
+                || !components[1].equals(QUORUM_HOSTNAME_PATTERN)) {
+            return principalConfig;
+        } else {
+            return replacePattern(components, hostname);
+        }
+    }
+
+    private static String[] getComponents(String principalConfig) {
+        if (principalConfig == null)
+            return null;
+        return principalConfig.split("[/]");
+    }
+
+    private static String replacePattern(String[] components, String hostname) {
+        return components[0] + "/" + hostname.toLowerCase();
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/test/data/kerberos/minikdc-krb5.conf
----------------------------------------------------------------------
diff --git a/src/java/test/data/kerberos/minikdc-krb5.conf b/src/java/test/data/kerberos/minikdc-krb5.conf
new file mode 100644
index 0000000..43ec7c4
--- /dev/null
+++ b/src/java/test/data/kerberos/minikdc-krb5.conf
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This resource is originally from HDFS, see the similarly named files there
+# in case of bug fixing, history, etc.
+# Branch : trunk
+# Github Revision: 1d1ab587e4e92ce3aea4cb144811f69145cb3b33
+#
+[libdefaults]
+    default_realm = {0}
+    udp_preference_limit = 1
+
+[realms]
+    {0} = '{'
+        kdc = {1}:{2}
+    '}'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/test/data/kerberos/minikdc.ldiff
----------------------------------------------------------------------
diff --git a/src/java/test/data/kerberos/minikdc.ldiff b/src/java/test/data/kerberos/minikdc.ldiff
new file mode 100644
index 0000000..20c8d77
--- /dev/null
+++ b/src/java/test/data/kerberos/minikdc.ldiff
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This resource is originally from HDFS, see the similarly named files there
+# in case of bug fixing, history, etc.
+# Branch : trunk
+# Github Revision: 1d1ab587e4e92ce3aea4cb144811f69145cb3b33
+#
+dn: ou=users,dc=${0},dc=${1}
+objectClass: organizationalUnit
+objectClass: top
+ou: users
+
+dn: uid=krbtgt,ou=users,dc=${0},dc=${1}
+objectClass: top
+objectClass: person
+objectClass: inetOrgPerson
+objectClass: krb5principal
+objectClass: krb5kdcentry
+cn: KDC Service
+sn: Service
+uid: krbtgt
+userPassword: secret
+krb5PrincipalName: krbtgt/${2}.${3}@${2}.${3}
+krb5KeyVersionNumber: 0
+
+dn: uid=ldap,ou=users,dc=${0},dc=${1}
+objectClass: top
+objectClass: person
+objectClass: inetOrgPerson
+objectClass: krb5principal
+objectClass: krb5kdcentry
+cn: LDAP
+sn: Service
+uid: ldap
+userPassword: secret
+krb5PrincipalName: ldap/${4}@${2}.${3}
+krb5KeyVersionNumber: 0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java b/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java
index 8db7fa8..a82a728 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java
@@ -87,7 +87,7 @@ public class CnxManagerTest extends ZKTestCase {
         public void run(){
             try {
                 QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], peerTmpdir[0], peerClientPort[0], 3, 0, 1000, 2, 2);
-                QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+                QuorumCnxManager cnxManager = peer.createCnxnManager();
                 QuorumCnxManager.Listener listener = cnxManager.listener;
                 if(listener != null){
                     listener.start();
@@ -131,7 +131,7 @@ public class CnxManagerTest extends ZKTestCase {
         thread.start();
         
         QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2);
-        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
             listener.start();
@@ -175,7 +175,7 @@ public class CnxManagerTest extends ZKTestCase {
         peerTmpdir[2] = ClientBase.createTmpDir();
     
         QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2);
-        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
             listener.start();
@@ -202,7 +202,7 @@ public class CnxManagerTest extends ZKTestCase {
     @Test
     public void testCnxManagerSpinLock() throws Exception {               
         QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2);
-        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
             listener.start();
@@ -258,7 +258,10 @@ public class CnxManagerTest extends ZKTestCase {
     class TestCnxManager extends QuorumCnxManager {
 
         TestCnxManager(QuorumPeer self) {
-            super(self);
+            super(self.getId(), self.getView(), self.authServer,
+                    self.authLearner, self.tickTime * self.syncLimit,
+                    self.getQuorumListenOnAllIPs(),
+                    self.quorumCnxnThreadsSize, false);
         }
         
         boolean senderWorkerMapContains(Long l){
@@ -359,7 +362,7 @@ public class CnxManagerTest extends ZKTestCase {
     @Test
     public void testSocketTimeout() throws Exception {
         QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 2000, 2, 2);
-        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
             listener.start();

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
index c1259d1..c0ab3ea 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
@@ -113,7 +113,7 @@ public class FLEBackwardElectionRoundTest extends ZKTestCase {
          * Start mock server 1
          */
         QuorumPeer mockPeer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2);
-        cnxManagers[0] = new QuorumCnxManager(mockPeer);
+        cnxManagers[0] = mockPeer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManagers[0].listener;
         listener.start();
 
@@ -124,7 +124,7 @@ public class FLEBackwardElectionRoundTest extends ZKTestCase {
          * Start mock server 2
          */
         mockPeer = new QuorumPeer(peers, tmpdir[2], tmpdir[2], port[2], 3, 2, 1000, 2, 2);
-        cnxManagers[1] = new QuorumCnxManager(mockPeer);
+        cnxManagers[1] = mockPeer.createCnxnManager();
         listener = cnxManagers[1].listener;
         listener.start();
 

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/test/org/apache/zookeeper/server/quorum/FLECompatibilityTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FLECompatibilityTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FLECompatibilityTest.java
index 72e4fc9..f1c04ca 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/FLECompatibilityTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/FLECompatibilityTest.java
@@ -288,7 +288,7 @@ public class FLECompatibilityTest extends ZKTestCase {
         
         QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
         peer.setPeerState(ServerState.LOOKING);
-        QuorumCnxManager mng = new QuorumCnxManager(peer);
+        QuorumCnxManager mng = peer.createCnxnManager();
         
         /*
          * Check that it generates an internal notification correctly
@@ -325,7 +325,7 @@ public class FLECompatibilityTest extends ZKTestCase {
 
         QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
         peer.setPeerState(ServerState.LOOKING);
-        QuorumCnxManager mng = new QuorumCnxManager(peer);
+        QuorumCnxManager mng = peer.createCnxnManager();
         
         /*
          * Check that it generates an internal notification correctly

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/test/org/apache/zookeeper/server/quorum/FLEDontCareTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FLEDontCareTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FLEDontCareTest.java
index a4c0cb0..3d4a02c 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/FLEDontCareTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/FLEDontCareTest.java
@@ -90,7 +90,7 @@ public class FLEDontCareTest {
 
     @Test
     public void testDontCare() {
-        MockFLE fle = new MockFLE(peer, new QuorumCnxManager(peer));
+        MockFLE fle = new MockFLE(peer, peer.createCnxnManager());
 
         HashMap<Long, Vote> votes = new HashMap<Long, Vote>();
         votes.put(0L, new Vote(0x1, 4L, ZxidUtils.makeZxid(1, 1), 1, 2, ServerState.FOLLOWING));
@@ -104,7 +104,7 @@ public class FLEDontCareTest {
 
     @Test
     public void testDontCareVersion() {
-        MockFLE fle = new MockFLE(peer, new QuorumCnxManager(peer));
+        MockFLE fle = new MockFLE(peer, peer.createCnxnManager());
 
         HashMap<Long, Vote> votes = new HashMap<Long, Vote>();
         votes.put(0L, new Vote(0x1, 4L, ZxidUtils.makeZxid(1, 1), 1, 1, ServerState.FOLLOWING));
@@ -118,7 +118,7 @@ public class FLEDontCareTest {
 
     @Test
     public void testLookingNormal() {
-        MockFLE fle = new MockFLE(peer, new QuorumCnxManager(peer));
+        MockFLE fle = new MockFLE(peer, peer.createCnxnManager());
 
         HashMap<Long, Vote> votes = new HashMap<Long, Vote>();
         votes.put(0L, new Vote(4L, ZxidUtils.makeZxid(2, 1), 1, 1, ServerState.LOOKING));
@@ -132,7 +132,7 @@ public class FLEDontCareTest {
 
     @Test
     public void testLookingDiffRounds() {
-        MockFLE fle = new MockFLE(peer, new QuorumCnxManager(peer));
+        MockFLE fle = new MockFLE(peer, peer.createCnxnManager());
 
         HashMap<Long, Vote> votes = new HashMap<Long, Vote>();
         votes.put(0L, new Vote(4L, ZxidUtils.makeZxid(1, 1), 1, 1, ServerState.LOOKING));
@@ -188,7 +188,7 @@ public class FLEDontCareTest {
 
     @Test
     public void testOutofElection() {
-        MockFLE fle = new MockFLE(peer, new QuorumCnxManager(peer));
+        MockFLE fle = new MockFLE(peer, peer.createCnxnManager());
         HashMap<Long,Vote> outofelection = new HashMap<Long,Vote>();
 
         /*

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
index 39a53ca..190785c 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
@@ -101,7 +101,7 @@ public class FLELostMessageTest extends ZKTestCase {
          * Create an instance of the connection manager
          */
         QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
-        cnxManager = new QuorumCnxManager(peer);
+        cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         listener.start();
 

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java b/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java
index 2ae57ce..fd08d21 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java
@@ -69,8 +69,8 @@ public class LearnerTest extends ZKTestCase {
 	}
 	class SimpleLearner extends Learner {
 		SimpleLearner(FileTxnSnapLog ftsl) throws IOException {
-			self = new QuorumPeer();
-			zk = new SimpleLearnerZooKeeperServer(ftsl, self);
+            self = QuorumPeer.testingQuorumPeer();
+            zk = new SimpleLearnerZooKeeperServer(ftsl, self);
 			((SimpleLearnerZooKeeperServer)zk).learner = this;
 		}
 	}