You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2017/07/24 17:39:50 UTC

[2/3] zookeeper git commit: ZOOKEEPER-2792 ZOOKEEPER-1045: Port implementation from branch-3.4 to branch-3.5.

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthLearner.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthLearner.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthLearner.java
new file mode 100644
index 0000000..0af891c
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthLearner.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.net.Socket;
+
+/**
+ * This class represents no authentication learner, it just return
+ * without performing any authentication.
+ */
+public class NullQuorumAuthLearner implements QuorumAuthLearner {
+
+    @Override
+    public void authenticate(Socket sock, String hostname) {
+        return; // simply return don't require auth
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthServer.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthServer.java
new file mode 100644
index 0000000..b26a54a
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthServer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.DataInputStream;
+import java.net.Socket;
+
+/**
+ * This class represents no authentication server, it just return
+ * without performing any authentication.
+ */
+public class NullQuorumAuthServer implements QuorumAuthServer {
+
+    @Override
+    public void authenticate(final Socket sock, final DataInputStream din) {
+        return; // simply return don't require auth
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java
new file mode 100644
index 0000000..8bfa394
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import org.apache.jute.BinaryInputArchive;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.zookeeper.server.quorum.QuorumAuthPacket;
+
+public class QuorumAuth {
+    private static final Logger LOG = LoggerFactory.getLogger(QuorumAuth.class);
+
+    public static final String QUORUM_SASL_AUTH_ENABLED = "quorum.auth.enableSasl";
+    public static final String QUORUM_SERVER_SASL_AUTH_REQUIRED = "quorum.auth.serverRequireSasl";
+    public static final String QUORUM_LEARNER_SASL_AUTH_REQUIRED = "quorum.auth.learnerRequireSasl";
+
+    public static final String QUORUM_KERBEROS_SERVICE_PRINCIPAL = "quorum.auth.kerberos.servicePrincipal";
+    public static final String QUORUM_KERBEROS_SERVICE_PRINCIPAL_DEFAULT_VALUE = "zkquorum/localhost";
+
+    public static final String QUORUM_LEARNER_SASL_LOGIN_CONTEXT = "quorum.auth.learner.saslLoginContext";
+    public static final String QUORUM_LEARNER_SASL_LOGIN_CONTEXT_DFAULT_VALUE = "QuorumLearner";
+
+    public static final String QUORUM_SERVER_SASL_LOGIN_CONTEXT = "quorum.auth.server.saslLoginContext";
+    public static final String QUORUM_SERVER_SASL_LOGIN_CONTEXT_DFAULT_VALUE = "QuorumServer";
+
+    static final String QUORUM_SERVER_PROTOCOL_NAME = "zookeeper-quorum";
+    static final String QUORUM_SERVER_SASL_DIGEST = "zk-quorum-sasl-md5";
+    static final String QUORUM_AUTH_MESSAGE_TAG = "qpconnect";
+
+    // this is negative, so that if a learner that does auth, connects to a
+    // server, it'll think the received packet is an authentication packet
+    public static final long QUORUM_AUTH_MAGIC_NUMBER = -0xa0dbcafecafe1234L;
+
+    public enum Status {
+         IN_PROGRESS(0), SUCCESS(1), ERROR(-1);
+        private int status;
+
+        Status(int status) {
+            this.status = status;
+        }
+
+        static Status getStatus(int status) {
+            switch (status) {
+            case 0:
+                return IN_PROGRESS;
+            case 1:
+                return SUCCESS;
+            case -1:
+                return ERROR;
+            default:
+                LOG.error("Unknown status:{}!", status);
+                assert false : "Unknown status!";
+                return ERROR;
+            }
+        }
+
+        int status() {
+            return status;
+        }
+    }
+
+    public static QuorumAuthPacket createPacket(Status status, byte[] response) {
+        return new QuorumAuthPacket(QUORUM_AUTH_MAGIC_NUMBER,
+                                    status.status(), response);
+    }
+
+    public static boolean nextPacketIsAuth(DataInputStream din)
+            throws IOException {
+        din.mark(32);
+        BinaryInputArchive bia = new BinaryInputArchive(din);
+        boolean firstIsAuth = (bia.readLong("NO_TAG")
+                               == QuorumAuth.QUORUM_AUTH_MAGIC_NUMBER);
+        din.reset();
+        return firstIsAuth;
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthLearner.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthLearner.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthLearner.java
new file mode 100644
index 0000000..af71257
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthLearner.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.IOException;
+import java.net.Socket;
+
+/**
+ * Interface for quorum learner authentication mechanisms.
+ */
+public interface QuorumAuthLearner {
+
+    /**
+     * Performs an authentication step for the given socket connection.
+     *
+     * @param sock
+     *            socket connection to other quorum peer server
+     * @param hostname
+     *            host name of other quorum peer server
+     * @throws IOException
+     *             if there is an authentication failure
+     */
+    public void authenticate(Socket sock, String hostname) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthServer.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthServer.java
new file mode 100644
index 0000000..e9de8f0
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthServer.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.Socket;
+
+/**
+ * Interface for quorum server authentication mechanisms.
+ */
+public interface QuorumAuthServer {
+
+    /**
+     * Performs an authentication step for the given socket connection.
+     *
+     * @param sock
+     *            socket connection to other quorum peer
+     * @param din
+     *            stream used to read auth data send by the quorum learner
+     * @throws IOException if the server fails to authenticate connecting quorum learner
+     */
+    public void authenticate(Socket sock, DataInputStream din)
+            throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthLearner.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthLearner.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthLearner.java
new file mode 100644
index 0000000..31f4f55
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthLearner.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.Login;
+import org.apache.zookeeper.SaslClientCallbackHandler;
+import org.apache.zookeeper.common.ZKConfig;
+import org.apache.zookeeper.server.quorum.QuorumAuthPacket;
+import org.apache.zookeeper.util.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SaslQuorumAuthLearner implements QuorumAuthLearner {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(SaslQuorumAuthLearner.class);
+
+    private final Login learnerLogin;
+    private final boolean quorumRequireSasl;
+    private final String quorumServicePrincipal;
+
+    public SaslQuorumAuthLearner(boolean quorumRequireSasl,
+            String quorumServicePrincipal, String loginContext)
+                    throws SaslException {
+        this.quorumRequireSasl = quorumRequireSasl;
+        this.quorumServicePrincipal = quorumServicePrincipal;
+        try {
+            AppConfigurationEntry entries[] = Configuration
+                .getConfiguration()
+                .getAppConfigurationEntry(loginContext);
+            if (entries == null || entries.length == 0) {
+                throw new LoginException("SASL-authentication failed because"
+                                         + " the specified JAAS configuration "
+                                         + "section '" + loginContext
+                                         + "' could not be found.");
+            }
+            this.learnerLogin = new Login(loginContext,
+                                    new SaslClientCallbackHandler(null, "QuorumLearner"), new ZKConfig());
+            this.learnerLogin.startThreadIfNeeded();
+        } catch (LoginException e) {
+            throw new SaslException("Failed to initialize authentication mechanism using SASL", e);
+        }
+    }
+
+    @Override
+    public void authenticate(Socket sock, String hostName) throws IOException {
+        if (!quorumRequireSasl) { // let it through, we don't require auth
+            LOG.info("Skipping SASL authentication as {}={}",
+                    QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED,
+                    quorumRequireSasl);
+            return;
+        }
+        SaslClient sc = null;
+        String principalConfig = SecurityUtils
+                .getServerPrincipal(quorumServicePrincipal, hostName);
+        try {
+            DataOutputStream dout = new DataOutputStream(
+                    sock.getOutputStream());
+            DataInputStream din = new DataInputStream(sock.getInputStream());
+            byte[] responseToken = new byte[0];
+            sc = SecurityUtils.createSaslClient(learnerLogin.getSubject(),
+                    principalConfig,
+                    QuorumAuth.QUORUM_SERVER_PROTOCOL_NAME,
+                    QuorumAuth.QUORUM_SERVER_SASL_DIGEST, LOG, "QuorumLearner");
+
+            if (sc.hasInitialResponse()) {
+                responseToken = createSaslToken(new byte[0], sc, learnerLogin);
+            }
+            send(dout, responseToken);
+            QuorumAuthPacket authPacket = receive(din);
+            QuorumAuth.Status qpStatus = QuorumAuth.Status
+                    .getStatus(authPacket.getStatus());
+            while (!sc.isComplete()) {
+                switch (qpStatus) {
+                case SUCCESS:
+                    responseToken = createSaslToken(authPacket.getToken(), sc,
+                            learnerLogin);
+                    // we're done; don't expect to send another BIND
+                    if (responseToken != null) {
+                        throw new SaslException("Protocol error: attempting to send response after completion");
+                    }
+                    break;
+                case IN_PROGRESS:
+                    responseToken = createSaslToken(authPacket.getToken(), sc,
+                            learnerLogin);
+                    send(dout, responseToken);
+                    authPacket = receive(din);
+                    qpStatus = QuorumAuth.Status
+                            .getStatus(authPacket.getStatus());
+                    break;
+                case ERROR:
+                    throw new SaslException(
+                            "Authentication failed against server addr: "
+                                    + sock.getRemoteSocketAddress());
+                default:
+                    LOG.warn("Unknown status:{}!", qpStatus);
+                    throw new SaslException(
+                            "Authentication failed against server addr: "
+                                    + sock.getRemoteSocketAddress());
+                }
+            }
+
+            // Validate status code at the end of authentication exchange.
+            checkAuthStatus(sock, qpStatus);
+        } finally {
+            if (sc != null) {
+                try {
+                    sc.dispose();
+                } catch (SaslException e) {
+                    LOG.error("SaslClient dispose() failed", e);
+                }
+            }
+        }
+        return;
+    }
+
+    private void checkAuthStatus(Socket sock, QuorumAuth.Status qpStatus)
+            throws SaslException {
+        if (qpStatus == QuorumAuth.Status.SUCCESS) {
+            LOG.info("Successfully completed the authentication using SASL. server addr: {}, status: {}",
+                    sock.getRemoteSocketAddress(), qpStatus);
+        } else {
+            throw new SaslException("Authentication failed against server addr: "
+                            + sock.getRemoteSocketAddress() + ", qpStatus: "
+                            + qpStatus);
+        }
+    }
+
+    private QuorumAuthPacket receive(DataInputStream din) throws IOException {
+        QuorumAuthPacket authPacket = new QuorumAuthPacket();
+        BinaryInputArchive bia = BinaryInputArchive.getArchive(din);
+        authPacket.deserialize(bia, QuorumAuth.QUORUM_AUTH_MESSAGE_TAG);
+        return authPacket;
+    }
+
+    private void send(DataOutputStream dout, byte[] response)
+            throws IOException {
+        QuorumAuthPacket authPacket;
+        BufferedOutputStream bufferedOutput = new BufferedOutputStream(dout);
+        BinaryOutputArchive boa = BinaryOutputArchive
+                .getArchive(bufferedOutput);
+        authPacket = QuorumAuth.createPacket(
+                QuorumAuth.Status.IN_PROGRESS, response);
+        boa.writeRecord(authPacket, QuorumAuth.QUORUM_AUTH_MESSAGE_TAG);
+        bufferedOutput.flush();
+    }
+
+    // TODO: need to consolidate the #createSaslToken() implementation between ZooKeeperSaslClient#createSaslToken().
+    private byte[] createSaslToken(final byte[] saslToken,
+            final SaslClient saslClient, final Login login)
+                    throws SaslException {
+        if (saslToken == null) {
+            throw new SaslException(
+                    "Error in authenticating with a Zookeeper Quorum member: the quorum member's saslToken is null.");
+        }
+        if (login.getSubject() != null) {
+            synchronized (login) {
+                try {
+                    final byte[] retval = Subject.doAs(login.getSubject(),
+                            new PrivilegedExceptionAction<byte[]>() {
+                                public byte[] run() throws SaslException {
+                                    LOG.debug("saslClient.evaluateChallenge(len="
+                                                    + saslToken.length + ")");
+                                    return saslClient.evaluateChallenge(saslToken);
+                                }
+                            });
+                    return retval;
+                } catch (PrivilegedActionException e) {
+                    String error = "An error: (" + e
+                            + ") occurred when evaluating Zookeeper Quorum Member's "
+                            + " received SASL token.";
+                    // Try to provide hints to use about what went wrong so they
+                    // can fix their configuration.
+                    // TODO: introspect about e: look for GSS information.
+                    final String UNKNOWN_SERVER_ERROR_TEXT = "(Mechanism level: Server not found in Kerberos database (7) - UNKNOWN_SERVER)";
+                    if (e.toString().indexOf(UNKNOWN_SERVER_ERROR_TEXT) > -1) {
+                        error += " This may be caused by Java's being unable to resolve the Zookeeper Quorum Member's"
+                                + " hostname correctly. You may want to try to adding"
+                                + " '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your server's JVMFLAGS environment.";
+                    }
+                    LOG.error(error);
+                    throw new SaslException(error);
+                }
+            }
+        } else {
+            throw new SaslException(
+                    "Cannot make SASL token without subject defined. "
+                            + "For diagnosis, please look for WARNs and ERRORs in your log related to the Login class.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java
new file mode 100644
index 0000000..fc5e3b6
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Set;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.Login;
+import org.apache.zookeeper.common.ZKConfig;
+import org.apache.zookeeper.server.quorum.QuorumAuthPacket;
+import org.apache.zookeeper.util.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SaslQuorumAuthServer implements QuorumAuthServer {
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(SaslQuorumAuthServer.class);
+
+    private final static int MAX_RETRIES = 5;
+    private final Login serverLogin;
+    private final boolean quorumRequireSasl;
+
+    public SaslQuorumAuthServer(boolean quorumRequireSasl, String loginContext, Set<String> authzHosts)
+            throws SaslException {
+        this.quorumRequireSasl = quorumRequireSasl;
+        try {
+            AppConfigurationEntry entries[] = Configuration.getConfiguration()
+                    .getAppConfigurationEntry(loginContext);
+            if (entries == null || entries.length == 0) {
+                throw new LoginException("SASL-authentication failed"
+                        + " because the specified JAAS configuration "
+                        + "section '" + loginContext + "' could not be found.");
+            }
+            SaslQuorumServerCallbackHandler saslServerCallbackHandler = new SaslQuorumServerCallbackHandler(
+                    Configuration.getConfiguration(), loginContext, authzHosts);
+            serverLogin = new Login(loginContext, saslServerCallbackHandler, new ZKConfig());
+            serverLogin.startThreadIfNeeded();
+        } catch (Throwable e) {
+            throw new SaslException(
+                    "Failed to initialize authentication mechanism using SASL",
+                    e);
+        }
+    }
+
+    @Override
+    public void authenticate(Socket sock, DataInputStream din)
+            throws SaslException {
+        DataOutputStream dout = null;
+        SaslServer ss = null;
+        try {
+            if (!QuorumAuth.nextPacketIsAuth(din)) {
+                if (quorumRequireSasl) {
+                    throw new SaslException("Learner not trying to authenticate"
+                                            + " and authentication is required");
+                } else {
+                    // let it through, we don't require auth
+                    return;
+                }
+            }
+
+            byte[] token = receive(din);
+            int tries = 0;
+            dout = new DataOutputStream(sock.getOutputStream());
+            byte[] challenge = null;
+            ss = SecurityUtils.createSaslServer(serverLogin.getSubject(),
+                    QuorumAuth.QUORUM_SERVER_PROTOCOL_NAME,
+                    QuorumAuth.QUORUM_SERVER_SASL_DIGEST, serverLogin.callbackHandler,
+                    LOG);
+            while (!ss.isComplete()) {
+                challenge = ss.evaluateResponse(token);
+                if (!ss.isComplete()) {
+                    // limited number of retries.
+                    if (++tries > MAX_RETRIES) {
+                        send(dout, challenge, QuorumAuth.Status.ERROR);
+                        LOG.warn("Failed to authenticate using SASL, server addr: {}, retries={} exceeded.",
+                                sock.getRemoteSocketAddress(), tries);
+                        break;
+                    }
+                    send(dout, challenge, QuorumAuth.Status.IN_PROGRESS);
+                    token = receive(din);
+                }
+            }
+            // Authentication exchange has completed
+            if (ss.isComplete()) {
+                send(dout, challenge, QuorumAuth.Status.SUCCESS);
+                LOG.info("Successfully completed the authentication using SASL. learner addr: {}",
+                        sock.getRemoteSocketAddress());
+            }
+        } catch (Exception e) {
+            try {
+                if (dout != null) {
+                    // send error message to the learner
+                    send(dout, new byte[0], QuorumAuth.Status.ERROR);
+                }
+            } catch (IOException ioe) {
+                LOG.warn("Exception while sending failed status", ioe);
+            }
+            // If sasl is not required, when a server initializes a
+            // connection it will try to log in, but it will also
+            // accept connections that do not start with a sasl
+            // handshake.
+            if (quorumRequireSasl) {
+                LOG.error("Failed to authenticate using SASL", e);
+                throw new SaslException(
+                        "Failed to authenticate using SASL: " + e.getMessage());
+            } else {
+                LOG.warn("Failed to authenticate using SASL", e);
+                LOG.warn("Maintaining learner connection despite SASL authentication failure."
+                                + " server addr: {}, {}: {}",
+                        new Object[] { sock.getRemoteSocketAddress(),
+                                QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED,
+                                quorumRequireSasl });
+                return; // let it through, we don't require auth
+            }
+        } finally {
+            if (ss != null) {
+                try {
+                    ss.dispose();
+                } catch (SaslException e) {
+                    LOG.error("SaslServer dispose() failed", e);
+                }
+            }
+        }
+        return;
+    }
+
+    private byte[] receive(DataInputStream din) throws IOException {
+        QuorumAuthPacket authPacket = new QuorumAuthPacket();
+        BinaryInputArchive bia = BinaryInputArchive.getArchive(din);
+        authPacket.deserialize(bia, QuorumAuth.QUORUM_AUTH_MESSAGE_TAG);
+        return authPacket.getToken();
+    }
+
+    private void send(DataOutputStream dout, byte[] challenge,
+            QuorumAuth.Status s) throws IOException {
+        BufferedOutputStream bufferedOutput = new BufferedOutputStream(dout);
+        BinaryOutputArchive boa = BinaryOutputArchive
+                .getArchive(bufferedOutput);
+        QuorumAuthPacket authPacket;
+        if (challenge == null && s != QuorumAuth.Status.SUCCESS) {
+            authPacket = QuorumAuth.createPacket(
+                    QuorumAuth.Status.IN_PROGRESS, null);
+        } else {
+            authPacket = QuorumAuth.createPacket(s, challenge);
+        }
+
+        boa.writeRecord(authPacket, QuorumAuth.QUORUM_AUTH_MESSAGE_TAG);
+        bufferedOutput.flush();
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java
new file mode 100644
index 0000000..3e71bb1
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is used by the SASL mechanisms to get further information to complete
+ * the authentication. For example, a SASL mechanism might use this callback
+ * handler to do verification operation. This is used by the QuorumServer to
+ * perform the mutual quorum peer authentication.
+ */
+public class SaslQuorumServerCallbackHandler implements CallbackHandler {
+    private static final String USER_PREFIX = "user_";
+    private static final Logger LOG = LoggerFactory.getLogger(SaslQuorumServerCallbackHandler.class);
+
+    private String userName;
+    private final Map<String,String> credentials = new HashMap<String,String>();
+    private final Set<String> authzHosts;
+
+    public SaslQuorumServerCallbackHandler(Configuration configuration,
+            String serverSection, Set<String> authzHosts) throws IOException {
+        AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(serverSection);
+
+        if (configurationEntries == null) {
+            String errorMessage = "Could not find a '" + serverSection + "' entry in this configuration: Server cannot start.";
+            LOG.error(errorMessage);
+            throw new IOException(errorMessage);
+        }
+        credentials.clear();
+        for(AppConfigurationEntry entry: configurationEntries) {
+            Map<String,?> options = entry.getOptions();
+            // Populate DIGEST-MD5 user -> password map with JAAS configuration entries from the "QuorumServer" section.
+            // Usernames are distinguished from other options by prefixing the username with a "user_" prefix.
+            for(Map.Entry<String, ?> pair : options.entrySet()) {
+                String key = pair.getKey();
+                if (key.startsWith(USER_PREFIX)) {
+                    String userName = key.substring(USER_PREFIX.length());
+                    credentials.put(userName,(String)pair.getValue());
+                }
+            }
+        }
+
+        // authorized host lists
+        this.authzHosts = authzHosts;
+    }
+
+    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+        for (Callback callback : callbacks) {
+            if (callback instanceof NameCallback) {
+                handleNameCallback((NameCallback) callback);
+            } else if (callback instanceof PasswordCallback) {
+                handlePasswordCallback((PasswordCallback) callback);
+            } else if (callback instanceof RealmCallback) {
+                handleRealmCallback((RealmCallback) callback);
+            } else if (callback instanceof AuthorizeCallback) {
+                handleAuthorizeCallback((AuthorizeCallback) callback);
+            }
+        }
+    }
+
+    private void handleNameCallback(NameCallback nc) {
+        // check to see if this user is in the user password database.
+        if (credentials.get(nc.getDefaultName()) == null) {
+            LOG.warn("User '{}' not found in list of DIGEST-MD5 authenticateable users.",
+                    nc.getDefaultName());
+            return;
+        }
+        nc.setName(nc.getDefaultName());
+        userName = nc.getDefaultName();
+    }
+
+    private void handlePasswordCallback(PasswordCallback pc) {
+        if (credentials.containsKey(userName) ) {
+            pc.setPassword(credentials.get(userName).toCharArray());
+        } else {
+            LOG.warn("No password found for user: {}", userName);
+        }
+    }
+
+    private void handleRealmCallback(RealmCallback rc) {
+        LOG.debug("QuorumLearner supplied realm: {}", rc.getDefaultText());
+        rc.setText(rc.getDefaultText());
+    }
+
+    private void handleAuthorizeCallback(AuthorizeCallback ac) {
+        String authenticationID = ac.getAuthenticationID();
+        String authorizationID = ac.getAuthorizationID();
+
+        boolean authzFlag = false;
+        // 1. Matches authenticationID and authorizationID
+        authzFlag = authenticationID.equals(authorizationID);
+
+        // 2. Verify whether the connecting host is present in authorized hosts.
+        // If not exists, then connecting peer is not authorized to join the
+        // ensemble and will reject it.
+        if (authzFlag) {
+            String[] components = authorizationID.split("[/@]");
+            if (components.length == 3) {
+                authzFlag = authzHosts.contains(components[1]);
+            }
+            if (!authzFlag) {
+                LOG.error("SASL authorization completed, {} is not authorized to connect",
+                        components[1]);
+            }
+        }
+
+        // Sets authorization flag
+        ac.setAuthorized(authzFlag);
+        if (ac.isAuthorized()) {
+            ac.setAuthorizedID(authorizationID);
+            LOG.info("Successfully authenticated learner: authenticationID={};  authorizationID={}.",
+                    authenticationID, authorizationID);
+        }
+        LOG.debug("SASL authorization completed, authorized flag set to {}", ac.isAuthorized());
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/main/org/apache/zookeeper/util/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/util/SecurityUtils.java b/src/java/main/org/apache/zookeeper/util/SecurityUtils.java
new file mode 100644
index 0000000..67484e4
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/util/SecurityUtils.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.util;
+
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.zookeeper.SaslClientCallbackHandler;
+import org.apache.zookeeper.server.auth.KerberosName;
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+import org.slf4j.Logger;
+
+public final class SecurityUtils {
+
+    public static final String QUORUM_HOSTNAME_PATTERN = "_HOST";
+
+    /**
+     * Create an instance of a SaslClient. It will return null if there is an exception.
+     *
+     * @param subject subject
+     * @param servicePrincipal principal
+     * @param protocol name of the protocol for which the authentication is being performed
+     * @param serverName name of the server to authenticate to
+     * @param LOG logger
+     * @param entity can be either zookeeper client or quorum learner
+     *
+     * @return saslclient object
+     * @throws SaslException
+     */
+    public static SaslClient createSaslClient(final Subject subject,
+            final String servicePrincipal, final String protocol,
+            final String serverName, final Logger LOG, final String entity) throws SaslException {
+        SaslClient saslClient;
+        // Use subject.getPrincipals().isEmpty() as an indication of which SASL
+        // mechanism to use: if empty, use DIGEST-MD5; otherwise, use GSSAPI.
+        if (subject.getPrincipals().isEmpty()) {
+            // no principals: must not be GSSAPI: use DIGEST-MD5 mechanism
+            // instead.
+            LOG.info("{} will use DIGEST-MD5 as SASL mechanism.", entity);
+            String[] mechs = { "DIGEST-MD5" };
+            String username = (String) (subject.getPublicCredentials()
+                    .toArray()[0]);
+            String password = (String) (subject.getPrivateCredentials()
+                    .toArray()[0]);
+            // 'domain' parameter is hard-wired between the server and client
+            saslClient = Sasl.createSaslClient(mechs, username, protocol,
+                    serverName, null, new SaslClientCallbackHandler(password, entity));
+            return saslClient;
+        } else { // GSSAPI.
+            final Object[] principals = subject.getPrincipals().toArray();
+            // determine client principal from subject.
+            final Principal clientPrincipal = (Principal) principals[0];
+            boolean usingNativeJgss = Boolean
+                    .getBoolean("sun.security.jgss.native");
+            if (usingNativeJgss) {
+                // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/jgss-features.html
+                // """
+                // In addition, when performing operations as a particular
+                // Subject, e.g. Subject.doAs(...) or
+                // Subject.doAsPrivileged(...),
+                // the to-be-used GSSCredential should be added to Subject's
+                // private credential set. Otherwise, the GSS operations will
+                // fail since no credential is found.
+                // """
+                try {
+                    GSSManager manager = GSSManager.getInstance();
+                    Oid krb5Mechanism = new Oid("1.2.840.113554.1.2.2");
+                    GSSCredential cred = manager.createCredential(null,
+                            GSSContext.DEFAULT_LIFETIME, krb5Mechanism,
+                            GSSCredential.INITIATE_ONLY);
+                    subject.getPrivateCredentials().add(cred);
+                    LOG.debug("Added private credential to {} principal name: '{}'",
+                            entity, clientPrincipal);
+                } catch (GSSException ex) {
+                    LOG.warn("Cannot add private credential to subject; "
+                                    + "authentication at the server may fail", ex);
+                }
+            }
+            final KerberosName clientKerberosName = new KerberosName(
+                    clientPrincipal.getName());
+            // assume that server and client are in the same realm (by default;
+            // unless the system property
+            // "zookeeper.server.realm" is set).
+            String serverRealm = System.getProperty("zookeeper.server.realm",
+                    clientKerberosName.getRealm());
+            KerberosName serviceKerberosName = new KerberosName(
+                    servicePrincipal + "@" + serverRealm);
+            final String serviceName = serviceKerberosName.getServiceName();
+            final String serviceHostname = serviceKerberosName.getHostName();
+            final String clientPrincipalName = clientKerberosName.toString();
+            try {
+                saslClient = Subject.doAs(subject,
+                        new PrivilegedExceptionAction<SaslClient>() {
+                            public SaslClient run() throws SaslException {
+                                LOG.info("{} will use GSSAPI as SASL mechanism.", entity);
+                                String[] mechs = { "GSSAPI" };
+                                LOG.debug("creating sasl client: {}={};service={};serviceHostname={}",
+                                        new Object[] { entity, clientPrincipalName, serviceName, serviceHostname });
+                                SaslClient saslClient = Sasl.createSaslClient(
+                                        mechs, clientPrincipalName, serviceName,
+                                        serviceHostname, null,
+                                        new SaslClientCallbackHandler(null, entity));
+                                return saslClient;
+                            }
+                        });
+                return saslClient;
+            } catch (Exception e) {
+                LOG.error("Exception while trying to create SASL client", e);
+                return null;
+            }
+        }
+    }
+
+    /**
+     * Create an instance of a SaslServer. It will return null if there is an exception.
+     *
+     * @param subject subject
+     * @param protocol protocol
+     * @param serverName server name
+     * @param callbackHandler login callback handler
+     * @param LOG logger
+     * @return sasl server object
+     */
+    public static SaslServer createSaslServer(final Subject subject,
+            final String protocol, final String serverName,
+            final CallbackHandler callbackHandler, final Logger LOG) {
+        if (subject != null) {
+            // server is using a JAAS-authenticated subject: determine service
+            // principal name and hostname from zk server's subject.
+            if (subject.getPrincipals().size() > 0) {
+                try {
+                    final Object[] principals = subject.getPrincipals()
+                            .toArray();
+                    final Principal servicePrincipal = (Principal) principals[0];
+
+                    // e.g. servicePrincipalNameAndHostname :=
+                    // "zookeeper/myhost.foo.com@FOO.COM"
+                    final String servicePrincipalNameAndHostname = servicePrincipal
+                            .getName();
+
+                    int indexOf = servicePrincipalNameAndHostname.indexOf("/");
+
+                    // e.g. servicePrincipalName := "zookeeper"
+                    final String servicePrincipalName = servicePrincipalNameAndHostname
+                            .substring(0, indexOf);
+
+                    // e.g. serviceHostnameAndKerbDomain :=
+                    // "myhost.foo.com@FOO.COM"
+                    final String serviceHostnameAndKerbDomain = servicePrincipalNameAndHostname
+                            .substring(indexOf + 1,
+                                    servicePrincipalNameAndHostname.length());
+
+                    indexOf = serviceHostnameAndKerbDomain.indexOf("@");
+                    // e.g. serviceHostname := "myhost.foo.com"
+                    final String serviceHostname = serviceHostnameAndKerbDomain
+                            .substring(0, indexOf);
+
+                    // TODO: should depend on zoo.cfg specified mechs, but if
+                    // subject is non-null, it can be assumed to be GSSAPI.
+                    final String mech = "GSSAPI";
+
+                    LOG.debug("serviceHostname is '" + serviceHostname + "'");
+                    LOG.debug("servicePrincipalName is '" + servicePrincipalName
+                            + "'");
+                    LOG.debug("SASL mechanism(mech) is '" + mech + "'");
+
+                    boolean usingNativeJgss = Boolean
+                            .getBoolean("sun.security.jgss.native");
+                    if (usingNativeJgss) {
+                        // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/jgss-features.html
+                        // """
+                        // In addition, when performing operations as a
+                        // particular
+                        // Subject, e.g. Subject.doAs(...) or
+                        // Subject.doAsPrivileged(...), the to-be-used
+                        // GSSCredential should be added to Subject's
+                        // private credential set. Otherwise, the GSS operations
+                        // will fail since no credential is found.
+                        // """
+                        try {
+                            GSSManager manager = GSSManager.getInstance();
+                            Oid krb5Mechanism = new Oid("1.2.840.113554.1.2.2");
+                            GSSName gssName = manager.createName(
+                                    servicePrincipalName + "@"
+                                            + serviceHostname,
+                                    GSSName.NT_HOSTBASED_SERVICE);
+                            GSSCredential cred = manager.createCredential(
+                                    gssName, GSSContext.DEFAULT_LIFETIME,
+                                    krb5Mechanism, GSSCredential.ACCEPT_ONLY);
+                            subject.getPrivateCredentials().add(cred);
+                            LOG.debug("Added private credential to service principal name: '{}',"
+                                            + " GSSCredential name: {}", servicePrincipalName, cred.getName());
+                        } catch (GSSException ex) {
+                            LOG.warn("Cannot add private credential to subject; "
+                                            + "clients authentication may fail", ex);
+                        }
+                    }
+                    try {
+                        return Subject.doAs(subject,
+                                new PrivilegedExceptionAction<SaslServer>() {
+                                    public SaslServer run() {
+                                        try {
+                                            SaslServer saslServer;
+                                            saslServer = Sasl.createSaslServer(
+                                                    mech, servicePrincipalName,
+                                                    serviceHostname, null,
+                                                    callbackHandler);
+                                            return saslServer;
+                                        } catch (SaslException e) {
+                                            LOG.error("Zookeeper Server failed to create a SaslServer to interact with a client during session initiation: ", e);
+                                            return null;
+                                        }
+                                    }
+                                });
+                    } catch (PrivilegedActionException e) {
+                        // TODO: exit server at this point(?)
+                        LOG.error("Zookeeper Quorum member experienced a PrivilegedActionException exception while creating a SaslServer using a JAAS principal context:", e);
+                    }
+                } catch (IndexOutOfBoundsException e) {
+                    LOG.error("server principal name/hostname determination error: ", e);
+                }
+            } else {
+                // JAAS non-GSSAPI authentication: assuming and supporting only
+                // DIGEST-MD5 mechanism for now.
+                // TODO: use 'authMech=' value in zoo.cfg.
+                try {
+                    SaslServer saslServer = Sasl.createSaslServer("DIGEST-MD5",
+                            protocol, serverName, null, callbackHandler);
+                    return saslServer;
+                } catch (SaslException e) {
+                    LOG.error("Zookeeper Quorum member failed to create a SaslServer to interact with a client during session initiation", e);
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Convert Kerberos principal name pattern to valid Kerberos principal name.
+     * If the principal name contains hostname pattern "_HOST" then it replaces
+     * with the given hostname, which should be fully-qualified domain name.
+     *
+     * @param principalConfig
+     *            the Kerberos principal name conf value to convert
+     * @param hostname
+     *            the fully-qualified domain name used for substitution
+     * @return converted Kerberos principal name
+     */
+    public static String getServerPrincipal(String principalConfig,
+            String hostname) {
+        String[] components = getComponents(principalConfig);
+        if (components == null || components.length != 2
+                || !components[1].equals(QUORUM_HOSTNAME_PATTERN)) {
+            return principalConfig;
+        } else {
+            return replacePattern(components, hostname);
+        }
+    }
+
+    private static String[] getComponents(String principalConfig) {
+        if (principalConfig == null)
+            return null;
+        return principalConfig.split("[/]");
+    }
+
+    private static String replacePattern(String[] components, String hostname) {
+        return components[0] + "/" + hostname.toLowerCase();
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/test/config/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/src/java/test/config/findbugsExcludeFile.xml b/src/java/test/config/findbugsExcludeFile.xml
index 7a7fa4b..b0be95e 100644
--- a/src/java/test/config/findbugsExcludeFile.xml
+++ b/src/java/test/config/findbugsExcludeFile.xml
@@ -75,6 +75,11 @@
   </Match>
 
   <Match>
+      <Class name="org.apache.zookeeper.server.quorum.QuorumAuthPacket" />
+      <Bug code="EI2, EI" />
+  </Match>
+
+  <Match>
     <Class name="org.apache.zookeeper.ClientCnxn"/>
       <Bug code="EI, EI2" />
   </Match>

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/test/data/kerberos/minikdc-krb5.conf
----------------------------------------------------------------------
diff --git a/src/java/test/data/kerberos/minikdc-krb5.conf b/src/java/test/data/kerberos/minikdc-krb5.conf
new file mode 100644
index 0000000..43ec7c4
--- /dev/null
+++ b/src/java/test/data/kerberos/minikdc-krb5.conf
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This resource is originally from HDFS, see the similarly named files there
+# in case of bug fixing, history, etc.
+# Branch : trunk
+# Github Revision: 1d1ab587e4e92ce3aea4cb144811f69145cb3b33
+#
+[libdefaults]
+    default_realm = {0}
+    udp_preference_limit = 1
+
+[realms]
+    {0} = '{'
+        kdc = {1}:{2}
+    '}'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/test/data/kerberos/minikdc.ldiff
----------------------------------------------------------------------
diff --git a/src/java/test/data/kerberos/minikdc.ldiff b/src/java/test/data/kerberos/minikdc.ldiff
new file mode 100644
index 0000000..20c8d77
--- /dev/null
+++ b/src/java/test/data/kerberos/minikdc.ldiff
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This resource is originally from HDFS, see the similarly named files there
+# in case of bug fixing, history, etc.
+# Branch : trunk
+# Github Revision: 1d1ab587e4e92ce3aea4cb144811f69145cb3b33
+#
+dn: ou=users,dc=${0},dc=${1}
+objectClass: organizationalUnit
+objectClass: top
+ou: users
+
+dn: uid=krbtgt,ou=users,dc=${0},dc=${1}
+objectClass: top
+objectClass: person
+objectClass: inetOrgPerson
+objectClass: krb5principal
+objectClass: krb5kdcentry
+cn: KDC Service
+sn: Service
+uid: krbtgt
+userPassword: secret
+krb5PrincipalName: krbtgt/${2}.${3}@${2}.${3}
+krb5KeyVersionNumber: 0
+
+dn: uid=ldap,ou=users,dc=${0},dc=${1}
+objectClass: top
+objectClass: person
+objectClass: inetOrgPerson
+objectClass: krb5principal
+objectClass: krb5kdcentry
+cn: LDAP
+sn: Service
+uid: ldap
+userPassword: secret
+krb5PrincipalName: ldap/${4}@${2}.${3}
+krb5KeyVersionNumber: 0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java b/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
index 4ec9a07..9edb4be 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
@@ -38,6 +38,8 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.security.sasl.SaslException;
+
 public class EphemeralNodeDeletionTest extends QuorumPeerTestBase {
     private static int SERVER_COUNT = 3;
     private MainThread[] mt = new MainThread[SERVER_COUNT];
@@ -181,6 +183,10 @@ public class EphemeralNodeDeletionTest extends QuorumPeerTestBase {
     static class CustomQuorumPeer extends QuorumPeer {
         private boolean injectError = false;
 
+        public CustomQuorumPeer() throws SaslException {
+
+        }
+
         @Override
         protected Follower makeFollower(FileTxnSnapLog logFactory)
                 throws IOException {
@@ -215,7 +221,7 @@ public class EphemeralNodeDeletionTest extends QuorumPeerTestBase {
 
     static class MockTestQPMain extends TestQPMain {
         @Override
-        protected QuorumPeer getQuorumPeer() {
+        protected QuorumPeer getQuorumPeer() throws SaslException {
             return new CustomQuorumPeer();
         }
     }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
index 302fc09..bb5aa8f 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
@@ -113,7 +113,7 @@ public class FLEBackwardElectionRoundTest extends ZKTestCase {
          * Start mock server 1
          */
         QuorumPeer mockPeer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2);
-        cnxManagers[0] = new QuorumCnxManager(mockPeer);
+        cnxManagers[0] = mockPeer.createCnxnManager();
         cnxManagers[0].listener.start();
 
         cnxManagers[0].toSend(0l, initialMsg);
@@ -122,7 +122,7 @@ public class FLEBackwardElectionRoundTest extends ZKTestCase {
          * Start mock server 2
          */
         mockPeer = new QuorumPeer(peers, tmpdir[2], tmpdir[2], port[2], 3, 2, 1000, 2, 2);
-        cnxManagers[1] = new QuorumCnxManager(mockPeer);
+        cnxManagers[1] = mockPeer.createCnxnManager();
         cnxManagers[1].listener.start();
 
         cnxManagers[1].toSend(0l, initialMsg);

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
index cc44243..6583f90 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
@@ -95,7 +95,7 @@ public class FLELostMessageTest extends ZKTestCase {
 
     void mockServer() throws InterruptedException, IOException {
         QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
-        cnxManager = new QuorumCnxManager(peer);
+        cnxManager = peer.createCnxnManager();
         cnxManager.listener.start();
 
         cnxManager.toSend(1l, FLETestUtils.createMsg(ServerState.LOOKING.ordinal(), 0, 0, 0));

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java b/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
index 95e5e53..843c8aa 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.net.Socket;
 import java.util.Iterator;
@@ -54,7 +55,7 @@ public class LearnerHandlerTest extends ZKTestCase {
         boolean threadStarted = false;
 
         MockLearnerHandler(Socket sock, Leader leader) throws IOException {
-            super(sock, leader);
+            super(sock, new BufferedInputStream(sock.getInputStream()), leader);
         }
 
         protected void startSendingPackets() {

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java b/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java
index 85284f6..1793550 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java
@@ -110,7 +110,7 @@ public class LearnerTest extends ZKTestCase {
         InetSocketAddress addr = new InetSocketAddress(1111);
 
         // we expect this to throw an IOException since we're faking socket connect errors every time
-        learner.connectToLeader(addr);
+        learner.connectToLeader(addr, "");
     }
     @Test
     public void connectionInitLimitTimeoutTest() throws Exception {
@@ -130,7 +130,7 @@ public class LearnerTest extends ZKTestCase {
 
         // we expect this to throw an IOException since we're faking socket connect errors every time
         try {
-            learner.connectToLeader(addr);
+            learner.connectToLeader(addr, "");
             Assert.fail("should have thrown IOException!");
         } catch (IOException e) {
             //good, wanted to see that, let's make sure we ran out of time

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
index 51f444b..0481a04 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
@@ -26,6 +26,10 @@ import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.Properties;
 
 import org.slf4j.Logger;
@@ -47,7 +51,7 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
     protected static final Logger LOG = LoggerFactory
             .getLogger(QuorumPeerTestBase.class);
 
-    public static final int TIMEOUT = 3000;
+    public static final int TIMEOUT = 5000;
 
     public void process(WatchedEvent event) {
         // ignore for this test
@@ -72,6 +76,60 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
 
         volatile TestQPMain main;
 
+        File baseDir;
+        private int myid;
+        private int clientPort;
+        private String quorumCfgSection;
+        private Map<String, String> otherConfigs;
+
+        public MainThread(int myid, int clientPort, String quorumCfgSection,
+                          Map<String, String> otherConfigs) throws IOException {
+            baseDir = ClientBase.createTmpDir();
+            this.myid = myid;
+            this.clientPort = clientPort;
+            this.quorumCfgSection = quorumCfgSection;
+            this.otherConfigs = otherConfigs;
+            LOG.info("id = " + myid + " tmpDir = " + baseDir + " clientPort = "
+                    + clientPort);
+            confFile = new File(baseDir, "zoo.cfg");
+
+            FileWriter fwriter = new FileWriter(confFile);
+            fwriter.write("tickTime=4000\n");
+            fwriter.write("initLimit=10\n");
+            fwriter.write("syncLimit=5\n");
+
+            tmpDir = new File(baseDir, "data");
+            if (!tmpDir.mkdir()) {
+                throw new IOException("Unable to mkdir " + tmpDir);
+            }
+
+            // Convert windows path to UNIX to avoid problems with "\"
+            String dir = tmpDir.toString();
+            String osname = java.lang.System.getProperty("os.name");
+            if (osname.toLowerCase().contains("windows")) {
+                dir = dir.replace('\\', '/');
+            }
+            fwriter.write("dataDir=" + dir + "\n");
+
+            fwriter.write("clientPort=" + clientPort + "\n");
+
+            // write extra configurations
+            Set<Entry<String, String>> entrySet = otherConfigs.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                fwriter.write(entry.getKey() + "=" + entry.getValue() + "\n");
+            }
+
+            fwriter.write(quorumCfgSection + "\n");
+            fwriter.flush();
+            fwriter.close();
+
+            File myidFile = new File(tmpDir, "myid");
+            fwriter = new FileWriter(myidFile);
+            fwriter.write(Integer.toString(myid));
+            fwriter.flush();
+            fwriter.close();
+        }
+
         public MainThread(int myid, String quorumCfgSection) throws IOException {
             this(myid, quorumCfgSection, true);
         }
@@ -87,11 +145,6 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
             this(myid, UNSET_STATIC_CLIENTPORT, quorumCfgSection, writeDynamicConfigFile);
         }
 
-        public MainThread(int myid, int clientPort, String quorumCfgSection)
-                throws IOException {
-            this(myid, clientPort, JettyAdminServer.DEFAULT_PORT, quorumCfgSection, null, true);
-        }
-
         public MainThread(int myid, int clientPort, String quorumCfgSection, boolean writeDynamicConfigFile)
                 throws IOException {
             this(myid, clientPort, JettyAdminServer.DEFAULT_PORT, quorumCfgSection, null, writeDynamicConfigFile);
@@ -226,6 +279,12 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
             fwriter.close();
         }
 
+        public MainThread(int myid, int clientPort, String quorumCfgSection)
+                throws IOException {
+            this(myid, clientPort, quorumCfgSection,
+                    new HashMap<String, String>());
+        }
+
         Thread currentThread;
 
         synchronized public void start() {
@@ -289,5 +348,31 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
         public QuorumPeer getQuorumPeer() {
             return main.quorumPeer;
         }
+
+
+        public void deleteBaseDir() {
+            ClientBase.recursiveDelete(baseDir);
+        }
+
+        public int getMyid() {
+            return myid;
+        }
+
+        public int getClientPort() {
+            return clientPort;
+        }
+
+        public String getQuorumCfgSection() {
+            return quorumCfgSection;
+        }
+
+        public Map<String, String> getOtherConfigs() {
+            return otherConfigs;
+        }
+
+        public File getConfFile() {
+            return confFile;
+        }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java b/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java
index c65a794..e96d273 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java
@@ -42,6 +42,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.security.sasl.SaslException;
+
 /**
  * This test class contains test cases related to race condition in complete
  * ZooKeeper
@@ -156,6 +158,9 @@ public class RaceConditionTest extends QuorumPeerTestBase {
     private static class CustomQuorumPeer extends QuorumPeer {
         private boolean stopPing;
 
+        public CustomQuorumPeer() throws SaslException {
+        }
+
         public void setStopPing(boolean stopPing) {
             this.stopPing = stopPing;
         }
@@ -239,7 +244,7 @@ public class RaceConditionTest extends QuorumPeerTestBase {
     private static class MockTestQPMain extends TestQPMain {
 
         @Override
-        protected QuorumPeer getQuorumPeer() {
+        protected QuorumPeer getQuorumPeer() throws SaslException {
             return new CustomQuorumPeer();
         }
     }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
index 778ea1e..32706d1 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -385,7 +386,9 @@ public class Zab1_0Test extends ZKTestCase {
                 Thread.sleep(20);
             }
             
-            LearnerHandler lh = new LearnerHandler(leaderSocket, leader);
+            LearnerHandler lh = new LearnerHandler(leaderSocket,
+                    new BufferedInputStream(leaderSocket.getInputStream()),
+                    leader);
             lh.start();
             leaderSocket.setSoTimeout(4000);
 
@@ -453,8 +456,10 @@ public class Zab1_0Test extends ZKTestCase {
             while(leader.cnxAcceptor == null || !leader.cnxAcceptor.isAlive()) {
                 Thread.sleep(20);
             }
-            
-            LearnerHandler lh = new LearnerHandler(leaderSocket, leader);
+
+            LearnerHandler lh = new LearnerHandler(leaderSocket,
+                    new BufferedInputStream(leaderSocket.getInputStream()),
+                    leader);
             lh.start();
             leaderSocket.setSoTimeout(4000);
 
@@ -476,7 +481,6 @@ public class Zab1_0Test extends ZKTestCase {
         }
     }
     
-    
     public void testFollowerConversation(FollowerConversation conversation) throws Exception {
         File tmpDir = File.createTempFile("test", "dir", testData);
         tmpDir.delete();
@@ -490,8 +494,10 @@ public class Zab1_0Test extends ZKTestCase {
             peer.follower = follower;
             
             ServerSocket ss =
-                new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
-            follower.setLeaderSocketAddress((InetSocketAddress)ss.getLocalSocketAddress());
+                    new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
+            QuorumServer leaderQS = new QuorumServer(1,
+                    (InetSocketAddress) ss.getLocalSocketAddress());
+            follower.setLeaderQuorumServer(leaderQS);
             final Follower followerForThread = follower;
             
             followerThread = new Thread() {
@@ -544,7 +550,8 @@ public class Zab1_0Test extends ZKTestCase {
 
             ServerSocket ss =
                 new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
-            observer.setLeaderSocketAddress((InetSocketAddress)ss.getLocalSocketAddress());
+            QuorumServer leaderQS = new QuorumServer(1, (InetSocketAddress) ss.getLocalSocketAddress());
+            observer.setLeaderQuorumServer(leaderQS);
             final Observer observerForThread = observer;
 
             observerThread = new Thread() {
@@ -1271,14 +1278,14 @@ public class Zab1_0Test extends ZKTestCase {
             super(self, zk);
         }
 
-        InetSocketAddress leaderAddr;
-        public void setLeaderSocketAddress(InetSocketAddress addr) {
-            leaderAddr = addr;
+        QuorumServer leaderQuorumServer;
+        public void setLeaderQuorumServer(QuorumServer quorumServer) {
+            leaderQuorumServer = quorumServer;
         }
         
         @Override
-        protected InetSocketAddress findLeader() {
-            return leaderAddr;
+        protected QuorumServer findLeader() {
+            return leaderQuorumServer;
         }
     }
     private ConversableFollower createFollower(File tmpDir, QuorumPeer peer)
@@ -1297,14 +1304,14 @@ public class Zab1_0Test extends ZKTestCase {
             super(self, zk);
         }
 
-        InetSocketAddress leaderAddr;
-        public void setLeaderSocketAddress(InetSocketAddress addr) {
-            leaderAddr = addr;
+        QuorumServer leaderQuorumServer;
+        public void setLeaderQuorumServer(QuorumServer quorumServer) {
+            leaderQuorumServer = quorumServer;
         }
 
         @Override
-        protected InetSocketAddress findLeader() {
-            return leaderAddr;
+        protected QuorumServer findLeader() {
+            return leaderQuorumServer;
         }
     }
 
@@ -1320,7 +1327,7 @@ public class Zab1_0Test extends ZKTestCase {
 
     private QuorumPeer createQuorumPeer(File tmpDir) throws IOException, FileNotFoundException {
         HashMap<Long, QuorumServer> peers = new HashMap<Long, QuorumServer>();
-        QuorumPeer peer = new QuorumPeer();
+        QuorumPeer peer = QuorumPeer.testingQuorumPeer();
         peer.syncLimit = SYNC_LIMIT;
         peer.initLimit = 2;
         peer.tickTime = 2000;
@@ -1372,7 +1379,7 @@ public class Zab1_0Test extends ZKTestCase {
                     new ErrorTxn(1), zxid));
             logFactory.commit();
             ZKDatabase zkDb = new ZKDatabase(logFactory);
-            QuorumPeer peer = new QuorumPeer();
+            QuorumPeer peer = QuorumPeer.testingQuorumPeer();
             peer.setZKDatabase(zkDb);
             peer.setTxnFactory(logFactory);
             peer.getLastLoggedZxid();

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/test/org/apache/zookeeper/server/quorum/auth/KerberosSecurityTestcase.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/auth/KerberosSecurityTestcase.java b/src/java/test/org/apache/zookeeper/server/quorum/auth/KerberosSecurityTestcase.java
new file mode 100644
index 0000000..9617c70
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/quorum/auth/KerberosSecurityTestcase.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum.auth;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+/*
+ * This code is originally from HDFS, see the similarly named file there
+ * in case of bug fixing, history, etc.
+ *
+ * Branch : trunk
+ * Github Revision: 1d1ab587e4e92ce3aea4cb144811f69145cb3b33
+ */
+
+/**
+ * KerberosSecurityTestcase provides a base class for using MiniKdc with other
+ * test cases. KerberosSecurityTestcase starts the MiniKdc (@Before) before
+ * running tests, and stop the MiniKdc (@After) after the testcases, using
+ * default settings (working dir and kdc configurations).
+ * <p>
+ * Users can directly inherit this class and implement their own test functions
+ * using the default settings, or override functions getTestDir() and
+ * createMiniKdcConf() to provide new settings.
+ */
+public class KerberosSecurityTestcase extends QuorumAuthTestBase {
+    private static MiniKdc kdc;
+    private static File workDir;
+    private static Properties conf;
+
+    @BeforeClass
+    public static void setUpSasl() throws Exception {
+        startMiniKdc();
+    }
+
+    @AfterClass
+    public static void tearDownSasl() throws Exception {
+        stopMiniKdc();
+        FileUtils.deleteQuietly(workDir);
+    }
+
+    public static void startMiniKdc() throws Exception {
+        createTestDir();
+        createMiniKdcConf();
+
+        kdc = new MiniKdc(conf, workDir);
+        kdc.start();
+    }
+
+    /**
+     * Create a working directory, it should be the build directory. Under this
+     * directory an ApacheDS working directory will be created, this directory
+     * will be deleted when the MiniKdc stops.
+     *
+     * @throws IOException
+     */
+    public static void createTestDir() throws IOException {
+        workDir = createTmpDir(
+                new File(System.getProperty("build.test.dir", "build")));
+    }
+
+    static File createTmpDir(File parentDir) throws IOException {
+        File tmpFile = File.createTempFile("test", ".junit", parentDir);
+        // don't delete tmpFile - this ensures we don't attempt to create
+        // a tmpDir with a duplicate name
+        File tmpDir = new File(tmpFile + ".dir");
+        // never true if tmpfile does it's job
+        Assert.assertFalse(tmpDir.exists());
+        Assert.assertTrue(tmpDir.mkdirs());
+        return tmpDir;
+    }
+
+    /**
+     * Create a Kdc configuration
+     */
+    public static void createMiniKdcConf() {
+        conf = MiniKdc.createConf();
+    }
+
+    public static void stopMiniKdc() {
+        if (kdc != null) {
+            kdc.stop();
+        }
+    }
+
+    public static MiniKdc getKdc() {
+        return kdc;
+    }
+
+    public static File getWorkDir() {
+        return workDir;
+    }
+
+    public static Properties getConf() {
+        return conf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5a29daed/src/java/test/org/apache/zookeeper/server/quorum/auth/KerberosTestUtils.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/auth/KerberosTestUtils.java b/src/java/test/org/apache/zookeeper/server/quorum/auth/KerberosTestUtils.java
new file mode 100644
index 0000000..4a75f83
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/quorum/auth/KerberosTestUtils.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.File;
+import java.util.UUID;
+
+import org.apache.zookeeper.util.SecurityUtils;
+
+public class KerberosTestUtils {
+    private static String keytabFile = new File(System.getProperty("test.dir", "build"), UUID.randomUUID().toString())
+            .getAbsolutePath();
+
+    public static String getRealm() {
+        return "EXAMPLE.COM";
+    }
+
+    public static String getLearnerPrincipal() {
+        return "learner@EXAMPLE.COM";
+    }
+
+    public static String getServerPrincipal() {
+        return "zkquorum/localhost@EXAMPLE.COM";
+    }
+
+    public static String getHostLearnerPrincipal() {
+        return "learner/_HOST@EXAMPLE.COM";
+    }
+
+    public static String getHostServerPrincipal() {
+        return "zkquorum/_HOST@EXAMPLE.COM";
+    }
+
+    public static String getHostNamedLearnerPrincipal(String myHostname) {
+        return "learner/" + myHostname + "@EXAMPLE.COM";
+    }
+
+    public static String getKeytabFile() {
+        return keytabFile;
+    }
+
+    public static String replaceHostPattern(String principal) {
+        String[] components = principal.split("[/@]");
+        if (components == null || components.length < 2
+                || !components[1].equals(SecurityUtils.QUORUM_HOSTNAME_PATTERN)) {
+            return principal;
+        } else {
+            return replacePattern(components, "localhost");
+        }
+    }
+
+    public static String replacePattern(String[] components, String hostname) {
+        if (components.length == 3) {
+            return components[0] + "/" + hostname.toLowerCase() + "@"
+                    + components[2];
+        } else {
+            return components[0] + "/" + hostname.toLowerCase();
+        }
+    }
+}