You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2017/11/27 23:11:30 UTC

[1/3] zookeeper git commit: ZOOKEEPER-2935: [QP MutualAuth]: Port ZOOKEEPER-1045 implementation from branch-3.5 to trunk

Repository: zookeeper
Updated Branches:
  refs/heads/master 748585d20 -> 75411ab34


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/org/apache/zookeeper/server/quorum/auth/MiniKdc.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/auth/MiniKdc.java b/src/java/test/org/apache/zookeeper/server/quorum/auth/MiniKdc.java
new file mode 100644
index 0000000..4afef41
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/quorum/auth/MiniKdc.java
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+import org.apache.commons.io.Charsets;
+import org.apache.kerby.kerberos.kerb.KrbException;
+import org.apache.kerby.kerberos.kerb.server.KdcConfigKey;
+import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer;
+import org.apache.kerby.util.IOUtil;
+import org.apache.kerby.util.NetworkUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Mini KDC based on Apache Directory Server that can be embedded in testcases
+ * or used from command line as a standalone KDC.
+ * <p>
+ * <b>From within testcases:</b>
+ * <p>
+ * MiniKdc sets one System property when started and un-set when stopped:
+ * <ul>
+ *   <li>sun.security.krb5.debug: set to the debug value provided in the
+ *   configuration</li>
+ * </ul>
+ * Because of this, multiple MiniKdc instances cannot be started in parallel.
+ * For example, running testcases in parallel that start a KDC each. To
+ * accomplish this a single MiniKdc should be used for all testcases running
+ * in parallel.
+ * <p>
+ * MiniKdc default configuration values are:
+ * <ul>
+ *   <li>org.name=EXAMPLE (used to create the REALM)</li>
+ *   <li>org.domain=COM (used to create the REALM)</li>
+ *   <li>kdc.bind.address=localhost</li>
+ *   <li>kdc.port=0 (ephemeral port)</li>
+ *   <li>instance=DefaultKrbServer</li>
+ *   <li>max.ticket.lifetime=86400000 (1 day)</li>
+ *   <li>max.renewable.lifetime=604800000 (7 days)</li>
+ *   <li>transport=TCP</li>
+ *   <li>debug=false</li>
+ * </ul>
+ * The generated krb5.conf forces TCP connections.
+ */
+/*
+ * This code is originally from HDFS, see the file name MiniKdc there
+ * in case of bug fixing, history, etc.
+ *
+ * Branch : trunk
+ * Github Revision: 916140604ffef59466ba30832478311d3e6249bd
+ */
+public class MiniKdc {
+
+    public static final String JAVA_SECURITY_KRB5_CONF =
+            "java.security.krb5.conf";
+    public static final String SUN_SECURITY_KRB5_DEBUG =
+            "sun.security.krb5.debug";
+
+    public static void main(String[] args) throws Exception {
+        if (args.length < 4) {
+            System.out.println("Arguments: <WORKDIR> <MINIKDCPROPERTIES> " +
+                    "<KEYTABFILE> [<PRINCIPALS>]+");
+            System.exit(1);
+        }
+        File workDir = new File(args[0]);
+        if (!workDir.exists()) {
+            throw new RuntimeException("Specified work directory does not exists: "
+                    + workDir.getAbsolutePath());
+        }
+        Properties conf = createConf();
+        File file = new File(args[1]);
+        if (!file.exists()) {
+            throw new RuntimeException("Specified configuration does not exists: "
+                    + file.getAbsolutePath());
+        }
+        Properties userConf = new Properties();
+        InputStreamReader r = null;
+        try {
+            r = new InputStreamReader(new FileInputStream(file), Charsets.UTF_8);
+            userConf.load(r);
+        } finally {
+            if (r != null) {
+                r.close();
+            }
+        }
+        for (Map.Entry<?, ?> entry : userConf.entrySet()) {
+            conf.put(entry.getKey(), entry.getValue());
+        }
+        final MiniKdc miniKdc = new MiniKdc(conf, workDir);
+        miniKdc.start();
+        File krb5conf = new File(workDir, "krb5.conf");
+        if (miniKdc.getKrb5conf().renameTo(krb5conf)) {
+            File keytabFile = new File(args[2]).getAbsoluteFile();
+            String[] principals = new String[args.length - 3];
+            System.arraycopy(args, 3, principals, 0, args.length - 3);
+            miniKdc.createPrincipal(keytabFile, principals);
+            System.out.println();
+            System.out.println("Standalone MiniKdc Running");
+            System.out.println("---------------------------------------------------");
+            System.out.println("  Realm           : " + miniKdc.getRealm());
+            System.out.println("  Running at      : " + miniKdc.getHost() + ":" +
+                    miniKdc.getHost());
+            System.out.println("  krb5conf        : " + krb5conf);
+            System.out.println();
+            System.out.println("  created keytab  : " + keytabFile);
+            System.out.println("  with principals : " + Arrays.asList(principals));
+            System.out.println();
+            System.out.println(" Do <CTRL-C> or kill <PID> to stop it");
+            System.out.println("---------------------------------------------------");
+            System.out.println();
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    miniKdc.stop();
+                }
+            });
+        } else {
+            throw new RuntimeException("Cannot rename KDC's krb5conf to "
+                    + krb5conf.getAbsolutePath());
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(MiniKdc.class);
+
+    public static final String ORG_NAME = "org.name";
+    public static final String ORG_DOMAIN = "org.domain";
+    public static final String KDC_BIND_ADDRESS = "kdc.bind.address";
+    public static final String KDC_PORT = "kdc.port";
+    public static final String INSTANCE = "instance";
+    public static final String MAX_TICKET_LIFETIME = "max.ticket.lifetime";
+    public static final String MAX_RENEWABLE_LIFETIME = "max.renewable.lifetime";
+    public static final String TRANSPORT = "transport";
+    public static final String DEBUG = "debug";
+
+    private static final Set<String> PROPERTIES = new HashSet<String>();
+    private static final Properties DEFAULT_CONFIG = new Properties();
+
+    static {
+        PROPERTIES.add(ORG_NAME);
+        PROPERTIES.add(ORG_DOMAIN);
+        PROPERTIES.add(KDC_BIND_ADDRESS);
+        PROPERTIES.add(KDC_BIND_ADDRESS);
+        PROPERTIES.add(KDC_PORT);
+        PROPERTIES.add(INSTANCE);
+        PROPERTIES.add(TRANSPORT);
+        PROPERTIES.add(MAX_TICKET_LIFETIME);
+        PROPERTIES.add(MAX_RENEWABLE_LIFETIME);
+
+        DEFAULT_CONFIG.setProperty(KDC_BIND_ADDRESS, "localhost");
+        DEFAULT_CONFIG.setProperty(KDC_PORT, "0");
+        DEFAULT_CONFIG.setProperty(INSTANCE, "DefaultKrbServer");
+        DEFAULT_CONFIG.setProperty(ORG_NAME, "EXAMPLE");
+        DEFAULT_CONFIG.setProperty(ORG_DOMAIN, "COM");
+        DEFAULT_CONFIG.setProperty(TRANSPORT, "TCP");
+        DEFAULT_CONFIG.setProperty(MAX_TICKET_LIFETIME, "86400000");
+        DEFAULT_CONFIG.setProperty(MAX_RENEWABLE_LIFETIME, "604800000");
+        DEFAULT_CONFIG.setProperty(DEBUG, "false");
+    }
+
+    /**
+     * Convenience method that returns MiniKdc default configuration.
+     * <p>
+     * The returned configuration is a copy, it can be customized before using
+     * it to create a MiniKdc.
+     * @return a MiniKdc default configuration.
+     */
+    public static Properties createConf() {
+        return (Properties) DEFAULT_CONFIG.clone();
+    }
+
+    private Properties conf;
+    private SimpleKdcServer simpleKdc;
+    private int port;
+    private String realm;
+    private File workDir;
+    private File krb5conf;
+    private String transport;
+    private boolean krb5Debug;
+
+    public void setTransport(String transport) {
+        this.transport = transport;
+    }
+    /**
+     * Creates a MiniKdc.
+     *
+     * @param conf MiniKdc configuration.
+     * @param workDir working directory, it should be the build directory. Under
+     * this directory an ApacheDS working directory will be created, this
+     * directory will be deleted when the MiniKdc stops.
+     * @throws Exception thrown if the MiniKdc could not be created.
+     */
+    public MiniKdc(Properties conf, File workDir) throws Exception {
+        if (!conf.keySet().containsAll(PROPERTIES)) {
+            Set<String> missingProperties = new HashSet<String>(PROPERTIES);
+            missingProperties.removeAll(conf.keySet());
+            throw new IllegalArgumentException("Missing configuration properties: "
+                    + missingProperties);
+        }
+        this.workDir = new File(workDir, Long.toString(System.currentTimeMillis()));
+        if (!this.workDir.exists()
+                && !this.workDir.mkdirs()) {
+            throw new RuntimeException("Cannot create directory " + this.workDir);
+        }
+        LOG.info("Configuration:");
+        LOG.info("---------------------------------------------------------------");
+        for (Map.Entry<?, ?> entry : conf.entrySet()) {
+            LOG.info("  {}: {}", entry.getKey(), entry.getValue());
+        }
+        LOG.info("---------------------------------------------------------------");
+        this.conf = conf;
+        port = Integer.parseInt(conf.getProperty(KDC_PORT));
+        String orgName= conf.getProperty(ORG_NAME);
+        String orgDomain = conf.getProperty(ORG_DOMAIN);
+        realm = orgName.toUpperCase(Locale.ENGLISH) + "."
+                + orgDomain.toUpperCase(Locale.ENGLISH);
+    }
+
+    /**
+     * Returns the port of the MiniKdc.
+     *
+     * @return the port of the MiniKdc.
+     */
+    public int getPort() {
+        return port;
+    }
+
+    /**
+     * Returns the host of the MiniKdc.
+     *
+     * @return the host of the MiniKdc.
+     */
+    public String getHost() {
+        return conf.getProperty(KDC_BIND_ADDRESS);
+    }
+
+    /**
+     * Returns the realm of the MiniKdc.
+     *
+     * @return the realm of the MiniKdc.
+     */
+    public String getRealm() {
+        return realm;
+    }
+
+    public File getKrb5conf() {
+        krb5conf = new File(System.getProperty(JAVA_SECURITY_KRB5_CONF));
+        return krb5conf;
+    }
+
+    /**
+     * Starts the MiniKdc.
+     *
+     * @throws Exception thrown if the MiniKdc could not be started.
+     */
+    public synchronized void start() throws Exception {
+        if (simpleKdc != null) {
+            throw new RuntimeException("Already started");
+        }
+        simpleKdc = new SimpleKdcServer();
+        prepareKdcServer();
+        simpleKdc.init();
+        resetDefaultRealm();
+        simpleKdc.start();
+        LOG.info("MiniKdc stated.");
+    }
+
+    private void resetDefaultRealm() throws IOException {
+        InputStream templateResource = new FileInputStream(
+                getKrb5conf().getAbsolutePath());
+        String content = IOUtil.readInput(templateResource);
+        content = content.replaceAll("default_realm = .*\n",
+                "default_realm = " + getRealm() + "\n");
+        IOUtil.writeFile(content, getKrb5conf());
+    }
+
+    private void prepareKdcServer() throws Exception {
+        // transport
+        simpleKdc.setWorkDir(workDir);
+        simpleKdc.setKdcHost(getHost());
+        simpleKdc.setKdcRealm(realm);
+        if (transport == null) {
+            transport = conf.getProperty(TRANSPORT);
+        }
+        if (port == 0) {
+            port = NetworkUtil.getServerPort();
+        }
+        if (transport != null) {
+            if (transport.trim().equals("TCP")) {
+                simpleKdc.setKdcTcpPort(port);
+                simpleKdc.setAllowUdp(false);
+            } else if (transport.trim().equals("UDP")) {
+                simpleKdc.setKdcUdpPort(port);
+                simpleKdc.setAllowTcp(false);
+            } else {
+                throw new IllegalArgumentException("Invalid transport: " + transport);
+            }
+        } else {
+            throw new IllegalArgumentException("Need to set transport!");
+        }
+        simpleKdc.getKdcConfig().setString(KdcConfigKey.KDC_SERVICE_NAME,
+                conf.getProperty(INSTANCE));
+        if (conf.getProperty(DEBUG) != null) {
+            krb5Debug = getAndSet(SUN_SECURITY_KRB5_DEBUG, conf.getProperty(DEBUG));
+        }
+    }
+
+    /**
+     * Stops the MiniKdc
+     */
+    public synchronized void stop() {
+        if (simpleKdc != null) {
+            try {
+                simpleKdc.stop();
+            } catch (KrbException e) {
+                e.printStackTrace();
+            } finally {
+                if(conf.getProperty(DEBUG) != null) {
+                    System.setProperty(SUN_SECURITY_KRB5_DEBUG,
+                            Boolean.toString(krb5Debug));
+                }
+            }
+        }
+        delete(workDir);
+        try {
+            // Will be fixed in next Kerby version.
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        LOG.info("MiniKdc stopped.");
+    }
+
+    private void delete(File f) {
+        if (f.isFile()) {
+            if (! f.delete()) {
+                LOG.warn("WARNING: cannot delete file " + f.getAbsolutePath());
+            }
+        } else {
+            for (File c: f.listFiles()) {
+                delete(c);
+            }
+            if (! f.delete()) {
+                LOG.warn("WARNING: cannot delete directory " + f.getAbsolutePath());
+            }
+        }
+    }
+
+    /**
+     * Creates a principal in the KDC with the specified user and password.
+     *
+     * @param principal principal name, do not include the domain.
+     * @param password password.
+     * @throws Exception thrown if the principal could not be created.
+     */
+    public synchronized void createPrincipal(String principal, String password)
+            throws Exception {
+        simpleKdc.createPrincipal(principal, password);
+    }
+
+    /**
+     * Creates multiple principals in the KDC and adds them to a keytab file.
+     *
+     * @param keytabFile keytab file to add the created principals.
+     * @param principals principals to add to the KDC, do not include the domain.
+     * @throws Exception thrown if the principals or the keytab file could not be
+     * created.
+     */
+    public synchronized void createPrincipal(File keytabFile,
+                                             String ... principals)
+            throws Exception {
+        simpleKdc.createPrincipals(principals);
+        if (keytabFile.exists() && !keytabFile.delete()) {
+            LOG.error("Failed to delete keytab file: " + keytabFile);
+        }
+        for (String principal : principals) {
+            simpleKdc.getKadmin().exportKeytab(keytabFile, principal);
+        }
+    }
+
+    /**
+     * Set the System property; return the old value for caching.
+     *
+     * @param sysprop property
+     * @param debug true or false
+     * @return the previous value
+     */
+    private boolean getAndSet(String sysprop, String debug) {
+        boolean old = Boolean.getBoolean(sysprop);
+        System.setProperty(sysprop, debug);
+        return old;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/org/apache/zookeeper/server/quorum/auth/MiniKdcTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/auth/MiniKdcTest.java b/src/java/test/org/apache/zookeeper/server/quorum/auth/MiniKdcTest.java
new file mode 100644
index 0000000..69dbcd1
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/quorum/auth/MiniKdcTest.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import org.apache.kerby.kerberos.kerb.keytab.Keytab;
+import org.apache.kerby.kerberos.kerb.type.base.PrincipalName;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import java.io.File;
+import java.security.Principal;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Arrays;
+
+/*
+ * This code is originally from HDFS, see the file name TestMiniKdc there
+ * in case of bug fixing, history, etc.
+ *
+ * Branch : trunk
+ * Github Revision: 916140604ffef59466ba30832478311d3e6249bd
+ */
+public class MiniKdcTest extends KerberosSecurityTestcase {
+    private static final boolean IBM_JAVA = System.getProperty("java.vendor")
+            .contains("IBM");
+
+    @Test(timeout = 60000)
+    public void testMiniKdcStart() {
+        MiniKdc kdc = getKdc();
+        Assert.assertNotSame(0, kdc.getPort());
+    }
+
+    @Test(timeout = 60000)
+    public void testKeytabGen() throws Exception {
+        MiniKdc kdc = getKdc();
+        File workDir = getWorkDir();
+
+        kdc.createPrincipal(new File(workDir, "keytab"), "foo/bar", "bar/foo");
+        List<PrincipalName> principalNameList =
+                Keytab.loadKeytab(new File(workDir, "keytab")).getPrincipals();
+
+        Set<String> principals = new HashSet<String>();
+        for (PrincipalName principalName : principalNameList) {
+          principals.add(principalName.getName());
+        }
+
+        Assert.assertEquals(new HashSet<String>(Arrays.asList(
+                "foo/bar@" + kdc.getRealm(), "bar/foo@" + kdc.getRealm())),
+                principals);
+      }
+
+    private static class KerberosConfiguration extends Configuration {
+        private String principal;
+        private String keytab;
+        private boolean isInitiator;
+
+        private KerberosConfiguration(String principal, File keytab,
+                boolean client) {
+            this.principal = principal;
+            this.keytab = keytab.getAbsolutePath();
+            this.isInitiator = client;
+        }
+
+        public static Configuration createClientConfig(String principal,
+                File keytab) {
+            return new KerberosConfiguration(principal, keytab, true);
+        }
+
+        public static Configuration createServerConfig(String principal,
+                File keytab) {
+            return new KerberosConfiguration(principal, keytab, false);
+        }
+
+        private static String getKrb5LoginModuleName() {
+            return System.getProperty("java.vendor").contains("IBM")
+                    ? "com.ibm.security.auth.module.Krb5LoginModule"
+                    : "com.sun.security.auth.module.Krb5LoginModule";
+        }
+
+        @Override
+        public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+            Map<String, String> options = new HashMap<String, String>();
+            options.put("principal", principal);
+            options.put("refreshKrb5Config", "true");
+            if (IBM_JAVA) {
+                options.put("useKeytab", keytab);
+                options.put("credsType", "both");
+            } else {
+                options.put("keyTab", keytab);
+                options.put("useKeyTab", "true");
+                options.put("storeKey", "true");
+                options.put("doNotPrompt", "true");
+                options.put("useTicketCache", "true");
+                options.put("renewTGT", "true");
+                options.put("isInitiator", Boolean.toString(isInitiator));
+            }
+            String ticketCache = System.getenv("KRB5CCNAME");
+            if (ticketCache != null) {
+                options.put("ticketCache", ticketCache);
+            }
+            options.put("debug", "true");
+
+            return new AppConfigurationEntry[] {
+                    new AppConfigurationEntry(getKrb5LoginModuleName(),
+                            AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+                            options) };
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testKerberosLogin() throws Exception {
+        MiniKdc kdc = getKdc();
+        File workDir = getWorkDir();
+        LoginContext loginContext = null;
+        try {
+            String principal = "foo";
+            File keytab = new File(workDir, "foo.keytab");
+            kdc.createPrincipal(keytab, principal);
+
+            Set<Principal> principals = new HashSet<Principal>();
+            principals.add(new KerberosPrincipal(principal));
+
+            // client login
+            Subject subject = new Subject(false, principals,
+                    new HashSet<Object>(), new HashSet<Object>());
+            loginContext = new LoginContext("", subject, null,
+                    KerberosConfiguration.createClientConfig(principal,
+                            keytab));
+            loginContext.login();
+            subject = loginContext.getSubject();
+            Assert.assertEquals(1, subject.getPrincipals().size());
+            Assert.assertEquals(KerberosPrincipal.class,
+                    subject.getPrincipals().iterator().next().getClass());
+            Assert.assertEquals(principal + "@" + kdc.getRealm(),
+                    subject.getPrincipals().iterator().next().getName());
+            loginContext.logout();
+
+            // server login
+            subject = new Subject(false, principals, new HashSet<Object>(),
+                    new HashSet<Object>());
+            loginContext = new LoginContext("", subject, null,
+                    KerberosConfiguration.createServerConfig(principal,
+                            keytab));
+            loginContext.login();
+            subject = loginContext.getSubject();
+            Assert.assertEquals(1, subject.getPrincipals().size());
+            Assert.assertEquals(KerberosPrincipal.class,
+                    subject.getPrincipals().iterator().next().getClass());
+            Assert.assertEquals(principal + "@" + kdc.getRealm(),
+                    subject.getPrincipals().iterator().next().getName());
+            loginContext.logout();
+
+        } finally {
+            if (loginContext != null && loginContext.getSubject() != null
+                    && !loginContext.getSubject().getPrincipals().isEmpty()) {
+                loginContext.logout();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumAuthTestBase.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumAuthTestBase.java b/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumAuthTestBase.java
new file mode 100644
index 0000000..8978d17
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumAuthTestBase.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase.MainThread;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * QuorumAuthTestBase provides a base class for testing quorum peer mutual
+ * authentication using SASL mechanisms.
+ */
+public class QuorumAuthTestBase extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(QuorumAuthTestBase.class);
+    protected List<MainThread> mt = new ArrayList<MainThread>();
+    protected static File jaasConfigDir;
+
+    public static void setupJaasConfig(String jaasEntries) {
+        try {
+            jaasConfigDir = ClientBase.createTmpDir();
+            File saslConfFile = new File(jaasConfigDir, "jaas.conf");
+            FileWriter fwriter = new FileWriter(saslConfFile);
+            fwriter.write(jaasEntries);
+            fwriter.close();
+            System.setProperty("java.security.auth.login.config",
+                    saslConfFile.getAbsolutePath());
+        } catch (IOException ioe) {
+            LOG.error("Failed to create tmp directory to hold JAAS conf file", ioe);
+            // could not create tmp directory to hold JAAS conf file : test will
+            // fail now.
+        }
+    }
+
+    public static void cleanupJaasConfig() {
+        if (jaasConfigDir != null) {
+            FileUtils.deleteQuietly(jaasConfigDir);
+        }
+    }
+
+    protected String startQuorum(final int serverCount,
+            Map<String, String> authConfigs, int authServerCount) throws IOException {
+        StringBuilder connectStr = new StringBuilder();
+        final int[] clientPorts = startQuorum(serverCount, connectStr,
+                authConfigs, authServerCount);
+        for (int i = 0; i < serverCount; i++) {
+            Assert.assertTrue("waiting for server " + i + " being up",
+                    ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
+                            ClientBase.CONNECTION_TIMEOUT));
+        }
+        return connectStr.toString();
+    }
+
+    protected int[] startQuorum(final int serverCount, StringBuilder connectStr,
+            Map<String, String> authConfigs, int authServerCount) throws IOException {
+        final int clientPorts[] = new int[serverCount];
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < serverCount; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            String server = String.format(
+                    "server.%d=localhost:%d:%d:participant", i,
+                    PortAssignment.unique(), PortAssignment.unique());
+            sb.append(server + "\n");
+            connectStr.append("127.0.0.1:" + clientPorts[i]);
+            if (i < serverCount - 1) {
+                connectStr.append(",");
+            }
+        }
+        String quorumCfg = sb.toString();
+        // servers with authentication interfaces configured
+        int i = 0;
+        for (; i < authServerCount; i++) {
+            startServer(authConfigs, clientPorts, quorumCfg, i);
+        }
+        // servers without any authentication configured
+        for (int j = 0; j < serverCount - authServerCount; j++, i++) {
+            MainThread mthread = new MainThread(i, clientPorts[i], quorumCfg);
+            mt.add(mthread);
+            mthread.start();
+        }
+        return clientPorts;
+    }
+
+    private void startServer(Map<String, String> authConfigs,
+            final int[] clientPorts, String quorumCfg, int i)
+                    throws IOException {
+        MainThread mthread = new MainThread(i, clientPorts[i], quorumCfg,
+                authConfigs);
+        mt.add(mthread);
+        mthread.start();
+    }
+
+    protected void startServer(MainThread restartPeer,
+            Map<String, String> authConfigs) throws IOException {
+        MainThread mthread = new MainThread(restartPeer.getMyid(),
+                restartPeer.getClientPort(), restartPeer.getQuorumCfgSection(),
+                authConfigs);
+        mt.add(mthread);
+        mthread.start();
+    }
+
+    void shutdownAll() {
+        for (int i = 0; i < mt.size(); i++) {
+            shutdown(i);
+        }
+    }
+
+    MainThread shutdown(int index) {
+        MainThread mainThread = mt.get(index);
+        try {
+            mainThread.shutdown();
+        } catch (InterruptedException e) {
+        } finally {
+            mt.remove(index);
+        }
+        mainThread.deleteBaseDir();
+        return mainThread;
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumAuthUpgradeTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumAuthUpgradeTest.java b/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumAuthUpgradeTest.java
new file mode 100644
index 0000000..3593245
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumAuthUpgradeTest.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase.MainThread;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.ClientTest;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Rolling upgrade should do in three steps:
+ *
+ * step-1) Stop the server and set the flags and restart the server.
+ * quorum.auth.enableSasl=true, quorum.auth.learnerRequireSasl=false and quorum.auth.serverRequireSasl=false
+ * Ensure that all the servers should complete this step. Now, move to next step.
+ *
+ * step-2) Stop the server one by one and change the flags and restart the server.
+ * quorum.auth.enableSasl=true, quorum.auth.learnerRequireSasl=true and quorum.auth.serverRequireSasl=false
+ * Ensure that all the servers should complete this step. Now, move to next step.
+ *
+ * step-3) Stop the server one by one and change the flags and restart the server.
+ * quorum.auth.enableSasl=true, quorum.auth.learnerRequireSasl=true and quorum.auth.serverRequireSasl=true
+ * Now, all the servers are fully upgraded and running in secured mode.
+ */
+public class QuorumAuthUpgradeTest extends QuorumAuthTestBase {
+    static {
+        String jaasEntries = new String("" + "QuorumServer {\n"
+                + "       org.apache.zookeeper.server.auth.DigestLoginModule required\n"
+                + "       user_test=\"mypassword\";\n" + "};\n"
+                + "QuorumLearner {\n"
+                + "       org.apache.zookeeper.server.auth.DigestLoginModule required\n"
+                + "       username=\"test\"\n"
+                + "       password=\"mypassword\";\n" + "};\n");
+        setupJaasConfig(jaasEntries);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        shutdownAll();
+    }
+
+    @AfterClass
+    public static void cleanup() {
+        cleanupJaasConfig();
+    }
+
+    /**
+     * Test to verify that servers are able to start without any authentication.
+     * peer0 -> quorum.auth.enableSasl=false
+     * peer1 -> quorum.auth.enableSasl=false
+     */
+    @Test(timeout = 30000)
+    public void testNullAuthLearnerServer() throws Exception {
+        Map<String, String> authConfigs = new HashMap<String, String>();
+        authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "false");
+
+        String connectStr = startQuorum(2, authConfigs, 0);
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper(connectStr, ClientBase.CONNECTION_TIMEOUT,
+                watcher);
+        watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+        zk.create("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        zk.close();
+    }
+
+    /**
+     * Test to verify that servers are able to form quorum.
+     * peer0 -> quorum.auth.enableSasl=true, quorum.auth.learnerRequireSasl=false, quorum.auth.serverRequireSasl=false
+     * peer1 -> quorum.auth.enableSasl=false, quorum.auth.learnerRequireSasl=false, quorum.auth.serverRequireSasl=false
+     */
+    @Test(timeout = 30000)
+    public void testAuthLearnerAgainstNullAuthServer() throws Exception {
+        Map<String, String> authConfigs = new HashMap<String, String>();
+        authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true");
+
+        String connectStr = startQuorum(2, authConfigs, 1);
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper(connectStr, ClientBase.CONNECTION_TIMEOUT,
+                watcher);
+        watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+        zk.create("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        zk.close();
+    }
+
+    /**
+     * Test to verify that servers are able to form quorum.
+     * peer0 -> quorum.auth.enableSasl=true, quorum.auth.learnerRequireSasl=false, quorum.auth.serverRequireSasl=false
+     * peer1 -> quorum.auth.enableSasl=true, quorum.auth.learnerRequireSasl=false, quorum.auth.serverRequireSasl=false
+     */
+    @Test(timeout = 30000)
+    public void testAuthLearnerAgainstNoAuthRequiredServer() throws Exception {
+        Map<String, String> authConfigs = new HashMap<String, String>();
+        authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true");
+
+        String connectStr = startQuorum(2, authConfigs, 2);
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper(connectStr, ClientBase.CONNECTION_TIMEOUT,
+                watcher);
+        watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+        zk.create("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        zk.close();
+    }
+
+    /**
+     * Test to verify that servers are able to form quorum.
+     * peer0 -> quorum.auth.enableSasl=true, quorum.auth.learnerRequireSasl=true, quorum.auth.serverRequireSasl=true
+     * peer1 -> quorum.auth.enableSasl=true, quorum.auth.learnerRequireSasl=true, quorum.auth.serverRequireSasl=true
+     */
+    @Test(timeout = 30000)
+    public void testAuthLearnerServer() throws Exception {
+        Map<String, String> authConfigs = new HashMap<String, String>();
+        authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "true");
+
+        String connectStr = startQuorum(2, authConfigs, 2);
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper(connectStr, ClientBase.CONNECTION_TIMEOUT,
+                watcher);
+        watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+        zk.create("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        zk.close();
+    }
+
+    /**
+     * Rolling upgrade should do in three steps:
+     *
+     * step-1) Stop the server and set the flags and restart the server.
+     * quorum.auth.enableSasl=true, quorum.auth.learnerRequireSasl=false and quorum.auth.serverRequireSasl=false
+     * Ensure that all the servers should complete this step. Now, move to next step.
+     *
+     * step-2) Stop the server one by one and change the flags and restart the server.
+     * quorum.auth.enableSasl=true, quorum.auth.learnerRequireSasl=true and quorum.auth.serverRequireSasl=false
+     * Ensure that all the servers should complete this step. Now, move to next step.
+     *
+     * step-3) Stop the server one by one and change the flags and restart the server.
+     * quorum.auth.enableSasl=true, quorum.auth.learnerRequireSasl=true and quorum.auth.serverRequireSasl=true
+     * Now, all the servers are fully upgraded and running in secured mode.
+     */
+    @Test(timeout = 90000)
+    public void testRollingUpgrade() throws Exception {
+        // Start peer0,1,2 servers with quorum.auth.enableSasl=false and
+        // quorum.auth.learnerRequireSasl=false, quorum.auth.serverRequireSasl=false
+        // Assume this is an existing cluster.
+        Map<String, String> authConfigs = new HashMap<String, String>();
+        authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "false");
+
+        String connectStr = startQuorum(3, authConfigs, 0);
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper(connectStr, ClientBase.CONNECTION_TIMEOUT,
+                watcher);
+        watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+        zk.create("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT_SEQUENTIAL);
+
+        //1. Upgrade peer0,1,2 with quorum.auth.enableSasl=true and
+        // quorum.auth.learnerRequireSasl=false, quorum.auth.serverRequireSasl=false
+        authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "false");
+        authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "false");
+        restartServer(authConfigs, 0, zk, watcher);
+        restartServer(authConfigs, 1, zk, watcher);
+        restartServer(authConfigs, 2, zk, watcher);
+
+        //2. Upgrade peer0,1,2 with quorum.auth.enableSasl=true and
+        // quorum.auth.learnerRequireSasl=true, quorum.auth.serverRequireSasl=false
+        authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "false");
+        restartServer(authConfigs, 0, zk, watcher);
+        restartServer(authConfigs, 1, zk, watcher);
+        restartServer(authConfigs, 2, zk, watcher);
+
+        //3. Upgrade peer0,1,2 with quorum.auth.enableSasl=true and
+        // quorum.auth.learnerRequireSasl=true, quorum.auth.serverRequireSasl=true
+        authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "true");
+        restartServer(authConfigs, 0, zk, watcher);
+        restartServer(authConfigs, 1, zk, watcher);
+        restartServer(authConfigs, 2, zk, watcher);
+
+        //4. Restart peer2 with quorum.auth.learnerEnableSasl=false and
+        // quorum.auth.serverRequireSasl=false. It should fail to join the
+        // quorum as this needs auth.
+        authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "false");
+        MainThread m = shutdown(2);
+        startServer(m, authConfigs);
+        Assert.assertFalse("waiting for server 2 being up", ClientBase
+                .waitForServerUp("127.0.0.1:" + m.getClientPort(), 5000));
+    }
+
+    private void restartServer(Map<String, String> authConfigs, int index,
+            ZooKeeper zk, CountdownWatcher watcher) throws IOException,
+                    KeeperException, InterruptedException, TimeoutException {
+        LOG.info("Restarting server myid=" + index);
+        MainThread m = shutdown(index);
+        startServer(m, authConfigs);
+        Assert.assertTrue("waiting for server" + index + "being up",
+                ClientBase.waitForServerUp("127.0.0.1:" + m.getClientPort(),
+                        ClientBase.CONNECTION_TIMEOUT));
+        watcher.waitForConnected(ClientTest.CONNECTION_TIMEOUT);
+        zk.create("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT_SEQUENTIAL);
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumDigestAuthTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumDigestAuthTest.java b/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumDigestAuthTest.java
new file mode 100644
index 0000000..5eebdb3
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumDigestAuthTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.admin.AdminServer;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase.MainThread;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class QuorumDigestAuthTest extends QuorumAuthTestBase {
+
+    static {
+        String jaasEntries = new String(""
+                + "QuorumServer {\n"
+                + "       org.apache.zookeeper.server.auth.DigestLoginModule required\n"
+                + "       user_test=\"mypassword\";\n" + "};\n"
+                + "QuorumLearner {\n"
+                + "       org.apache.zookeeper.server.auth.DigestLoginModule required\n"
+                + "       username=\"test\"\n"
+                + "       password=\"mypassword\";\n" + "};\n"
+                + "QuorumLearnerInvalid {\n"
+                + "       org.apache.zookeeper.server.auth.DigestLoginModule required\n"
+                + "       username=\"test\"\n"
+                + "       password=\"invalid\";\n" + "};" + "\n");
+        setupJaasConfig(jaasEntries);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        for (MainThread mainThread : mt) {
+            mainThread.shutdown();
+            mainThread.deleteBaseDir();
+        }
+    }
+
+    @AfterClass
+    public static void cleanup(){
+        cleanupJaasConfig();
+    }
+
+    /**
+     * Test to verify that server is able to start with valid credentials
+     */
+    @Test(timeout = 30000)
+    public void testValidCredentials() throws Exception {
+        Map<String, String> authConfigs = new HashMap<String, String>();
+        authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "true");
+
+        String connectStr = startQuorum(3, authConfigs, 3);
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper(connectStr, ClientBase.CONNECTION_TIMEOUT,
+                watcher);
+        watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+        for (int i = 0; i < 10; i++) {
+            zk.create("/" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        }
+        zk.close();
+    }
+
+    /**
+     * Test to verify that server is able to start with invalid credentials if
+     * the configuration is set to quorum.auth.serverRequireSasl=false.
+     * Quorum will talk each other even if the authentication is not succeeded
+     */
+    @Test(timeout = 30000)
+    public void testSaslNotRequiredWithInvalidCredentials() throws Exception {
+        Map<String, String> authConfigs = new HashMap<String, String>();
+        authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT, "QuorumLearnerInvalid");
+        authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "false");
+        authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "false");
+        String connectStr = startQuorum(3, authConfigs, 3);
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper(connectStr, ClientBase.CONNECTION_TIMEOUT,
+                watcher);
+        watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+        for (int i = 0; i < 10; i++) {
+            zk.create("/" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        }
+        zk.close();
+    }
+
+    /**
+     * Test to verify that server shouldn't start with invalid credentials
+     * if the configuration is set to quorum.auth.serverRequireSasl=true,
+     * quorum.auth.learnerRequireSasl=true
+     */
+    @Test(timeout = 30000)
+    public void testSaslRequiredInvalidCredentials() throws Exception {
+        Map<String, String> authConfigs = new HashMap<String, String>();
+        authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT, "QuorumLearnerInvalid");
+        authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "true");
+        int serverCount = 2;
+        final int[] clientPorts = startQuorum(serverCount, new StringBuilder(),
+                authConfigs, serverCount);
+        for (int i = 0; i < serverCount; i++) {
+            boolean waitForServerUp = ClientBase.waitForServerUp(
+                    "127.0.0.1:" + clientPorts[i], QuorumPeerTestBase.TIMEOUT);
+            Assert.assertFalse("Shouldn't start server with invalid credentials",
+                    waitForServerUp);
+        }
+    }
+
+    /**
+     * If quorumpeer learner is not auth enabled then self won't be able to join
+     * quorum. So this test is ensuring that the quorumpeer learner is also auth
+     * enabled while enabling quorum server require sasl.
+     */
+    @Test(timeout = 10000)
+    public void testEnableQuorumServerRequireSaslWithoutQuorumLearnerRequireSasl()
+            throws Exception {
+        Map<String, String> authConfigs = new HashMap<String, String>();
+        authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT,
+                "QuorumLearner");
+        authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "false");
+        MainThread mthread = new MainThread(1, PortAssignment.unique(), "",
+                authConfigs);
+        String args[] = new String[1];
+        args[0] = mthread.getConfFile().toString();
+        try {
+            new QuorumPeerMain() {
+                @Override
+                protected void initializeAndRun(String[] args)
+                        throws ConfigException, IOException, AdminServer.AdminServerException {
+                    super.initializeAndRun(args);
+                }
+            }.initializeAndRun(args);
+            Assert.fail("Must throw exception as quorumpeer learner is not enabled!");
+        } catch (ConfigException e) {
+            // expected
+        }
+    }
+
+
+    /**
+     * If quorumpeer learner is not auth enabled then self won't be able to join
+     * quorum. So this test is ensuring that the quorumpeer learner is also auth
+     * enabled while enabling quorum server require sasl.
+     */
+    @Test(timeout = 10000)
+    public void testEnableQuorumAuthenticationConfigurations()
+            throws Exception {
+        Map<String, String> authConfigs = new HashMap<String, String>();
+        authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT,
+                "QuorumLearner");
+        authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "false");
+
+        // case-1) 'quorum.auth.enableSasl' is off. Tries to enable server sasl.
+        authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "false");
+        MainThread mthread = new MainThread(1, PortAssignment.unique(), "",
+                authConfigs);
+        String args[] = new String[1];
+        args[0] = mthread.getConfFile().toString();
+        try {
+            new QuorumPeerMain() {
+                @Override
+                protected void initializeAndRun(String[] args)
+                        throws ConfigException, IOException, AdminServer.AdminServerException {
+                    super.initializeAndRun(args);
+                }
+            }.initializeAndRun(args);
+            Assert.fail("Must throw exception as quorum sasl is not enabled!");
+        } catch (ConfigException e) {
+            // expected
+        }
+
+        // case-1) 'quorum.auth.enableSasl' is off. Tries to enable learner sasl.
+        authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "false");
+        authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "true");
+        try {
+            new QuorumPeerMain() {
+                @Override
+                protected void initializeAndRun(String[] args)
+                        throws ConfigException, IOException, AdminServer.AdminServerException {
+                    super.initializeAndRun(args);
+                }
+            }.initializeAndRun(args);
+            Assert.fail("Must throw exception as quorum sasl is not enabled!");
+        } catch (ConfigException e) {
+            // expected
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumKerberosAuthTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumKerberosAuthTest.java b/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumKerberosAuthTest.java
new file mode 100644
index 0000000..2cc56a7
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumKerberosAuthTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase.MainThread;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+public class QuorumKerberosAuthTest extends KerberosSecurityTestcase {
+    private static File keytabFile;
+    static {
+        String keytabFilePath = FilenameUtils.normalize(KerberosTestUtils.getKeytabFile(), true);
+        String jaasEntries = new String(""
+                + "QuorumServer {\n"
+                + "       com.sun.security.auth.module.Krb5LoginModule required\n"
+                + "       useKeyTab=true\n"
+                + "       keyTab=\"" + keytabFilePath + "\"\n"
+                + "       storeKey=true\n"
+                + "       useTicketCache=false\n"
+                + "       debug=false\n"
+                + "       principal=\"" + KerberosTestUtils.getServerPrincipal() + "\";\n" + "};\n"
+                + "QuorumLearner {\n"
+                + "       com.sun.security.auth.module.Krb5LoginModule required\n"
+                + "       useKeyTab=true\n"
+                + "       keyTab=\"" + keytabFilePath + "\"\n"
+                + "       storeKey=true\n"
+                + "       useTicketCache=false\n"
+                + "       debug=false\n"
+                + "       principal=\"" + KerberosTestUtils.getLearnerPrincipal() + "\";\n" + "};\n");
+        setupJaasConfig(jaasEntries);
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        // create keytab
+        keytabFile = new File(KerberosTestUtils.getKeytabFile());
+        String learnerPrincipal = KerberosTestUtils.getLearnerPrincipal();
+        String serverPrincipal = KerberosTestUtils.getServerPrincipal();
+        learnerPrincipal = learnerPrincipal.substring(0, learnerPrincipal.lastIndexOf("@"));
+        serverPrincipal = serverPrincipal.substring(0, serverPrincipal.lastIndexOf("@"));
+        getKdc().createPrincipal(keytabFile, learnerPrincipal, serverPrincipal);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        for (MainThread mainThread : mt) {
+            mainThread.shutdown();
+            mainThread.deleteBaseDir();
+        }
+    }
+
+    @AfterClass
+    public static void cleanup() {
+        if(keytabFile != null){
+            FileUtils.deleteQuietly(keytabFile);
+        }
+        cleanupJaasConfig();
+    }
+
+    /**
+     * Test to verify that server is able to start with valid credentials
+     */
+    @Test(timeout = 120000)
+    public void testValidCredentials() throws Exception {
+        String serverPrincipal = KerberosTestUtils.getServerPrincipal();
+        serverPrincipal = serverPrincipal.substring(0, serverPrincipal.lastIndexOf("@"));
+        Map<String, String> authConfigs = new HashMap<String, String>();
+        authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL, serverPrincipal);
+        String connectStr = startQuorum(3, authConfigs, 3);
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper(connectStr, ClientBase.CONNECTION_TIMEOUT, watcher);
+        watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+        for (int i = 0; i < 10; i++) {
+            zk.create("/" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+        zk.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumKerberosHostBasedAuthTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumKerberosHostBasedAuthTest.java b/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumKerberosHostBasedAuthTest.java
new file mode 100644
index 0000000..fcb7691
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/quorum/auth/QuorumKerberosHostBasedAuthTest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase.MainThread;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+public class QuorumKerberosHostBasedAuthTest extends KerberosSecurityTestcase {
+    private static File keytabFile;
+    private static String hostServerPrincipal = KerberosTestUtils.getHostServerPrincipal();
+    private static String hostLearnerPrincipal = KerberosTestUtils.getHostLearnerPrincipal();
+    private static String hostNamedLearnerPrincipal = KerberosTestUtils.getHostNamedLearnerPrincipal("myHost");
+    static {
+        setupJaasConfigEntries(hostServerPrincipal, hostLearnerPrincipal, hostNamedLearnerPrincipal);
+    }
+
+    private static void setupJaasConfigEntries(String hostServerPrincipal,
+            String hostLearnerPrincipal, String hostNamedLearnerPrincipal) {
+        String keytabFilePath = FilenameUtils.normalize(KerberosTestUtils.getKeytabFile(), true);
+        String jaasEntries = new String(""
+                + "QuorumServer {\n"
+                + "       com.sun.security.auth.module.Krb5LoginModule required\n"
+                + "       useKeyTab=true\n"
+                + "       keyTab=\"" + keytabFilePath + "\"\n"
+                + "       storeKey=true\n"
+                + "       useTicketCache=false\n"
+                + "       debug=false\n"
+                + "       principal=\"" + KerberosTestUtils.replaceHostPattern(hostServerPrincipal) + "\";\n" + "};\n"
+                + "QuorumLearner {\n"
+                + "       com.sun.security.auth.module.Krb5LoginModule required\n"
+                + "       useKeyTab=true\n"
+                + "       keyTab=\"" + keytabFilePath + "\"\n"
+                + "       storeKey=true\n"
+                + "       useTicketCache=false\n"
+                + "       debug=false\n"
+                + "       principal=\"" + KerberosTestUtils.replaceHostPattern(hostLearnerPrincipal) + "\";\n" + "};\n"
+                + "QuorumLearnerMyHost {\n"
+                + "       com.sun.security.auth.module.Krb5LoginModule required\n"
+                + "       useKeyTab=true\n"
+                + "       keyTab=\"" + keytabFilePath + "\"\n"
+                + "       storeKey=true\n"
+                + "       useTicketCache=false\n"
+                + "       debug=false\n"
+                + "       principal=\"" + hostNamedLearnerPrincipal + "\";\n" + "};\n");
+        setupJaasConfig(jaasEntries);
+    }
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        // create keytab
+        keytabFile = new File(KerberosTestUtils.getKeytabFile());
+
+        // Creates principals in the KDC and adds them to a keytab file.
+        String learnerPrincipal = hostLearnerPrincipal.substring(0, hostLearnerPrincipal.lastIndexOf("@"));
+        learnerPrincipal = KerberosTestUtils.replaceHostPattern(learnerPrincipal);
+        String serverPrincipal = hostServerPrincipal.substring(0, hostServerPrincipal.lastIndexOf("@"));
+        serverPrincipal = KerberosTestUtils.replaceHostPattern(serverPrincipal);
+
+        // learner with ipaddress in principal
+        String learnerPrincipal2 = hostNamedLearnerPrincipal.substring(0, hostNamedLearnerPrincipal.lastIndexOf("@"));
+        getKdc().createPrincipal(keytabFile, learnerPrincipal, learnerPrincipal2, serverPrincipal);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        for (MainThread mainThread : mt) {
+            mainThread.shutdown();
+            mainThread.deleteBaseDir();
+        }
+    }
+
+    @AfterClass
+    public static void cleanup() {
+        if(keytabFile != null){
+            FileUtils.deleteQuietly(keytabFile);
+        }
+        cleanupJaasConfig();
+    }
+
+    /**
+     * Test to verify that server is able to start with valid credentials
+     */
+    @Test(timeout = 120000)
+    public void testValidCredentials() throws Exception {
+        String serverPrincipal = hostServerPrincipal.substring(0, hostServerPrincipal.lastIndexOf("@"));
+        Map<String, String> authConfigs = new HashMap<String, String>();
+        authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL, serverPrincipal);
+        String connectStr = startQuorum(3, authConfigs, 3);
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper(connectStr, ClientBase.CONNECTION_TIMEOUT, watcher);
+        watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+        for (int i = 0; i < 10; i++) {
+            zk.create("/" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+        zk.close();
+    }
+
+    /**
+     * Test to verify that the bad server connection to the quorum should be rejected.
+     */
+    @Test(timeout = 120000)
+    public void testConnectBadServer() throws Exception {
+        String serverPrincipal = hostServerPrincipal.substring(0, hostServerPrincipal.lastIndexOf("@"));
+        Map<String, String> authConfigs = new HashMap<String, String>();
+        authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "true");
+        authConfigs.put(QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL, serverPrincipal);
+        String connectStr = startQuorum(3, authConfigs, 3);
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper(connectStr, ClientBase.CONNECTION_TIMEOUT, watcher);
+        watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+        for (int i = 0; i < 10; i++) {
+            zk.create("/" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+        zk.close();
+
+        String quorumCfgSection = mt.get(0).getQuorumCfgSection();
+        StringBuilder sb = new StringBuilder();
+        sb.append(quorumCfgSection);
+
+        int myid = mt.size() + 1;
+        final int clientPort = PortAssignment.unique();
+        String server = String.format("server.%d=localhost:%d:%d:participant",
+                myid, PortAssignment.unique(), PortAssignment.unique());
+        sb.append(server + "\n");
+        quorumCfgSection = sb.toString();
+        authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT,
+                "QuorumLearnerMyHost");
+        MainThread badServer = new MainThread(myid, clientPort, quorumCfgSection,
+                authConfigs);
+        badServer.start();
+        watcher = new CountdownWatcher();
+        connectStr = "127.0.0.1:" + clientPort;
+        zk = new ZooKeeper(connectStr, ClientBase.CONNECTION_TIMEOUT, watcher);
+        try{
+            watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT/3);
+            Assert.fail("Must throw exception as the myHost is not an authorized one!");
+        } catch (TimeoutException e){
+            // expected
+        } finally {
+            zk.close();
+            badServer.shutdown();
+            badServer.deleteBaseDir();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java b/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
index b9d9212..3cee7ba 100644
--- a/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
+++ b/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
@@ -110,7 +110,7 @@ public class CnxManagerTest extends ZKTestCase {
         public void run(){
             try {
                 QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], peerTmpdir[0], peerClientPort[0], 3, 0, 1000, 2, 2);
-                QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+                QuorumCnxManager cnxManager = peer.createCnxnManager();
                 QuorumCnxManager.Listener listener = cnxManager.listener;
                 if(listener != null){
                     listener.start();
@@ -154,7 +154,7 @@ public class CnxManagerTest extends ZKTestCase {
         thread.start();
 
         QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2);
-        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
             listener.start();
@@ -201,7 +201,7 @@ public class CnxManagerTest extends ZKTestCase {
         peerTmpdir[2] = ClientBase.createTmpDir();
 
         QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2);
-        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
             listener.start();
@@ -229,7 +229,7 @@ public class CnxManagerTest extends ZKTestCase {
     @Test
     public void testCnxManagerSpinLock() throws Exception {
         QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2);
-        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
             listener.start();
@@ -294,7 +294,7 @@ public class CnxManagerTest extends ZKTestCase {
         peers.get(2L).type = LearnerType.OBSERVER;
         QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1],
                 peerClientPort[1], 3, 1, 1000, 2, 2);
-        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if (listener != null) {
             listener.start();
@@ -341,7 +341,7 @@ public class CnxManagerTest extends ZKTestCase {
     @Test
     public void testSocketTimeout() throws Exception {
         QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 2000, 2, 2);
-        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
             listener.start();

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java b/src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java
index a4244d8..bc43775 100644
--- a/src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java
+++ b/src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java
@@ -41,7 +41,7 @@ public class FLEPredicateTest extends ZKTestCase {
     
     class MockFLE extends FastLeaderElection {
         MockFLE(QuorumPeer peer){
-            super(peer, new QuorumCnxManager(peer));
+            super(peer, peer.createCnxnManager());
         }
         
         boolean predicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch){

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/zookeeper.jute
----------------------------------------------------------------------
diff --git a/src/zookeeper.jute b/src/zookeeper.jute
index a404e78..2533ddf 100644
--- a/src/zookeeper.jute
+++ b/src/zookeeper.jute
@@ -236,6 +236,11 @@ module org.apache.zookeeper.server.quorum {
         buffer data; // Only significant when type is request
         vector<org.apache.zookeeper.data.Id> authinfo;
     }
+    class QuorumAuthPacket {
+        long magic;
+        int status;
+        buffer token;
+    }
 }
 
 module org.apache.zookeeper.server.persistence {


[3/3] zookeeper git commit: ZOOKEEPER-2935: [QP MutualAuth]: Port ZOOKEEPER-1045 implementation from branch-3.5 to trunk

Posted by ph...@apache.org.
ZOOKEEPER-2935: [QP MutualAuth]: Port ZOOKEEPER-1045 implementation from branch-3.5 to trunk

Author: Abraham Fine <af...@apache.org>

Reviewers: phunt@apache.org

Closes #417 from afine/ZOOKEEPER-2935 and squashes the following commits:

99bfbe94 [Abraham Fine] Add debugging line and improve ivy.xml by removing unnecessary excludes
1d6c7de5 [Abraham Fine] Fix missing test.data.kerberos.dir in build.xml
06d0b6fa [Abraham Fine] ZOOKEEPER-2935: [QP MutualAuth]: Port ZOOKEEPER-1045 implementation from  branch-3.5 to trunk.

Change-Id: I2b88221c0006e4336a39f74fd5a87d1aded68c90


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/75411ab3
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/75411ab3
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/75411ab3

Branch: refs/heads/master
Commit: 75411ab34a3d53c43c2d508b12314a9788aa417d
Parents: 748585d
Author: Abraham Fine <af...@apache.org>
Authored: Mon Nov 27 15:10:13 2017 -0800
Committer: Patrick Hunt <ph...@apache.org>
Committed: Mon Nov 27 15:10:13 2017 -0800

----------------------------------------------------------------------
 build.xml                                       |   9 +
 ivy.xml                                         |  33 ++
 src/java/main/org/apache/zookeeper/Login.java   |   2 +-
 .../zookeeper/SaslClientCallbackHandler.java    | 104 +++++
 .../zookeeper/client/ZooKeeperSaslClient.java   | 164 +-------
 .../org/apache/zookeeper/server/ServerCnxn.java |   8 +-
 .../zookeeper/server/ZooKeeperSaslServer.java   | 121 +-----
 .../zookeeper/server/ZooKeeperServer.java       |   1 +
 .../server/auth/SaslServerCallbackHandler.java  |  10 +-
 .../server/quorum/FastLeaderElection.java       |   2 +
 .../zookeeper/server/quorum/Follower.java       |   5 +-
 .../apache/zookeeper/server/quorum/Leader.java  |  10 +-
 .../apache/zookeeper/server/quorum/Learner.java |  18 +-
 .../zookeeper/server/quorum/LearnerHandler.java |  27 +-
 .../zookeeper/server/quorum/Observer.java       |   8 +-
 .../server/quorum/QuorumCnxManager.java         | 300 +++++++++++--
 .../zookeeper/server/quorum/QuorumPeer.java     | 198 ++++++++-
 .../server/quorum/QuorumPeerConfig.java         |  48 +++
 .../zookeeper/server/quorum/QuorumPeerMain.java |  15 +-
 .../quorum/auth/NullQuorumAuthLearner.java      |  33 ++
 .../quorum/auth/NullQuorumAuthServer.java       |  34 ++
 .../server/quorum/auth/QuorumAuth.java          |  96 +++++
 .../server/quorum/auth/QuorumAuthLearner.java   |  40 ++
 .../server/quorum/auth/QuorumAuthServer.java    |  41 ++
 .../quorum/auth/SaslQuorumAuthLearner.java      | 223 ++++++++++
 .../quorum/auth/SaslQuorumAuthServer.java       | 179 ++++++++
 .../auth/SaslQuorumServerCallbackHandler.java   | 148 +++++++
 .../apache/zookeeper/util/SecurityUtils.java    | 298 +++++++++++++
 src/java/test/config/findbugsExcludeFile.xml    |   5 +
 src/java/test/data/kerberos/minikdc-krb5.conf   |  30 ++
 src/java/test/data/kerberos/minikdc.ldiff       |  52 +++
 .../quorum/EphemeralNodeDeletionTest.java       |   8 +-
 .../quorum/FLEBackwardElectionRoundTest.java    |   4 +-
 .../server/quorum/FLELostMessageTest.java       |   2 +-
 .../server/quorum/LearnerHandlerTest.java       |   3 +-
 .../zookeeper/server/quorum/LearnerTest.java    |   4 +-
 .../server/quorum/QuorumPeerTestBase.java       |  97 ++++-
 .../server/quorum/RaceConditionTest.java        |   7 +-
 .../zookeeper/server/quorum/Zab1_0Test.java     |  45 +-
 .../quorum/auth/KerberosSecurityTestcase.java   | 120 ++++++
 .../server/quorum/auth/KerberosTestUtils.java   |  76 ++++
 .../zookeeper/server/quorum/auth/MiniKdc.java   | 418 +++++++++++++++++++
 .../server/quorum/auth/MiniKdcTest.java         | 185 ++++++++
 .../server/quorum/auth/QuorumAuthTestBase.java  | 146 +++++++
 .../quorum/auth/QuorumAuthUpgradeTest.java      | 239 +++++++++++
 .../quorum/auth/QuorumDigestAuthTest.java       | 222 ++++++++++
 .../quorum/auth/QuorumKerberosAuthTest.java     | 110 +++++
 .../auth/QuorumKerberosHostBasedAuthTest.java   | 184 ++++++++
 .../apache/zookeeper/test/CnxManagerTest.java   |  12 +-
 .../apache/zookeeper/test/FLEPredicateTest.java |   2 +-
 src/zookeeper.jute                              |   5 +
 51 files changed, 3755 insertions(+), 396 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 7528487..e61f7d7 100644
--- a/build.xml
+++ b/build.xml
@@ -79,6 +79,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
     <property name="test.data.dir" value="${test.java.build.dir}/data" />
     <property name="test.data.invalid.dir" value="${test.data.dir}/invalidsnap" />
     <property name="test.data.buffersize.dir" value="${test.data.dir}/buffersize" />
+    <property name="test.data.kerberos.dir" value="${test.data.dir}/kerberos" />
     <property name="test.data.ssl.dir" value="${test.data.dir}/ssl" />
     <property name="test.cppunit.dir" value="${test.java.build.dir}/test-cppunit"/>
     <property name="test.tmp.dir" value="${test.java.build.dir}/tmp" />
@@ -226,6 +227,9 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
     <property name="jackson-mapper-asl.version" value="1.9.11"/>
     <property name="dependency-check-ant.version" value="2.1.0"/>
 
+    <property name="commons-io.version" value="2.4"/>
+    <property name="kerby.version" value="1.0.0-RC2"/>
+
     <!-- ====================================================== -->
     <!-- Macro definitions                                      -->
     <!-- ====================================================== -->
@@ -1165,6 +1169,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
         <delete dir="${test.tmp.dir}" />
         <delete dir="${test.data.invalid.dir}" />
         <delete dir="${test.data.buffersize.dir}" />
+        <delete dir="${test.data.kerberos.dir}" />
         <delete dir="${test.data.ssl.dir}" />
         <delete dir="${test.data.dir}" />
         <mkdir dir="${test.log.dir}" />
@@ -1182,6 +1187,10 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
         <copy todir="${test.data.ssl.dir}">
             <fileset dir="${basedir}/src/java/test/data/ssl"/>
         </copy>
+        <mkdir dir="${test.data.kerberos.dir}" />
+        <copy todir="${test.data.kerberos.dir}">
+            <fileset dir="${basedir}/src/java/test/data/kerberos"/>
+        </copy>
     </target>
 
     <target name="junit-init" depends="test-category">

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/ivy.xml
----------------------------------------------------------------------
diff --git a/ivy.xml b/ivy.xml
index 8a9c603..4298aa9 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -90,6 +90,39 @@
     <dependency org="org.owasp" name="dependency-check-ant"
                 rev="${dependency-check-ant.version}" conf="owasp->default"/>
 
+    <dependency org="commons-io" name="commons-io"
+                rev="${commons-io.version}" conf="test->default"/>
+
+    <dependency org="org.apache.kerby" name="kerby-config" rev="${kerby.version}" conf="test->default">
+          <exclude org="org.slf4j" module="slf4j-api"/>
+          <exclude org="org.slf4j" module="slf4j-log4j12"/>
+    </dependency>
+    <dependency org="org.apache.kerby" name="kerb-simplekdc" rev="${kerby.version}" conf="test->default">
+        <exclude org="org.slf4j" module="slf4j-api"/>
+    </dependency>
+    <dependency org="org.apache.kerby" name="kerb-core"
+                rev="${kerby.version}" conf="test->default">
+        <exclude org="org.slf4j" module="slf4j-api"/>
+    </dependency>
+    <dependency org="org.apache.kerby" name="kerb-server"
+                rev="${kerby.version}" conf="test->default"/>
+    <dependency org="org.apache.kerby" name="kerb-common"
+                rev="${kerby.version}" conf="test->default"/>
+    <dependency org="org.apache.kerby" name="kerb-admin"
+                rev="${kerby.version}" conf="test->default"/>
+    <dependency org="org.apache.kerby" name="kerb-identity"
+                rev="${kerby.version}" conf="test->default"/>
+    <dependency org="org.apache.kerby" name="kerb-client"
+                rev="${kerby.version}" conf="test->default"/>
+    <dependency org="org.apache.kerby" name="kerb-util"
+                rev="${kerby.version}" conf="test->default"/>
+    <dependency org="org.apache.kerby" name="kerb-crypto"
+                rev="${kerby.version}" conf="test->default"/>
+    <dependency org="org.apache.kerby" name="kerby-util"
+                rev="${kerby.version}" conf="test->default"/>
+    <dependency org="org.apache.kerby" name="kerby-asn1"
+                rev="${kerby.version}" conf="test->default"/>
+
     <dependency org="net.java.dev.javacc" name="javacc" rev="${javacc.version}"
                 conf="javacc->default" />
 

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/Login.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/Login.java b/src/java/main/org/apache/zookeeper/Login.java
index fd9a4c2..d97d6c1 100644
--- a/src/java/main/org/apache/zookeeper/Login.java
+++ b/src/java/main/org/apache/zookeeper/Login.java
@@ -299,7 +299,7 @@ public class Login {
         }
         LoginContext loginContext = new LoginContext(loginContextName,callbackHandler);
         loginContext.login();
-        LOG.info("successfully logged in.");
+        LOG.info("{} successfully logged in.", loginContextName);
         return loginContext;
     }
 

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/SaslClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/SaslClientCallbackHandler.java b/src/java/main/org/apache/zookeeper/SaslClientCallbackHandler.java
new file mode 100644
index 0000000..d6f5549
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/SaslClientCallbackHandler.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is used by the SASL mechanisms to get further information to complete
+ * the authentication. For example, a SASL mechanism might use this callback
+ * handler to do verification operation. The CallbackHandler interface here
+ * refers to javax.security.auth.callback.CallbackHandler. It should not be
+ * confused with ZooKeeper packet callbacks like
+ * org.apache.zookeeper.server.auth.SaslServerCallbackHandler.
+ */
+public class SaslClientCallbackHandler implements CallbackHandler {
+    private String password = null;
+    private static final Logger LOG = LoggerFactory.getLogger(SaslClientCallbackHandler.class);
+    private final String entity;
+    public SaslClientCallbackHandler(String password, String client) {
+        this.password = password;
+        this.entity = client;
+    }
+
+    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+        for (Callback callback : callbacks) {
+            if (callback instanceof NameCallback) {
+                NameCallback nc = (NameCallback) callback;
+                nc.setName(nc.getDefaultName());
+            }
+            else {
+                if (callback instanceof PasswordCallback) {
+                    PasswordCallback pc = (PasswordCallback)callback;
+                    if (password != null) {
+                        pc.setPassword(this.password.toCharArray());
+                    } else {
+                        LOG.warn("Could not login: the {} is being asked for a password, but the ZooKeeper {}" +
+                          " code does not currently support obtaining a password from the user." +
+                          " Make sure that the {} is configured to use a ticket cache (using" +
+                          " the JAAS configuration setting 'useTicketCache=true)' and restart the {}. If" +
+                          " you still get this message after that, the TGT in the ticket cache has expired and must" +
+                          " be manually refreshed. To do so, first determine if you are using a password or a" +
+                          " keytab. If the former, run kinit in a Unix shell in the environment of the user who" +
+                          " is running this Zookeeper {} using the command" +
+                          " 'kinit <princ>' (where <princ> is the name of the {}'s Kerberos principal)." +
+                          " If the latter, do" +
+                          " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and" +
+                          " <keytab> is the location of the keytab file). After manually refreshing your cache," +
+                          " restart this {}. If you continue to see this message after manually refreshing" +
+                          " your cache, ensure that your KDC host's clock is in sync with this host's clock.",
+                          new Object[]{entity, entity, entity, entity, entity, entity, entity});
+                    }
+                }
+                else {
+                    if (callback instanceof RealmCallback) {
+                        RealmCallback rc = (RealmCallback) callback;
+                        rc.setText(rc.getDefaultText());
+                    }
+                    else {
+                        if (callback instanceof AuthorizeCallback) {
+                            AuthorizeCallback ac = (AuthorizeCallback) callback;
+                            String authid = ac.getAuthenticationID();
+                            String authzid = ac.getAuthorizationID();
+                            if (authid.equals(authzid)) {
+                                ac.setAuthorized(true);
+                            } else {
+                                ac.setAuthorized(false);
+                            }
+                            if (ac.isAuthorized()) {
+                                ac.setAuthorizedID(authzid);
+                            }
+                        }
+                        else {
+                            throw new UnsupportedCallbackException(callback, "Unrecognized SASL " + entity + "Callback");
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java b/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
index f921ca3..8550626 100644
--- a/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
+++ b/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
@@ -19,40 +19,27 @@
 package org.apache.zookeeper.client;
 
 import java.io.IOException;
-import java.security.Principal;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 
 import javax.security.auth.Subject;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.login.Configuration;
 import javax.security.auth.login.LoginException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.RealmCallback;
-import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.ClientCnxn;
 import org.apache.zookeeper.Login;
+import org.apache.zookeeper.SaslClientCallbackHandler;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.common.ZKConfig;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.proto.GetSASLRequest;
 import org.apache.zookeeper.proto.SetSASLResponse;
-import org.apache.zookeeper.server.auth.KerberosName;
-import org.ietf.jgss.GSSContext;
-import org.ietf.jgss.GSSCredential;
-import org.ietf.jgss.GSSException;
-import org.ietf.jgss.GSSManager;
-import org.ietf.jgss.Oid;
+
+import org.apache.zookeeper.util.SecurityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -259,84 +246,14 @@ public class ZooKeeperSaslClient {
                         }
                         // note that the login object is static: it's shared amongst all zookeeper-related connections.
                         // in order to ensure the login is initialized only once, it must be synchronized the code snippet.
-                        login = new Login(loginContext, new ClientCallbackHandler(null), clientConfig);
+                        login = new Login(loginContext, new SaslClientCallbackHandler(null, "Client"), clientConfig);
                         login.startThreadIfNeeded();
                         initializedLogin = true;
                     }
                 }
             }
-            Subject subject = login.getSubject();
-            SaslClient saslClient;
-            // Use subject.getPrincipals().isEmpty() as an indication of which SASL mechanism to use:
-            // if empty, use DIGEST-MD5; otherwise, use GSSAPI.
-            if (subject.getPrincipals().isEmpty()) {
-                // no principals: must not be GSSAPI: use DIGEST-MD5 mechanism instead.
-                LOG.info("Client will use DIGEST-MD5 as SASL mechanism.");
-                String[] mechs = {"DIGEST-MD5"};
-                String username = (String)(subject.getPublicCredentials().toArray()[0]);
-                String password = (String)(subject.getPrivateCredentials().toArray()[0]);
-                // "zk-sasl-md5" is a hard-wired 'domain' parameter shared with zookeeper server code (see ServerCnxnFactory.java)
-                saslClient = Sasl.createSaslClient(mechs, username, "zookeeper", "zk-sasl-md5", null, new ClientCallbackHandler(password));
-                return saslClient;
-            }
-            else { // GSSAPI.
-                boolean usingNativeJgss = clientConfig.getBoolean(ZKConfig.JGSS_NATIVE);
-            	if (usingNativeJgss) {
-            		// http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/jgss-features.html
-            		// """
-            		// In addition, when performing operations as a particular
-            		// Subject, e.g. Subject.doAs(...) or Subject.doAsPrivileged(...),
-            		// the to-be-used GSSCredential should be added to Subject's
-            		// private credential set. Otherwise, the GSS operations will
-            		// fail since no credential is found.
-            		// """
-            		try {
-            			GSSManager manager = GSSManager.getInstance();
-            			Oid krb5Mechanism = new Oid("1.2.840.113554.1.2.2");
-            			GSSCredential cred = manager.createCredential(null,
-            					GSSContext.DEFAULT_LIFETIME,
-            					krb5Mechanism,
-            					GSSCredential.INITIATE_ONLY);
-            			subject.getPrivateCredentials().add(cred);
-            			if (LOG.isDebugEnabled()) {
-            				LOG.debug("Added private credential to subject: " + cred);
-            			}
-            		} catch (GSSException ex) {
-            			LOG.warn("Cannot add private credential to subject; " +
-            					"authentication at the server may fail", ex);
-            		}
-            	}
-                final Object[] principals = subject.getPrincipals().toArray();
-                // determine client principal from subject.
-                final Principal clientPrincipal = (Principal)principals[0];
-                final KerberosName clientKerberosName = new KerberosName(clientPrincipal.getName());
-                // assume that server and client are in the same realm (by default; unless the system property
-                // "zookeeper.server.realm" is set).
-                String serverRealm = clientConfig.getProperty(
-                        ZKClientConfig.ZOOKEEPER_SERVER_REALM,
-                        clientKerberosName.getRealm());
-                KerberosName serviceKerberosName = new KerberosName(servicePrincipal+"@"+serverRealm);
-                final String serviceName = serviceKerberosName.getServiceName();
-                final String serviceHostname = serviceKerberosName.getHostName();
-                final String clientPrincipalName = clientKerberosName.toString();
-                try {
-                    saslClient = Subject.doAs(subject,new PrivilegedExceptionAction<SaslClient>() {
-                        public SaslClient run() throws SaslException {
-                            LOG.info("Client will use GSSAPI as SASL mechanism.");
-                            String[] mechs = {"GSSAPI"};
-                            LOG.debug("creating sasl client: client="+clientPrincipalName+";service="+serviceName+";serviceHostname="+serviceHostname);
-                            SaslClient saslClient = Sasl.createSaslClient(mechs,clientPrincipalName,serviceName,serviceHostname,null,new ClientCallbackHandler(null));
-                            return saslClient;
-                        }
-                    });
-                    return saslClient;
-                }
-                catch (Exception e) {
-                	LOG.error("Exception while trying to create SASL client", e);
-                    e.printStackTrace();
-                    return null;
-                }
-            }
+            return SecurityUtils.createSaslClient(login.getSubject(),
+                    servicePrincipal, "zookeeper", "zk-sasl-md5", LOG, "Client");
         } catch (LoginException e) {
             // We throw LoginExceptions...
             throw e;
@@ -505,75 +422,6 @@ public class ZooKeeperSaslClient {
         }
     }
 
-    // The CallbackHandler interface here refers to
-    // javax.security.auth.callback.CallbackHandler.
-    // It should not be confused with Zookeeper packet callbacks like
-    //  org.apache.zookeeper.server.auth.SaslServerCallbackHandler.
-    public static class ClientCallbackHandler implements CallbackHandler {
-        private String password = null;
-
-        public ClientCallbackHandler(String password) {
-            this.password = password;
-        }
-
-        public void handle(Callback[] callbacks) throws
-          UnsupportedCallbackException {
-            for (Callback callback : callbacks) {
-                if (callback instanceof NameCallback) {
-                    NameCallback nc = (NameCallback) callback;
-                    nc.setName(nc.getDefaultName());
-                }
-                else {
-                    if (callback instanceof PasswordCallback) {
-                        PasswordCallback pc = (PasswordCallback)callback;
-                        if (password != null) {
-                            pc.setPassword(this.password.toCharArray());
-                        } else {
-                            LOG.warn("Could not login: the client is being asked for a password, but the Zookeeper" +
-                              " client code does not currently support obtaining a password from the user." +
-                              " Make sure that the client is configured to use a ticket cache (using" +
-                              " the JAAS configuration setting 'useTicketCache=true)' and restart the client. If" +
-                              " you still get this message after that, the TGT in the ticket cache has expired and must" +
-                              " be manually refreshed. To do so, first determine if you are using a password or a" +
-                              " keytab. If the former, run kinit in a Unix shell in the environment of the user who" +
-                              " is running this Zookeeper client using the command" +
-                              " 'kinit <princ>' (where <princ> is the name of the client's Kerberos principal)." +
-                              " If the latter, do" +
-                              " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and" +
-                              " <keytab> is the location of the keytab file). After manually refreshing your cache," +
-                              " restart this client. If you continue to see this message after manually refreshing" +
-                              " your cache, ensure that your KDC host's clock is in sync with this host's clock.");
-                        }
-                    }
-                    else {
-                        if (callback instanceof RealmCallback) {
-                            RealmCallback rc = (RealmCallback) callback;
-                            rc.setText(rc.getDefaultText());
-                        }
-                        else {
-                            if (callback instanceof AuthorizeCallback) {
-                                AuthorizeCallback ac = (AuthorizeCallback) callback;
-                                String authid = ac.getAuthenticationID();
-                                String authzid = ac.getAuthorizationID();
-                                if (authid.equals(authzid)) {
-                                    ac.setAuthorized(true);
-                                } else {
-                                    ac.setAuthorized(false);
-                                }
-                                if (ac.isAuthorized()) {
-                                    ac.setAuthorizedID(authzid);
-                                }
-                            }
-                            else {
-                                throw new UnsupportedCallbackException(callback,"Unrecognized SASL ClientCallback");
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-
     public boolean clientTunneledAuthenticationInProgress() {
     	if (!isSASLConfigured) {
     	    return false;

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ServerCnxn.java b/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
index e469cda..f8a90cf 100644
--- a/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
+++ b/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
@@ -24,7 +24,13 @@ import java.io.StringWriter;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.security.cert.Certificate;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/ZooKeeperSaslServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperSaslServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperSaslServer.java
index c792378..af19e5d 100644
--- a/src/java/main/org/apache/zookeeper/server/ZooKeeperSaslServer.java
+++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperSaslServer.java
@@ -18,22 +18,12 @@
 
 package org.apache.zookeeper.server;
 
-import java.security.Principal;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-
 import javax.security.auth.Subject;
-import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
 import org.apache.zookeeper.Login;
-import org.ietf.jgss.GSSContext;
-import org.ietf.jgss.GSSCredential;
-import org.ietf.jgss.GSSException;
-import org.ietf.jgss.GSSManager;
-import org.ietf.jgss.GSSName;
-import org.ietf.jgss.Oid;
+import org.apache.zookeeper.util.SecurityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,114 +41,9 @@ public class ZooKeeperSaslServer {
     private SaslServer createSaslServer(final Login login) {
         synchronized (login) {
             Subject subject = login.getSubject();
-            if (subject != null) {
-                // server is using a JAAS-authenticated subject: determine service principal name and hostname from zk server's subject.
-                if (subject.getPrincipals().size() > 0) {
-                    try {
-                        final Object[] principals = subject.getPrincipals().toArray();
-                        final Principal servicePrincipal = (Principal)principals[0];
-
-                        // e.g. servicePrincipalNameAndHostname := "zookeeper/myhost.foo.com@FOO.COM"
-                        final String servicePrincipalNameAndHostname = servicePrincipal.getName();
-
-                        int indexOf = servicePrincipalNameAndHostname.indexOf("/");
-
-                        // e.g. serviceHostnameAndKerbDomain := "myhost.foo.com@FOO.COM"
-                        final String serviceHostnameAndKerbDomain = servicePrincipalNameAndHostname.substring(indexOf+1,servicePrincipalNameAndHostname.length());
-
-                        int indexOfAt = serviceHostnameAndKerbDomain.indexOf("@");
-
-                        // Handle Kerberos Service as well as User Principal Names
-                        final String servicePrincipalName, serviceHostname;
-                        if (indexOf > 0){
-                            // e.g. servicePrincipalName := "zookeeper"
-                            servicePrincipalName = servicePrincipalNameAndHostname.substring(0, indexOf);
-                            // e.g. serviceHostname := "myhost.foo.com"
-                            serviceHostname = serviceHostnameAndKerbDomain.substring(0, indexOfAt);
-                        } else {
-                            servicePrincipalName = servicePrincipalNameAndHostname.substring(0, indexOfAt);
-                            serviceHostname = null;
-                        }
-                        
-                        final String mech = "GSSAPI";   // TODO: should depend on zoo.cfg specified mechs, but if subject is non-null, it can be assumed to be GSSAPI.
-
-                        LOG.debug("serviceHostname is '"+ serviceHostname + "'");
-                        LOG.debug("servicePrincipalName is '"+ servicePrincipalName + "'");
-                        LOG.debug("SASL mechanism(mech) is '"+ mech +"'");
-
-                        boolean usingNativeJgss =
-                        		Boolean.getBoolean("sun.security.jgss.native");
-                        if (usingNativeJgss) {
-                        	// http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/jgss-features.html
-                        	// """
-                        	// In addition, when performing operations as a particular
-                        	// Subject, e.g. Subject.doAs(...) or
-                        	// Subject.doAsPrivileged(...), the to-be-used
-                        	// GSSCredential should be added to Subject's
-                        	// private credential set. Otherwise, the GSS operations
-                        	// will fail since no credential is found.
-                        	// """
-                        	try {
-                        		GSSManager manager = GSSManager.getInstance();
-                        		Oid krb5Mechanism = new Oid("1.2.840.113554.1.2.2");
-                        		GSSName gssName = manager.createName(
-                        				servicePrincipalName + "@" + serviceHostname,
-                        				GSSName.NT_HOSTBASED_SERVICE);
-                        		GSSCredential cred = manager.createCredential(gssName,
-                        				GSSContext.DEFAULT_LIFETIME,
-                        				krb5Mechanism,
-                        				GSSCredential.ACCEPT_ONLY);
-                        		subject.getPrivateCredentials().add(cred);
-                        		if (LOG.isDebugEnabled()) {
-                        			LOG.debug("Added private credential to subject: " + cred);
-                        		}
-                        	} catch (GSSException ex) {
-                        		LOG.warn("Cannot add private credential to subject; " +
-                        				"clients authentication may fail", ex);
-                        	}
-                        }
-                        try {
-                            return Subject.doAs(subject,new PrivilegedExceptionAction<SaslServer>() {
-                                public SaslServer run() {
-                                    try {
-                                        SaslServer saslServer;
-                                        saslServer = Sasl.createSaslServer(mech, servicePrincipalName, serviceHostname, null, login.callbackHandler);
-                                        return saslServer;
-                                    }
-                                    catch (SaslException e) {
-                                        LOG.error("Zookeeper Server failed to create a SaslServer to interact with a client during session initiation: " + e);
-                                        e.printStackTrace();
-                                        return null;
-                                    }
-                                }
-                            }
-                            );
-                        }
-                        catch (PrivilegedActionException e) {
-                            // TODO: exit server at this point(?)
-                            LOG.error("Zookeeper Quorum member experienced a PrivilegedActionException exception while creating a SaslServer using a JAAS principal context:" + e);
-                            e.printStackTrace();
-                        }
-                    }
-                    catch (IndexOutOfBoundsException e) {
-                        LOG.error("server principal name/hostname determination error: ", e);
-                    }
-                }
-                else {
-                    // JAAS non-GSSAPI authentication: assuming and supporting only DIGEST-MD5 mechanism for now.
-                    // TODO: use 'authMech=' value in zoo.cfg.
-                    try {
-                        SaslServer saslServer = Sasl.createSaslServer("DIGEST-MD5","zookeeper","zk-sasl-md5",null, login.callbackHandler);
-                        return saslServer;
-                    }
-                    catch (SaslException e) {
-                        LOG.error("Zookeeper Quorum member failed to create a SaslServer to interact with a client during session initiation", e);
-                    }
-                }
-            }
+            return SecurityUtils.createSaslServer(subject, "zookeeper",
+                    "zk-sasl-md5", login.callbackHandler, LOG);
         }
-        LOG.error("failed to create saslServer object.");
-        return null;
     }
 
     public byte[] evaluateResponse(byte[] response) throws SaslException {

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
index 11b6981..3ac1ddd 100644
--- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -61,6 +61,7 @@ import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException;
 import org.apache.zookeeper.server.ServerCnxn.CloseRequestException;
 import org.apache.zookeeper.server.SessionTracker.Session;
 import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
+import org.apache.zookeeper.server.auth.AuthenticationProvider;
 import org.apache.zookeeper.server.auth.ProviderRegistry;
 import org.apache.zookeeper.server.auth.ServerAuthenticationProvider;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/auth/SaslServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/auth/SaslServerCallbackHandler.java b/src/java/main/org/apache/zookeeper/server/auth/SaslServerCallbackHandler.java
index 7fdffde..9f53a4d 100644
--- a/src/java/main/org/apache/zookeeper/server/auth/SaslServerCallbackHandler.java
+++ b/src/java/main/org/apache/zookeeper/server/auth/SaslServerCallbackHandler.java
@@ -46,13 +46,15 @@ public class SaslServerCallbackHandler implements CallbackHandler {
     private String userName;
     private final Map<String,String> credentials = new HashMap<String,String>();
 
-    public SaslServerCallbackHandler(Configuration configuration) throws IOException {
-        String serverSection = System.getProperty(ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY,
-                                                  ZooKeeperSaslServer.DEFAULT_LOGIN_CONTEXT_NAME);
+    public SaslServerCallbackHandler(Configuration configuration)
+            throws IOException {
+        String serverSection = System.getProperty(
+                ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY,
+                ZooKeeperSaslServer.DEFAULT_LOGIN_CONTEXT_NAME);
         AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(serverSection);
 
         if (configurationEntries == null) {
-            String errorMessage = "Could not find a 'Server' entry in this configuration: Server cannot start.";
+            String errorMessage = "Could not find a '" + serverSection + "' entry in this configuration: Server cannot start.";
             LOG.error(errorMessage);
             throw new IOException(errorMessage);
         }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java b/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
index 4e85ce0..f1112b7 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
@@ -1078,6 +1078,8 @@ public class FastLeaderElection implements Election {
                 LOG.warn("Failed to unregister with JMX", e);
             }
             self.jmxLeaderElectionBean = null;
+            LOG.debug("Number of connection processing threads: {}",
+                    manager.getConnectionThreadCount());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Follower.java b/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
index 1cdc202..84d29c8 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
@@ -27,6 +27,7 @@ import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.SetDataTxn;
@@ -71,9 +72,9 @@ public class Follower extends Learner{
         self.end_fle = 0;
         fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
         try {
-            InetSocketAddress addr = findLeader();            
+            QuorumServer leaderServer = findLeader();
             try {
-                connectToLeader(addr);
+                connectToLeader(leaderServer.addr, leaderServer.hostname);
                 long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
                 if (self.isReconfigStateChange())
                    throw new Exception("learned about role change");

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
index cf2eb37..cebf2e3 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
@@ -19,6 +19,7 @@
 package org.apache.zookeeper.server.quorum;
 
 import java.io.ByteArrayOutputStream;
+import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.net.BindException;
 import java.net.ServerSocket;
@@ -39,6 +40,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 
+import javax.security.sasl.SaslException;
+
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.common.Time;
@@ -367,7 +370,10 @@ public class Leader {
                         // in LearnerHandler switch to the syncLimit
                         s.setSoTimeout(self.tickTime * self.initLimit);
                         s.setTcpNoDelay(nodelay);
-                        LearnerHandler fh = new LearnerHandler(s, Leader.this);
+
+                        BufferedInputStream is = new BufferedInputStream(
+                                s.getInputStream());
+                        LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                         fh.start();
                     } catch (SocketException e) {
                         if (stop) {
@@ -381,6 +387,8 @@ public class Leader {
                         } else {
                             throw e;
                         }
+                    } catch (SaslException e){
+                        LOG.error("Exception while connecting to quorum learner", e);
                     }
                 }
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
index 726e688..a37cc93 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
@@ -50,6 +50,8 @@ import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.SetDataTxn;
 import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class is the superclass of two of the three main actors in a ZK
@@ -193,21 +195,21 @@ public class Learner {
     /**
      * Returns the address of the node we think is the leader.
      */
-    protected InetSocketAddress findLeader() {
-        InetSocketAddress addr = null;
+    protected QuorumServer findLeader() {
+        QuorumServer leaderServer = null;
         // Find the leader by id
         Vote current = self.getCurrentVote();
         for (QuorumServer s : self.getView().values()) {
             if (s.id == current.getId()) {
-                addr = s.addr;
+                leaderServer = s;
                 break;
             }
         }
-        if (addr == null) {
+        if (leaderServer == null) {
             LOG.warn("Couldn't find the leader with id = "
                     + current.getId());
         }
-        return addr;
+        return leaderServer;
     }
    
     /**
@@ -232,10 +234,11 @@ public class Learner {
      * until either initLimit time has elapsed or 5 tries have happened. 
      * @param addr - the address of the Leader to connect to.
      * @throws IOException - if the socket connection fails on the 5th attempt
+     * <li>if there is an authentication failure while connecting to leader</li>
      * @throws ConnectException
      * @throws InterruptedException
      */
-    protected void connectToLeader(InetSocketAddress addr) 
+    protected void connectToLeader(InetSocketAddress addr, String hostname)
     throws IOException, ConnectException, InterruptedException {
         sock = new Socket();        
         sock.setSoTimeout(self.tickTime * self.initLimit);
@@ -279,6 +282,9 @@ public class Learner {
             }
             Thread.sleep(1000);
         }
+
+        self.authLearner.authenticate(sock, hostname);
+
         leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
                 sock.getInputStream()));
         bufferedOutput = new BufferedOutputStream(sock.getOutputStream());

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java b/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
index 1fbe708..7247f5c 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -33,6 +33,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 
+import javax.security.sasl.SaslException;
+
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
@@ -155,6 +157,7 @@ public class LearnerHandler extends ZooKeeperThread {
 
     private BinaryOutputArchive oa;
 
+    private final BufferedInputStream bufferedInput;
     private BufferedOutputStream bufferedOutput;
     
     /**
@@ -179,11 +182,27 @@ public class LearnerHandler extends ZooKeeperThread {
      */
     private long leaderLastZxid;
 
-    LearnerHandler(Socket sock, Leader leader) throws IOException {
+    LearnerHandler(Socket sock, BufferedInputStream bufferedInput,Leader leader) throws IOException {
         super("LearnerHandler-" + sock.getRemoteSocketAddress());
         this.sock = sock;
         this.leader = leader;
-        leader.addLearnerHandler(this);
+        this.bufferedInput = bufferedInput;
+
+        try {
+            if (leader.self != null) {
+                leader.self.authServer.authenticate(sock,
+                        new DataInputStream(bufferedInput));
+            }
+        } catch (IOException e) {
+            LOG.error("Server failed to authenticate quorum learner, addr: {}, closing connection",
+                    sock.getRemoteSocketAddress(), e);
+            try {
+                sock.close();
+            } catch (IOException ie) {
+                LOG.error("Exception while closing socket", ie);
+            }
+            throw new SaslException("Authentication failure: " + e.getMessage());
+        }
     }
 
     @Override
@@ -343,11 +362,11 @@ public class LearnerHandler extends ZooKeeperThread {
     @Override
     public void run() {
         try {
+            leader.addLearnerHandler(this);
             tickOfNextAckDeadline = leader.self.tick.get()
                     + leader.self.initLimit + leader.self.syncLimit;
 
-            ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock
-                    .getInputStream()));
+            ia = BinaryInputArchive.getArchive(bufferedInput);
             bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
             oa = BinaryOutputArchive.getArchive(bufferedOutput);
 

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Observer.java b/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
index 27368c7..f0f724e 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
@@ -19,12 +19,12 @@
 package org.apache.zookeeper.server.quorum;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 
 import org.apache.jute.Record;
 import org.apache.zookeeper.server.ObserverBean;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.txn.SetDataTxn;
@@ -63,10 +63,10 @@ public class Observer extends Learner{
         zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
 
         try {
-            InetSocketAddress addr = findLeader();
-            LOG.info("Observing " + addr);
+            QuorumServer leaderServer = findLeader();
+            LOG.info("Observing " + leaderServer.addr);
             try {
-                connectToLeader(addr);
+                connectToLeader(leaderServer.addr, leaderServer.hostname);
                 long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
                 if (self.isReconfigStateChange())
                    throw new Exception("learned about role change");

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
index c8f73a3..09da63a 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -30,15 +31,24 @@ import java.net.SocketTimeoutException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.UnresolvedAddressException;
+import java.util.Collections;
 import java.util.Enumeration;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.zookeeper.server.ZooKeeperThread;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,8 +89,8 @@ public class QuorumCnxManager {
     /*
      * Negative counter for observer server ids.
      */
-    
-    private long observerCounter = -1;
+
+    private AtomicLong observerCounter = new AtomicLong(-1);
 
     /*
      * Protocol identifier used among peers
@@ -97,11 +107,26 @@ public class QuorumCnxManager {
      */
     
     private int cnxTO = 5000;
-    
+
+    final QuorumPeer self;
+
     /*
      * Local IP address
      */
-    final QuorumPeer self;
+    final long mySid;
+    final int socketTimeout;
+    final Map<Long, QuorumPeer.QuorumServer> view;
+    final boolean listenOnAllIPs;
+    private ThreadPoolExecutor connectionExecutor;
+    private final Set<Long> inprogressConnections = Collections
+            .synchronizedSet(new HashSet<Long>());
+    private QuorumAuthServer authServer;
+    private QuorumAuthLearner authLearner;
+    private boolean quorumSaslAuthEnabled;
+    /*
+     * Counter to count connection processing threads.
+     */
+    private AtomicInteger connectionThreadCnt = new AtomicInteger(0);
 
     /*
      * Mapping from Peer to Thread number
@@ -217,7 +242,15 @@ public class QuorumCnxManager {
         }
     }
 
-    public QuorumCnxManager(QuorumPeer self) {
+    public QuorumCnxManager(QuorumPeer self,
+                            final long mySid,
+                            Map<Long,QuorumPeer.QuorumServer> view,
+                            QuorumAuthServer authServer,
+                            QuorumAuthLearner authLearner,
+                            int socketTimeout,
+                            boolean listenOnAllIPs,
+                            int quorumCnxnThreadsSize,
+                            boolean quorumSaslAuthEnabled) {
         this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
         this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
         this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
@@ -230,11 +263,54 @@ public class QuorumCnxManager {
         
         this.self = self;
 
-        // Starts listener thread that waits for connection requests 
+        this.mySid = mySid;
+        this.socketTimeout = socketTimeout;
+        this.view = view;
+        this.listenOnAllIPs = listenOnAllIPs;
+
+        initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
+                quorumSaslAuthEnabled);
+
+        // Starts listener thread that waits for connection requests
         listener = new Listener();
         listener.setName("QuorumPeerListener");
     }
 
+    private void initializeAuth(final long mySid,
+                                final QuorumAuthServer authServer,
+                                final QuorumAuthLearner authLearner,
+                                final int quorumCnxnThreadsSize,
+                                final boolean quorumSaslAuthEnabled) {
+        this.authServer = authServer;
+        this.authLearner = authLearner;
+        this.quorumSaslAuthEnabled = quorumSaslAuthEnabled;
+        if (!this.quorumSaslAuthEnabled) {
+            LOG.debug("Not initializing connection executor as quorum sasl auth is disabled");
+            return;
+        }
+
+        // init connection executors
+        final AtomicInteger threadIndex = new AtomicInteger(1);
+        SecurityManager s = System.getSecurityManager();
+        final ThreadGroup group = (s != null) ? s.getThreadGroup()
+                : Thread.currentThread().getThreadGroup();
+        ThreadFactory daemonThFactory = new ThreadFactory() {
+
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread t = new Thread(group, r, "QuorumConnectionThread-"
+                        + "[myid=" + mySid + "]-"
+                        + threadIndex.getAndIncrement());
+                return t;
+            }
+        };
+        this.connectionExecutor = new ThreadPoolExecutor(3,
+                quorumCnxnThreadsSize, 60, TimeUnit.SECONDS,
+                new SynchronousQueue<Runnable>(), daemonThFactory);
+        this.connectionExecutor.allowCoreThreadTimeOut(true);
+    }
+
+
     /**
      * Invokes initiateConnection for testing purposes
      * 
@@ -247,17 +323,80 @@ public class QuorumCnxManager {
         sock.connect(self.getVotingView().get(sid).electionAddr, cnxTO);
         initiateConnection(sock, sid);
     }
-    
+
     /**
      * If this server has initiated the connection, then it gives up on the
      * connection if it loses challenge. Otherwise, it keeps the connection.
      */
-    public boolean initiateConnection(Socket sock, Long sid) {
+    public void initiateConnection(final Socket sock, final Long sid) {
+        try {
+            startConnection(sock, sid);
+        } catch (IOException e) {
+            LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection",
+                    new Object[] { sid, sock.getRemoteSocketAddress() }, e);
+            closeSocket(sock);
+            return;
+        }
+    }
+
+    /**
+     * Server will initiate the connection request to its peer server
+     * asynchronously via separate connection thread.
+     */
+    public void initiateConnectionAsync(final Socket sock, final Long sid) {
+        if(!inprogressConnections.add(sid)){
+            // simply return as there is a connection request to
+            // server 'sid' already in progress.
+            LOG.debug("Connection request to server id: {} is already in progress, so skipping this request",
+                    sid);
+            closeSocket(sock);
+            return;
+        }
+        try {
+            connectionExecutor.execute(
+                    new QuorumConnectionReqThread(sock, sid));
+            connectionThreadCnt.incrementAndGet();
+        } catch (Throwable e) {
+            // Imp: Safer side catching all type of exceptions and remove 'sid'
+            // from inprogress connections. This is to avoid blocking further
+            // connection requests from this 'sid' in case of errors.
+            inprogressConnections.remove(sid);
+            LOG.error("Exception while submitting quorum connection request", e);
+            closeSocket(sock);
+        }
+    }
+
+    /**
+     * Thread to send connection request to peer server.
+     */
+    private class QuorumConnectionReqThread extends ZooKeeperThread {
+        final Socket sock;
+        final Long sid;
+        QuorumConnectionReqThread(final Socket sock, final Long sid) {
+            super("QuorumConnectionReqThread-" + sid);
+            this.sock = sock;
+            this.sid = sid;
+        }
+
+        @Override
+        public void run() {
+            try{
+                initiateConnection(sock, sid);
+            } finally {
+                inprogressConnections.remove(sid);
+            }
+        }
+    }
+
+    private boolean startConnection(Socket sock, Long sid)
+            throws IOException {
+        DataOutputStream dout = null;
+        DataInputStream din = null;
         try {
             // Use BufferedOutputStream to reduce the number of IP packets. This is
             // important for x-DC scenarios.
             BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
-            DataOutputStream dout = new DataOutputStream(buf);
+            dout = new DataOutputStream(buf);
 
             // Sending id and challenge
 
@@ -269,12 +408,22 @@ public class QuorumCnxManager {
             dout.writeInt(addr_bytes.length);
             dout.write(addr_bytes);
             dout.flush();
+
+            din = new DataInputStream(
+                    new BufferedInputStream(sock.getInputStream()));
         } catch (IOException e) {
             LOG.warn("Ignoring exception reading or writing challenge: ", e);
             closeSocket(sock);
             return false;
         }
-        
+
+        // authenticate learner
+        QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
+        if (qps != null) {
+            // TODO - investigate why reconfig makes qps null.
+            authLearner.authenticate(sock, qps.hostname);
+        }
+
         // If lost the challenge, then drop the new connection
         if (sid > self.getId()) {
             LOG.info("Have smaller server identifier, so dropping the " +
@@ -283,7 +432,7 @@ public class QuorumCnxManager {
             // Otherwise proceed with the connection
         } else {
             SendWorker sw = new SendWorker(sock, sid);
-            RecvWorker rw = new RecvWorker(sock, sid, sw);
+            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
             sw.setRecv(rw);
 
             SendWorker vsw = senderWorkerMap.get(sid);
@@ -312,13 +461,58 @@ public class QuorumCnxManager {
      * possible long value to lose the challenge.
      *
      */
-    public void receiveConnection(Socket sock) {
+    public void receiveConnection(final Socket sock) {
+        DataInputStream din = null;
+        try {
+            din = new DataInputStream(
+                    new BufferedInputStream(sock.getInputStream()));
+
+            handleConnection(sock, din);
+        } catch (IOException e) {
+            LOG.error("Exception handling connection, addr: {}, closing server connection",
+                    sock.getRemoteSocketAddress());
+            closeSocket(sock);
+        }
+    }
+
+    /**
+     * Server receives a connection request and handles it asynchronously via
+     * separate thread.
+     */
+    public void receiveConnectionAsync(final Socket sock) {
+        try {
+            connectionExecutor.execute(
+                    new QuorumConnectionReceiverThread(sock));
+            connectionThreadCnt.incrementAndGet();
+        } catch (Throwable e) {
+            LOG.error("Exception handling connection, addr: {}, closing server connection",
+                    sock.getRemoteSocketAddress());
+            closeSocket(sock);
+        }
+    }
+
+    /**
+     * Thread to receive connection request from peer server.
+     */
+    private class QuorumConnectionReceiverThread extends ZooKeeperThread {
+        private final Socket sock;
+        QuorumConnectionReceiverThread(final Socket sock) {
+            super("QuorumConnectionReceiverThread-" + sock.getRemoteSocketAddress());
+            this.sock = sock;
+        }
+
+        @Override
+        public void run() {
+            receiveConnection(sock);
+        }
+    }
+
+    private void handleConnection(Socket sock, DataInputStream din)
+            throws IOException {
         Long sid = null, protocolVersion = null;
         InetSocketAddress electionAddr = null;
 
         try {
-            DataInputStream din = new DataInputStream(sock.getInputStream());
-
             protocolVersion = din.readLong();
             if (protocolVersion >= 0) { // this is a server id and not a protocol version
                 sid = protocolVersion;
@@ -339,16 +533,18 @@ public class QuorumCnxManager {
                  * Choose identifier at random. We need a value to identify
                  * the connection.
                  */
-                
-                sid = observerCounter--;
-                LOG.info("Setting arbitrary identifier to observer: {}", sid);
+                sid = observerCounter.getAndDecrement();
+                LOG.info("Setting arbitrary identifier to observer: " + sid);
             }
         } catch (IOException e) {
             closeSocket(sock);
             LOG.warn("Exception reading or writing challenge: {}", e.toString());
             return;
         }
-        
+
+        // do authenticating learner
+        authServer.authenticate(sock, din);
+
         //If wins the challenge, then close the new connection.
         if (sid < self.getId()) {
             /*
@@ -375,7 +571,7 @@ public class QuorumCnxManager {
 
         } else { // Otherwise start worker threads to receive data.
             SendWorker sw = new SendWorker(sock, sid);
-            RecvWorker rw = new RecvWorker(sock, sid, sw);
+            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
             sw.setRecv(rw);
 
             SendWorker vsw = senderWorkerMap.get(sid);
@@ -402,7 +598,7 @@ public class QuorumCnxManager {
         /*
          * If sending message to myself, then simply enqueue it (loopback).
          */
-        if (self.getId() == sid) {
+        if (this.mySid == sid) {
              b.position(0);
              addToRecvQueue(new Message(b.duplicate(), sid));
             /*
@@ -444,7 +640,15 @@ public class QuorumCnxManager {
              setSockOpts(sock);
              sock.connect(electionAddr, cnxTO);
              LOG.debug("Connected to server " + sid);
-             initiateConnection(sock, sid);
+            // Sends connection request asynchronously if the quorum
+            // sasl authentication is enabled. This is required because
+            // sasl server authentication process may take few seconds to
+            // finish, this may delay next peer connection requests.
+            if (quorumSaslAuthEnabled) {
+                initiateConnectionAsync(sock, sid);
+            } else {
+                initiateConnection(sock, sid);
+            }
              return true;
          } catch (UnresolvedAddressException e) {
              // Sun doesn't include the address that causes this
@@ -547,6 +751,13 @@ public class QuorumCnxManager {
             LOG.warn("Got interrupted before joining the listener", ex);
         }
         softHalt();
+
+        // clear data structures used for auth
+        if (connectionExecutor != null) {
+            connectionExecutor.shutdown();
+        }
+        inprogressConnections.clear();
+        resetConnectionThreadCount();
     }
    
     /**
@@ -595,11 +806,19 @@ public class QuorumCnxManager {
     public long getThreadCount() {
         return threadCnt.get();
     }
+
+    /**
+     * Return number of connection processing threads.
+     */
+    public long getConnectionThreadCount() {
+        return connectionThreadCnt.get();
+    }
+
     /**
-     * Return reference to QuorumPeer
+     * Reset the value of connection processing threads count to zero.
      */
-    public QuorumPeer getQuorumPeer() {
-        return self;
+    private void resetConnectionThreadCount() {
+        connectionThreadCnt.set(0);
     }
 
     /**
@@ -645,7 +864,16 @@ public class QuorumCnxManager {
                             setSockOpts(client);
                             LOG.info("Received connection request "
                                      + client.getRemoteSocketAddress());
-                            receiveConnection(client);
+                            // Receive and handle the connection request
+                            // asynchronously if the quorum sasl authentication is
+                            // enabled. This is required because sasl server
+                            // authentication process may take few seconds to finish,
+                            // this may delay next peer connection requests.
+                            if (quorumSaslAuthEnabled) {
+                                receiveConnectionAsync(client);
+                            } else {
+                                receiveConnection(client);
+                            }
                             numRetries = 0;
                         } catch (SocketTimeoutException e) {
                             LOG.warn("The socket is listening for the election accepted "
@@ -695,7 +923,8 @@ public class QuorumCnxManager {
             try{
                 LOG.debug("Trying to close listener: " + ss);
                 if(ss != null) {
-                    LOG.debug("Closing listener: " + self.getId());
+                    LOG.debug("Closing listener: "
+                              + QuorumCnxManager.this.mySid);
                     ss.close();
                 }
             } catch (IOException e){
@@ -847,8 +1076,9 @@ public class QuorumCnxManager {
                     }
                 }
             } catch (Exception e) {
-                LOG.warn("Exception when using channel: for id " + sid + " my id = " + 
-                        self.getId() + " error = " + e);
+                LOG.warn("Exception when using channel: for id " + sid
+                         + " my id = " + QuorumCnxManager.this.mySid
+                         + " error = " + e);
             }
             this.finish();
             LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId());
@@ -863,16 +1093,16 @@ public class QuorumCnxManager {
         Long sid;
         Socket sock;
         volatile boolean running = true;
-        DataInputStream din;
+        final DataInputStream din;
         final SendWorker sw;
 
-        RecvWorker(Socket sock, Long sid, SendWorker sw) {
+        RecvWorker(Socket sock, DataInputStream din, Long sid, SendWorker sw) {
             super("RecvWorker:" + sid);
             this.sid = sid;
             this.sock = sock;
             this.sw = sw;
+            this.din = din;
             try {
-                din = new DataInputStream(sock.getInputStream());
                 // OK to wait until socket disconnects while reading.
                 sock.setSoTimeout(0);
             } catch (IOException e) {
@@ -925,8 +1155,8 @@ public class QuorumCnxManager {
                     addToRecvQueue(new Message(message.duplicate(), sid));
                 }
             } catch (Exception e) {
-                LOG.warn("Connection broken for id " + sid + ", my id = " + 
-                        self.getId() + ", error = " , e);
+                LOG.warn("Connection broken for id " + sid + ", my id = "
+                         + QuorumCnxManager.this.mySid + ", error = " , e);
             } finally {
                 LOG.warn("Interrupting SendWorker");
                 sw.finish();
@@ -1046,4 +1276,8 @@ public class QuorumCnxManager {
        throws InterruptedException {
        return recvQueue.poll(timeout, unit);
     }
+
+    public boolean connectedToPeer(long peerSid) {
+        return senderWorkerMap.get(peerSid) != null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
index 01e5947..45f32b9 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -42,6 +42,8 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.security.sasl.SaslException;
+
 import org.apache.zookeeper.KeeperException.BadArgumentsException;
 import org.apache.zookeeper.common.AtomicFileWritingIdiom;
 import org.apache.zookeeper.common.AtomicFileWritingIdiom.WriterStatement;
@@ -52,6 +54,13 @@ import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooKeeperThread;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuth;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
+import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthLearner;
+import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthServer;
+import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthLearner;
+import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthServer;
 import org.apache.zookeeper.server.admin.AdminServer;
 import org.apache.zookeeper.server.admin.AdminServer.AdminServerException;
 import org.apache.zookeeper.server.admin.AdminServerFactory;
@@ -98,6 +107,8 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
     private Map<Long, RemotePeerBean> jmxRemotePeerBean;
     LeaderElectionBean jmxLeaderElectionBean;
     private QuorumCnxManager qcm;
+    QuorumAuthServer authServer;
+    QuorumAuthLearner authLearner;
 
     /**
      * ZKDatabase is a top level member of quorumpeer
@@ -116,6 +127,8 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         public InetSocketAddress clientAddr = null;
         
         public long id;
+
+        public String hostname;
         
         public LearnerType type = LearnerType.PARTICIPANT;
         
@@ -131,6 +144,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
             this(id, addr, electionAddr, (InetSocketAddress)null, LearnerType.PARTICIPANT);
         }
 
+        // VisibleForTesting
         public QuorumServer(long id, InetSocketAddress addr) {
             this(id, addr, (InetSocketAddress)null, (InetSocketAddress)null, LearnerType.PARTICIPANT);
         }
@@ -217,7 +231,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
                 }
 
                 // is client_config a host:port or just a port
-                String hostname = (clientParts.length == 2) ? clientParts[0] : "0.0.0.0";
+                hostname = (clientParts.length == 2) ? clientParts[0] : "0.0.0.0";
                 try {
                     clientAddr = new InetSocketAddress(hostname,
                             Integer.parseInt(clientParts[clientParts.length - 1]));
@@ -245,6 +259,8 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
                 setType(serverParts[3]);
             }
 
+            this.hostname = serverParts[0];
+            
             setMyAddrs();
         }
 
@@ -529,6 +545,50 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
     private long electionTimeTaken = -1;
 
     /**
+     * Enable/Disables quorum authentication using sasl. Defaulting to false.
+     */
+    protected boolean quorumSaslEnableAuth;
+
+    /**
+     * If this is false, quorum peer server will accept another quorum peer client
+     * connection even if the authentication did not succeed. This can be used while
+     * upgrading ZooKeeper server. Defaulting to false (required).
+     */
+    protected boolean quorumServerSaslAuthRequired;
+
+    /**
+     * If this is false, quorum peer learner will talk to quorum peer server
+     * without authentication. This can be used while upgrading ZooKeeper
+     * server. Defaulting to false (required).
+     */
+    protected boolean quorumLearnerSaslAuthRequired;
+
+    /**
+     * Kerberos quorum service principal. Defaulting to 'zkquorum/localhost'.
+     */
+    protected String quorumServicePrincipal;
+
+    /**
+     * Quorum learner login context name in jaas-conf file to read the kerberos
+     * security details. Defaulting to 'QuorumLearner'.
+     */
+    protected String quorumLearnerLoginContext;
+
+    /**
+     * Quorum server login context name in jaas-conf file to read the kerberos
+     * security details. Defaulting to 'QuorumServer'.
+     */
+    protected String quorumServerLoginContext;
+
+    // TODO: need to tune the default value of thread size
+    private static final int QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE = 20;
+    /**
+     * The maximum number of threads to allow in the connectionExecutors thread
+     * pool which will be used to initiate quorum server connections.
+     */
+    protected int quorumCnxnThreadsSize = QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE;
+
+    /**
      * @deprecated As of release 3.4.0, this class has been deprecated, since
      * it is used with one of the udp-based versions of leader election, which
      * we are also deprecating.
@@ -717,14 +777,18 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
 
     AdminServer adminServer;
 
-    public QuorumPeer() {
+    public static QuorumPeer testingQuorumPeer() throws SaslException {
+        return new QuorumPeer();
+    }
+
+    public QuorumPeer() throws SaslException {
         super("QuorumPeer");
         quorumStats = new QuorumStats(this);
         jmxRemotePeerBean = new HashMap<Long, RemotePeerBean>();
         adminServer = AdminServerFactory.createAdminServer();
+        initialize();
     }
 
-
     /**
      * For backward compatibility purposes, we instantiate QuorumMaj by default.
      */
@@ -759,6 +823,23 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         adminServer = AdminServerFactory.createAdminServer();
     }
 
+    public void initialize() throws SaslException {
+        // init quorum auth server & learner
+        if (isQuorumSaslAuthEnabled()) {
+            Set<String> authzHosts = new HashSet<String>();
+            for (QuorumServer qs : getView().values()) {
+                authzHosts.add(qs.hostname);
+            }
+            authServer = new SaslQuorumAuthServer(isQuorumServerSaslAuthRequired(),
+                    quorumServerLoginContext, authzHosts);
+            authLearner = new SaslQuorumAuthLearner(isQuorumLearnerSaslAuthRequired(),
+                    quorumServicePrincipal, quorumLearnerLoginContext);
+        } else {
+            authServer = new NullQuorumAuthServer();
+            authLearner = new NullQuorumAuthLearner();
+        }
+    }
+
     QuorumStats quorumStats() {
         return quorumStats;
     }
@@ -939,26 +1020,26 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
 
         //TODO: use a factory rather than a switch
         switch (electionAlgorithm) {
-            case 1:
-                le = new AuthFastLeaderElection(this);
-                break;
-            case 2:
-                le = new AuthFastLeaderElection(this, true);
-                break;
-            case 3:
-                qcm = new QuorumCnxManager(this);
-                QuorumCnxManager.Listener listener = qcm.listener;
-                if(listener != null){
-                    listener.start();
-                    FastLeaderElection fle = new FastLeaderElection(this, qcm);
-                    fle.start();
-                    le = fle;
-                } else {
-                    LOG.error("Null listener when initializing cnx manager");
-                }
-                break;
-            default:
-                assert false;
+        case 1:
+            le = new AuthFastLeaderElection(this);
+            break;
+        case 2:
+            le = new AuthFastLeaderElection(this, true);
+            break;
+        case 3:
+            qcm = createCnxnManager();
+            QuorumCnxManager.Listener listener = qcm.listener;
+            if(listener != null){
+                listener.start();
+                FastLeaderElection fle = new FastLeaderElection(this, qcm);
+                fle.start();
+                le = fle;
+            } else {
+                LOG.error("Null listener when initializing cnx manager");
+            }
+            break;
+        default:
+            assert false;
         }
         return le;
     }
@@ -1921,4 +2002,75 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
     long getElectionTimeTaken() {
         return electionTimeTaken;
     }
+
+    void setQuorumServerSaslRequired(boolean serverSaslRequired) {
+        quorumServerSaslAuthRequired = serverSaslRequired;
+        LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED,
+                serverSaslRequired);
+    }
+
+    void setQuorumLearnerSaslRequired(boolean learnerSaslRequired) {
+        quorumLearnerSaslAuthRequired = learnerSaslRequired;
+        LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED,
+                learnerSaslRequired);
+    }
+
+    void setQuorumSaslEnabled(boolean enableAuth) {
+        quorumSaslEnableAuth = enableAuth;
+        if (!quorumSaslEnableAuth) {
+            LOG.info("QuorumPeer communication is not secured!");
+        } else {
+            LOG.info("{} set to {}",
+                    QuorumAuth.QUORUM_SASL_AUTH_ENABLED, enableAuth);
+        }
+    }
+
+    void setQuorumServicePrincipal(String servicePrincipal) {
+        quorumServicePrincipal = servicePrincipal;
+        LOG.info("{} set to {}", QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL,
+                quorumServicePrincipal);
+    }
+
+    void setQuorumLearnerLoginContext(String learnerContext) {
+        quorumLearnerLoginContext = learnerContext;
+        LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT,
+                quorumLearnerLoginContext);
+    }
+
+    void setQuorumServerLoginContext(String serverContext) {
+        quorumServerLoginContext = serverContext;
+        LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT,
+                quorumServerLoginContext);
+    }
+
+    void setQuorumCnxnThreadsSize(int qCnxnThreadsSize) {
+        if (qCnxnThreadsSize > QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE) {
+            quorumCnxnThreadsSize = qCnxnThreadsSize;
+        }
+        LOG.info("quorum.cnxn.threads.size set to {}", quorumCnxnThreadsSize);
+    }
+
+    boolean isQuorumSaslAuthEnabled() {
+        return quorumSaslEnableAuth;
+    }
+
+    private boolean isQuorumServerSaslAuthRequired() {
+        return quorumServerSaslAuthRequired;
+    }
+
+    private boolean isQuorumLearnerSaslAuthRequired() {
+        return quorumLearnerSaslAuthRequired;
+    }
+
+    public QuorumCnxManager createCnxnManager() {
+        return new QuorumCnxManager(this,
+                this.getId(),
+                this.getView(),
+                this.authServer,
+                this.authLearner,
+                this.tickTime * this.syncLimit,
+                this.getQuorumListenOnAllIPs(),
+                this.quorumCnxnThreadsSize,
+                this.isQuorumSaslAuthEnabled());
+    }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
index 7afbedd..7159139 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
@@ -49,6 +49,7 @@ import org.apache.zookeeper.common.PathUtils;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuth;
 import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
@@ -94,6 +95,17 @@ public class QuorumPeerConfig {
     protected LearnerType peerType = LearnerType.PARTICIPANT;
 
     /**
+     * Configurations for the quorumpeer-to-quorumpeer sasl authentication
+     */
+    protected boolean quorumServerRequireSasl = false;
+    protected boolean quorumLearnerRequireSasl = false;
+    protected boolean quorumEnableSasl = false;
+    protected String quorumServicePrincipal = QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL_DEFAULT_VALUE;
+    protected String quorumLearnerLoginContext = QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT_DFAULT_VALUE;
+    protected String quorumServerLoginContext = QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT_DFAULT_VALUE;
+    protected int quorumCnxnThreadsSize;
+
+    /**
      * Minimum snapshot retain count.
      * @see org.apache.zookeeper.server.PurgeTxnLog#purge(File, File, int)
      */
@@ -296,11 +308,47 @@ public class QuorumPeerConfig {
                 }
             } else if ((key.startsWith("server.") || key.startsWith("group") || key.startsWith("weight")) && zkProp.containsKey("dynamicConfigFile")) {
                 throw new ConfigException("parameter: " + key + " must be in a separate dynamic config file");
+            } else if (key.equals(QuorumAuth.QUORUM_SASL_AUTH_ENABLED)) {
+                quorumEnableSasl = Boolean.parseBoolean(value);
+            } else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED)) {
+                quorumServerRequireSasl = Boolean.parseBoolean(value);
+            } else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED)) {
+                quorumLearnerRequireSasl = Boolean.parseBoolean(value);
+            } else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT)) {
+                quorumLearnerLoginContext = value;
+            } else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT)) {
+                quorumServerLoginContext = value;
+            } else if (key.equals(QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL)) {
+                quorumServicePrincipal = value;
+            } else if (key.equals("quorum.cnxn.threads.size")) {
+                quorumCnxnThreadsSize = Integer.parseInt(value);
             } else {
                 System.setProperty("zookeeper." + key, value);
             }
         }
 
+        if (!quorumEnableSasl && quorumServerRequireSasl) {
+            throw new IllegalArgumentException(
+                    QuorumAuth.QUORUM_SASL_AUTH_ENABLED
+                            + " is disabled, so cannot enable "
+                            + QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED);
+        }
+        if (!quorumEnableSasl && quorumLearnerRequireSasl) {
+            throw new IllegalArgumentException(
+                    QuorumAuth.QUORUM_SASL_AUTH_ENABLED
+                            + " is disabled, so cannot enable "
+                            + QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED);
+        }
+        // If quorumpeer learner is not auth enabled then self won't be able to
+        // join quorum. So this condition is ensuring that the quorumpeer learner
+        // is also auth enabled while enabling quorum server require sasl.
+        if (!quorumLearnerRequireSasl && quorumServerRequireSasl) {
+            throw new IllegalArgumentException(
+                    QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED
+                            + " is disabled, so cannot enable "
+                            + QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED);
+        }
+
         // Reset to MIN_SNAP_RETAIN_COUNT if invalid (less than 3)
         // PurgeTxnLog.purge(File, File, int) will not allow to purge less
         // than 3.


[2/3] zookeeper git commit: ZOOKEEPER-2935: [QP MutualAuth]: Port ZOOKEEPER-1045 implementation from branch-3.5 to trunk

Posted by ph...@apache.org.
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
index e1a5806..3da6e20 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
@@ -20,6 +20,7 @@ package org.apache.zookeeper.server.quorum;
 import java.io.IOException;
 
 import javax.management.JMException;
+import javax.security.sasl.SaslException;
 
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -183,6 +184,18 @@ public class QuorumPeerMain {
           quorumPeer.setLearnerType(config.getPeerType());
           quorumPeer.setSyncEnabled(config.getSyncEnabled());
           quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
+
+          // sets quorum sasl authentication configurations
+          quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
+          if(quorumPeer.isQuorumSaslAuthEnabled()){
+              quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
+              quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
+              quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
+              quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
+              quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
+          }
+          quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
+          quorumPeer.initialize();
           
           quorumPeer.start();
           quorumPeer.join();
@@ -193,7 +206,7 @@ public class QuorumPeerMain {
     }
 
     // @VisibleForTesting
-    protected QuorumPeer getQuorumPeer() {
+    protected QuorumPeer getQuorumPeer() throws SaslException {
         return new QuorumPeer();
     }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthLearner.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthLearner.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthLearner.java
new file mode 100644
index 0000000..0af891c
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthLearner.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.net.Socket;
+
+/**
+ * This class represents no authentication learner, it just return
+ * without performing any authentication.
+ */
+public class NullQuorumAuthLearner implements QuorumAuthLearner {
+
+    @Override
+    public void authenticate(Socket sock, String hostname) {
+        return; // simply return don't require auth
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthServer.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthServer.java
new file mode 100644
index 0000000..b26a54a
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/NullQuorumAuthServer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.DataInputStream;
+import java.net.Socket;
+
+/**
+ * This class represents no authentication server, it just return
+ * without performing any authentication.
+ */
+public class NullQuorumAuthServer implements QuorumAuthServer {
+
+    @Override
+    public void authenticate(final Socket sock, final DataInputStream din) {
+        return; // simply return don't require auth
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java
new file mode 100644
index 0000000..8bfa394
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import org.apache.jute.BinaryInputArchive;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.zookeeper.server.quorum.QuorumAuthPacket;
+
+public class QuorumAuth {
+    private static final Logger LOG = LoggerFactory.getLogger(QuorumAuth.class);
+
+    public static final String QUORUM_SASL_AUTH_ENABLED = "quorum.auth.enableSasl";
+    public static final String QUORUM_SERVER_SASL_AUTH_REQUIRED = "quorum.auth.serverRequireSasl";
+    public static final String QUORUM_LEARNER_SASL_AUTH_REQUIRED = "quorum.auth.learnerRequireSasl";
+
+    public static final String QUORUM_KERBEROS_SERVICE_PRINCIPAL = "quorum.auth.kerberos.servicePrincipal";
+    public static final String QUORUM_KERBEROS_SERVICE_PRINCIPAL_DEFAULT_VALUE = "zkquorum/localhost";
+
+    public static final String QUORUM_LEARNER_SASL_LOGIN_CONTEXT = "quorum.auth.learner.saslLoginContext";
+    public static final String QUORUM_LEARNER_SASL_LOGIN_CONTEXT_DFAULT_VALUE = "QuorumLearner";
+
+    public static final String QUORUM_SERVER_SASL_LOGIN_CONTEXT = "quorum.auth.server.saslLoginContext";
+    public static final String QUORUM_SERVER_SASL_LOGIN_CONTEXT_DFAULT_VALUE = "QuorumServer";
+
+    static final String QUORUM_SERVER_PROTOCOL_NAME = "zookeeper-quorum";
+    static final String QUORUM_SERVER_SASL_DIGEST = "zk-quorum-sasl-md5";
+    static final String QUORUM_AUTH_MESSAGE_TAG = "qpconnect";
+
+    // this is negative, so that if a learner that does auth, connects to a
+    // server, it'll think the received packet is an authentication packet
+    public static final long QUORUM_AUTH_MAGIC_NUMBER = -0xa0dbcafecafe1234L;
+
+    public enum Status {
+         IN_PROGRESS(0), SUCCESS(1), ERROR(-1);
+        private int status;
+
+        Status(int status) {
+            this.status = status;
+        }
+
+        static Status getStatus(int status) {
+            switch (status) {
+            case 0:
+                return IN_PROGRESS;
+            case 1:
+                return SUCCESS;
+            case -1:
+                return ERROR;
+            default:
+                LOG.error("Unknown status:{}!", status);
+                assert false : "Unknown status!";
+                return ERROR;
+            }
+        }
+
+        int status() {
+            return status;
+        }
+    }
+
+    public static QuorumAuthPacket createPacket(Status status, byte[] response) {
+        return new QuorumAuthPacket(QUORUM_AUTH_MAGIC_NUMBER,
+                                    status.status(), response);
+    }
+
+    public static boolean nextPacketIsAuth(DataInputStream din)
+            throws IOException {
+        din.mark(32);
+        BinaryInputArchive bia = new BinaryInputArchive(din);
+        boolean firstIsAuth = (bia.readLong("NO_TAG")
+                               == QuorumAuth.QUORUM_AUTH_MAGIC_NUMBER);
+        din.reset();
+        return firstIsAuth;
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthLearner.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthLearner.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthLearner.java
new file mode 100644
index 0000000..af71257
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthLearner.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.IOException;
+import java.net.Socket;
+
+/**
+ * Interface for quorum learner authentication mechanisms.
+ */
+public interface QuorumAuthLearner {
+
+    /**
+     * Performs an authentication step for the given socket connection.
+     *
+     * @param sock
+     *            socket connection to other quorum peer server
+     * @param hostname
+     *            host name of other quorum peer server
+     * @throws IOException
+     *             if there is an authentication failure
+     */
+    public void authenticate(Socket sock, String hostname) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthServer.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthServer.java
new file mode 100644
index 0000000..e9de8f0
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/QuorumAuthServer.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.Socket;
+
+/**
+ * Interface for quorum server authentication mechanisms.
+ */
+public interface QuorumAuthServer {
+
+    /**
+     * Performs an authentication step for the given socket connection.
+     *
+     * @param sock
+     *            socket connection to other quorum peer
+     * @param din
+     *            stream used to read auth data send by the quorum learner
+     * @throws IOException if the server fails to authenticate connecting quorum learner
+     */
+    public void authenticate(Socket sock, DataInputStream din)
+            throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthLearner.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthLearner.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthLearner.java
new file mode 100644
index 0000000..31f4f55
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthLearner.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.Login;
+import org.apache.zookeeper.SaslClientCallbackHandler;
+import org.apache.zookeeper.common.ZKConfig;
+import org.apache.zookeeper.server.quorum.QuorumAuthPacket;
+import org.apache.zookeeper.util.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SaslQuorumAuthLearner implements QuorumAuthLearner {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(SaslQuorumAuthLearner.class);
+
+    private final Login learnerLogin;
+    private final boolean quorumRequireSasl;
+    private final String quorumServicePrincipal;
+
+    public SaslQuorumAuthLearner(boolean quorumRequireSasl,
+            String quorumServicePrincipal, String loginContext)
+                    throws SaslException {
+        this.quorumRequireSasl = quorumRequireSasl;
+        this.quorumServicePrincipal = quorumServicePrincipal;
+        try {
+            AppConfigurationEntry entries[] = Configuration
+                .getConfiguration()
+                .getAppConfigurationEntry(loginContext);
+            if (entries == null || entries.length == 0) {
+                throw new LoginException("SASL-authentication failed because"
+                                         + " the specified JAAS configuration "
+                                         + "section '" + loginContext
+                                         + "' could not be found.");
+            }
+            this.learnerLogin = new Login(loginContext,
+                                    new SaslClientCallbackHandler(null, "QuorumLearner"), new ZKConfig());
+            this.learnerLogin.startThreadIfNeeded();
+        } catch (LoginException e) {
+            throw new SaslException("Failed to initialize authentication mechanism using SASL", e);
+        }
+    }
+
+    @Override
+    public void authenticate(Socket sock, String hostName) throws IOException {
+        if (!quorumRequireSasl) { // let it through, we don't require auth
+            LOG.info("Skipping SASL authentication as {}={}",
+                    QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED,
+                    quorumRequireSasl);
+            return;
+        }
+        SaslClient sc = null;
+        String principalConfig = SecurityUtils
+                .getServerPrincipal(quorumServicePrincipal, hostName);
+        try {
+            DataOutputStream dout = new DataOutputStream(
+                    sock.getOutputStream());
+            DataInputStream din = new DataInputStream(sock.getInputStream());
+            byte[] responseToken = new byte[0];
+            sc = SecurityUtils.createSaslClient(learnerLogin.getSubject(),
+                    principalConfig,
+                    QuorumAuth.QUORUM_SERVER_PROTOCOL_NAME,
+                    QuorumAuth.QUORUM_SERVER_SASL_DIGEST, LOG, "QuorumLearner");
+
+            if (sc.hasInitialResponse()) {
+                responseToken = createSaslToken(new byte[0], sc, learnerLogin);
+            }
+            send(dout, responseToken);
+            QuorumAuthPacket authPacket = receive(din);
+            QuorumAuth.Status qpStatus = QuorumAuth.Status
+                    .getStatus(authPacket.getStatus());
+            while (!sc.isComplete()) {
+                switch (qpStatus) {
+                case SUCCESS:
+                    responseToken = createSaslToken(authPacket.getToken(), sc,
+                            learnerLogin);
+                    // we're done; don't expect to send another BIND
+                    if (responseToken != null) {
+                        throw new SaslException("Protocol error: attempting to send response after completion");
+                    }
+                    break;
+                case IN_PROGRESS:
+                    responseToken = createSaslToken(authPacket.getToken(), sc,
+                            learnerLogin);
+                    send(dout, responseToken);
+                    authPacket = receive(din);
+                    qpStatus = QuorumAuth.Status
+                            .getStatus(authPacket.getStatus());
+                    break;
+                case ERROR:
+                    throw new SaslException(
+                            "Authentication failed against server addr: "
+                                    + sock.getRemoteSocketAddress());
+                default:
+                    LOG.warn("Unknown status:{}!", qpStatus);
+                    throw new SaslException(
+                            "Authentication failed against server addr: "
+                                    + sock.getRemoteSocketAddress());
+                }
+            }
+
+            // Validate status code at the end of authentication exchange.
+            checkAuthStatus(sock, qpStatus);
+        } finally {
+            if (sc != null) {
+                try {
+                    sc.dispose();
+                } catch (SaslException e) {
+                    LOG.error("SaslClient dispose() failed", e);
+                }
+            }
+        }
+        return;
+    }
+
+    private void checkAuthStatus(Socket sock, QuorumAuth.Status qpStatus)
+            throws SaslException {
+        if (qpStatus == QuorumAuth.Status.SUCCESS) {
+            LOG.info("Successfully completed the authentication using SASL. server addr: {}, status: {}",
+                    sock.getRemoteSocketAddress(), qpStatus);
+        } else {
+            throw new SaslException("Authentication failed against server addr: "
+                            + sock.getRemoteSocketAddress() + ", qpStatus: "
+                            + qpStatus);
+        }
+    }
+
+    private QuorumAuthPacket receive(DataInputStream din) throws IOException {
+        QuorumAuthPacket authPacket = new QuorumAuthPacket();
+        BinaryInputArchive bia = BinaryInputArchive.getArchive(din);
+        authPacket.deserialize(bia, QuorumAuth.QUORUM_AUTH_MESSAGE_TAG);
+        return authPacket;
+    }
+
+    private void send(DataOutputStream dout, byte[] response)
+            throws IOException {
+        QuorumAuthPacket authPacket;
+        BufferedOutputStream bufferedOutput = new BufferedOutputStream(dout);
+        BinaryOutputArchive boa = BinaryOutputArchive
+                .getArchive(bufferedOutput);
+        authPacket = QuorumAuth.createPacket(
+                QuorumAuth.Status.IN_PROGRESS, response);
+        boa.writeRecord(authPacket, QuorumAuth.QUORUM_AUTH_MESSAGE_TAG);
+        bufferedOutput.flush();
+    }
+
+    // TODO: need to consolidate the #createSaslToken() implementation between ZooKeeperSaslClient#createSaslToken().
+    private byte[] createSaslToken(final byte[] saslToken,
+            final SaslClient saslClient, final Login login)
+                    throws SaslException {
+        if (saslToken == null) {
+            throw new SaslException(
+                    "Error in authenticating with a Zookeeper Quorum member: the quorum member's saslToken is null.");
+        }
+        if (login.getSubject() != null) {
+            synchronized (login) {
+                try {
+                    final byte[] retval = Subject.doAs(login.getSubject(),
+                            new PrivilegedExceptionAction<byte[]>() {
+                                public byte[] run() throws SaslException {
+                                    LOG.debug("saslClient.evaluateChallenge(len="
+                                                    + saslToken.length + ")");
+                                    return saslClient.evaluateChallenge(saslToken);
+                                }
+                            });
+                    return retval;
+                } catch (PrivilegedActionException e) {
+                    String error = "An error: (" + e
+                            + ") occurred when evaluating Zookeeper Quorum Member's "
+                            + " received SASL token.";
+                    // Try to provide hints to use about what went wrong so they
+                    // can fix their configuration.
+                    // TODO: introspect about e: look for GSS information.
+                    final String UNKNOWN_SERVER_ERROR_TEXT = "(Mechanism level: Server not found in Kerberos database (7) - UNKNOWN_SERVER)";
+                    if (e.toString().indexOf(UNKNOWN_SERVER_ERROR_TEXT) > -1) {
+                        error += " This may be caused by Java's being unable to resolve the Zookeeper Quorum Member's"
+                                + " hostname correctly. You may want to try to adding"
+                                + " '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your server's JVMFLAGS environment.";
+                    }
+                    LOG.error(error);
+                    throw new SaslException(error);
+                }
+            }
+        } else {
+            throw new SaslException(
+                    "Cannot make SASL token without subject defined. "
+                            + "For diagnosis, please look for WARNs and ERRORs in your log related to the Login class.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java
new file mode 100644
index 0000000..fc5e3b6
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Set;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.Login;
+import org.apache.zookeeper.common.ZKConfig;
+import org.apache.zookeeper.server.quorum.QuorumAuthPacket;
+import org.apache.zookeeper.util.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SaslQuorumAuthServer implements QuorumAuthServer {
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(SaslQuorumAuthServer.class);
+
+    private final static int MAX_RETRIES = 5;
+    private final Login serverLogin;
+    private final boolean quorumRequireSasl;
+
+    public SaslQuorumAuthServer(boolean quorumRequireSasl, String loginContext, Set<String> authzHosts)
+            throws SaslException {
+        this.quorumRequireSasl = quorumRequireSasl;
+        try {
+            AppConfigurationEntry entries[] = Configuration.getConfiguration()
+                    .getAppConfigurationEntry(loginContext);
+            if (entries == null || entries.length == 0) {
+                throw new LoginException("SASL-authentication failed"
+                        + " because the specified JAAS configuration "
+                        + "section '" + loginContext + "' could not be found.");
+            }
+            SaslQuorumServerCallbackHandler saslServerCallbackHandler = new SaslQuorumServerCallbackHandler(
+                    Configuration.getConfiguration(), loginContext, authzHosts);
+            serverLogin = new Login(loginContext, saslServerCallbackHandler, new ZKConfig());
+            serverLogin.startThreadIfNeeded();
+        } catch (Throwable e) {
+            throw new SaslException(
+                    "Failed to initialize authentication mechanism using SASL",
+                    e);
+        }
+    }
+
+    @Override
+    public void authenticate(Socket sock, DataInputStream din)
+            throws SaslException {
+        DataOutputStream dout = null;
+        SaslServer ss = null;
+        try {
+            if (!QuorumAuth.nextPacketIsAuth(din)) {
+                if (quorumRequireSasl) {
+                    throw new SaslException("Learner not trying to authenticate"
+                                            + " and authentication is required");
+                } else {
+                    // let it through, we don't require auth
+                    return;
+                }
+            }
+
+            byte[] token = receive(din);
+            int tries = 0;
+            dout = new DataOutputStream(sock.getOutputStream());
+            byte[] challenge = null;
+            ss = SecurityUtils.createSaslServer(serverLogin.getSubject(),
+                    QuorumAuth.QUORUM_SERVER_PROTOCOL_NAME,
+                    QuorumAuth.QUORUM_SERVER_SASL_DIGEST, serverLogin.callbackHandler,
+                    LOG);
+            while (!ss.isComplete()) {
+                challenge = ss.evaluateResponse(token);
+                if (!ss.isComplete()) {
+                    // limited number of retries.
+                    if (++tries > MAX_RETRIES) {
+                        send(dout, challenge, QuorumAuth.Status.ERROR);
+                        LOG.warn("Failed to authenticate using SASL, server addr: {}, retries={} exceeded.",
+                                sock.getRemoteSocketAddress(), tries);
+                        break;
+                    }
+                    send(dout, challenge, QuorumAuth.Status.IN_PROGRESS);
+                    token = receive(din);
+                }
+            }
+            // Authentication exchange has completed
+            if (ss.isComplete()) {
+                send(dout, challenge, QuorumAuth.Status.SUCCESS);
+                LOG.info("Successfully completed the authentication using SASL. learner addr: {}",
+                        sock.getRemoteSocketAddress());
+            }
+        } catch (Exception e) {
+            try {
+                if (dout != null) {
+                    // send error message to the learner
+                    send(dout, new byte[0], QuorumAuth.Status.ERROR);
+                }
+            } catch (IOException ioe) {
+                LOG.warn("Exception while sending failed status", ioe);
+            }
+            // If sasl is not required, when a server initializes a
+            // connection it will try to log in, but it will also
+            // accept connections that do not start with a sasl
+            // handshake.
+            if (quorumRequireSasl) {
+                LOG.error("Failed to authenticate using SASL", e);
+                throw new SaslException(
+                        "Failed to authenticate using SASL: " + e.getMessage());
+            } else {
+                LOG.warn("Failed to authenticate using SASL", e);
+                LOG.warn("Maintaining learner connection despite SASL authentication failure."
+                                + " server addr: {}, {}: {}",
+                        new Object[] { sock.getRemoteSocketAddress(),
+                                QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED,
+                                quorumRequireSasl });
+                return; // let it through, we don't require auth
+            }
+        } finally {
+            if (ss != null) {
+                try {
+                    ss.dispose();
+                } catch (SaslException e) {
+                    LOG.error("SaslServer dispose() failed", e);
+                }
+            }
+        }
+        return;
+    }
+
+    private byte[] receive(DataInputStream din) throws IOException {
+        QuorumAuthPacket authPacket = new QuorumAuthPacket();
+        BinaryInputArchive bia = BinaryInputArchive.getArchive(din);
+        authPacket.deserialize(bia, QuorumAuth.QUORUM_AUTH_MESSAGE_TAG);
+        return authPacket.getToken();
+    }
+
+    private void send(DataOutputStream dout, byte[] challenge,
+            QuorumAuth.Status s) throws IOException {
+        BufferedOutputStream bufferedOutput = new BufferedOutputStream(dout);
+        BinaryOutputArchive boa = BinaryOutputArchive
+                .getArchive(bufferedOutput);
+        QuorumAuthPacket authPacket;
+        if (challenge == null && s != QuorumAuth.Status.SUCCESS) {
+            authPacket = QuorumAuth.createPacket(
+                    QuorumAuth.Status.IN_PROGRESS, null);
+        } else {
+            authPacket = QuorumAuth.createPacket(s, challenge);
+        }
+
+        boa.writeRecord(authPacket, QuorumAuth.QUORUM_AUTH_MESSAGE_TAG);
+        bufferedOutput.flush();
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java b/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java
new file mode 100644
index 0000000..3e71bb1
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is used by the SASL mechanisms to get further information to complete
+ * the authentication. For example, a SASL mechanism might use this callback
+ * handler to do verification operation. This is used by the QuorumServer to
+ * perform the mutual quorum peer authentication.
+ */
+public class SaslQuorumServerCallbackHandler implements CallbackHandler {
+    private static final String USER_PREFIX = "user_";
+    private static final Logger LOG = LoggerFactory.getLogger(SaslQuorumServerCallbackHandler.class);
+
+    private String userName;
+    private final Map<String,String> credentials = new HashMap<String,String>();
+    private final Set<String> authzHosts;
+
+    public SaslQuorumServerCallbackHandler(Configuration configuration,
+            String serverSection, Set<String> authzHosts) throws IOException {
+        AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(serverSection);
+
+        if (configurationEntries == null) {
+            String errorMessage = "Could not find a '" + serverSection + "' entry in this configuration: Server cannot start.";
+            LOG.error(errorMessage);
+            throw new IOException(errorMessage);
+        }
+        credentials.clear();
+        for(AppConfigurationEntry entry: configurationEntries) {
+            Map<String,?> options = entry.getOptions();
+            // Populate DIGEST-MD5 user -> password map with JAAS configuration entries from the "QuorumServer" section.
+            // Usernames are distinguished from other options by prefixing the username with a "user_" prefix.
+            for(Map.Entry<String, ?> pair : options.entrySet()) {
+                String key = pair.getKey();
+                if (key.startsWith(USER_PREFIX)) {
+                    String userName = key.substring(USER_PREFIX.length());
+                    credentials.put(userName,(String)pair.getValue());
+                }
+            }
+        }
+
+        // authorized host lists
+        this.authzHosts = authzHosts;
+    }
+
+    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+        for (Callback callback : callbacks) {
+            if (callback instanceof NameCallback) {
+                handleNameCallback((NameCallback) callback);
+            } else if (callback instanceof PasswordCallback) {
+                handlePasswordCallback((PasswordCallback) callback);
+            } else if (callback instanceof RealmCallback) {
+                handleRealmCallback((RealmCallback) callback);
+            } else if (callback instanceof AuthorizeCallback) {
+                handleAuthorizeCallback((AuthorizeCallback) callback);
+            }
+        }
+    }
+
+    private void handleNameCallback(NameCallback nc) {
+        // check to see if this user is in the user password database.
+        if (credentials.get(nc.getDefaultName()) == null) {
+            LOG.warn("User '{}' not found in list of DIGEST-MD5 authenticateable users.",
+                    nc.getDefaultName());
+            return;
+        }
+        nc.setName(nc.getDefaultName());
+        userName = nc.getDefaultName();
+    }
+
+    private void handlePasswordCallback(PasswordCallback pc) {
+        if (credentials.containsKey(userName) ) {
+            pc.setPassword(credentials.get(userName).toCharArray());
+        } else {
+            LOG.warn("No password found for user: {}", userName);
+        }
+    }
+
+    private void handleRealmCallback(RealmCallback rc) {
+        LOG.debug("QuorumLearner supplied realm: {}", rc.getDefaultText());
+        rc.setText(rc.getDefaultText());
+    }
+
+    private void handleAuthorizeCallback(AuthorizeCallback ac) {
+        String authenticationID = ac.getAuthenticationID();
+        String authorizationID = ac.getAuthorizationID();
+
+        boolean authzFlag = false;
+        // 1. Matches authenticationID and authorizationID
+        authzFlag = authenticationID.equals(authorizationID);
+
+        // 2. Verify whether the connecting host is present in authorized hosts.
+        // If not exists, then connecting peer is not authorized to join the
+        // ensemble and will reject it.
+        if (authzFlag) {
+            String[] components = authorizationID.split("[/@]");
+            if (components.length == 3) {
+                authzFlag = authzHosts.contains(components[1]);
+            }
+            if (!authzFlag) {
+                LOG.error("SASL authorization completed, {} is not authorized to connect",
+                        components[1]);
+            }
+        }
+
+        // Sets authorization flag
+        ac.setAuthorized(authzFlag);
+        if (ac.isAuthorized()) {
+            ac.setAuthorizedID(authorizationID);
+            LOG.info("Successfully authenticated learner: authenticationID={};  authorizationID={}.",
+                    authenticationID, authorizationID);
+        }
+        LOG.debug("SASL authorization completed, authorized flag set to {}", ac.isAuthorized());
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/main/org/apache/zookeeper/util/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/util/SecurityUtils.java b/src/java/main/org/apache/zookeeper/util/SecurityUtils.java
new file mode 100644
index 0000000..67484e4
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/util/SecurityUtils.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.util;
+
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.zookeeper.SaslClientCallbackHandler;
+import org.apache.zookeeper.server.auth.KerberosName;
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+import org.slf4j.Logger;
+
+public final class SecurityUtils {
+
+    public static final String QUORUM_HOSTNAME_PATTERN = "_HOST";
+
+    /**
+     * Create an instance of a SaslClient. It will return null if there is an exception.
+     *
+     * @param subject subject
+     * @param servicePrincipal principal
+     * @param protocol name of the protocol for which the authentication is being performed
+     * @param serverName name of the server to authenticate to
+     * @param LOG logger
+     * @param entity can be either zookeeper client or quorum learner
+     *
+     * @return saslclient object
+     * @throws SaslException
+     */
+    public static SaslClient createSaslClient(final Subject subject,
+            final String servicePrincipal, final String protocol,
+            final String serverName, final Logger LOG, final String entity) throws SaslException {
+        SaslClient saslClient;
+        // Use subject.getPrincipals().isEmpty() as an indication of which SASL
+        // mechanism to use: if empty, use DIGEST-MD5; otherwise, use GSSAPI.
+        if (subject.getPrincipals().isEmpty()) {
+            // no principals: must not be GSSAPI: use DIGEST-MD5 mechanism
+            // instead.
+            LOG.info("{} will use DIGEST-MD5 as SASL mechanism.", entity);
+            String[] mechs = { "DIGEST-MD5" };
+            String username = (String) (subject.getPublicCredentials()
+                    .toArray()[0]);
+            String password = (String) (subject.getPrivateCredentials()
+                    .toArray()[0]);
+            // 'domain' parameter is hard-wired between the server and client
+            saslClient = Sasl.createSaslClient(mechs, username, protocol,
+                    serverName, null, new SaslClientCallbackHandler(password, entity));
+            return saslClient;
+        } else { // GSSAPI.
+            final Object[] principals = subject.getPrincipals().toArray();
+            // determine client principal from subject.
+            final Principal clientPrincipal = (Principal) principals[0];
+            boolean usingNativeJgss = Boolean
+                    .getBoolean("sun.security.jgss.native");
+            if (usingNativeJgss) {
+                // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/jgss-features.html
+                // """
+                // In addition, when performing operations as a particular
+                // Subject, e.g. Subject.doAs(...) or
+                // Subject.doAsPrivileged(...),
+                // the to-be-used GSSCredential should be added to Subject's
+                // private credential set. Otherwise, the GSS operations will
+                // fail since no credential is found.
+                // """
+                try {
+                    GSSManager manager = GSSManager.getInstance();
+                    Oid krb5Mechanism = new Oid("1.2.840.113554.1.2.2");
+                    GSSCredential cred = manager.createCredential(null,
+                            GSSContext.DEFAULT_LIFETIME, krb5Mechanism,
+                            GSSCredential.INITIATE_ONLY);
+                    subject.getPrivateCredentials().add(cred);
+                    LOG.debug("Added private credential to {} principal name: '{}'",
+                            entity, clientPrincipal);
+                } catch (GSSException ex) {
+                    LOG.warn("Cannot add private credential to subject; "
+                                    + "authentication at the server may fail", ex);
+                }
+            }
+            final KerberosName clientKerberosName = new KerberosName(
+                    clientPrincipal.getName());
+            // assume that server and client are in the same realm (by default;
+            // unless the system property
+            // "zookeeper.server.realm" is set).
+            String serverRealm = System.getProperty("zookeeper.server.realm",
+                    clientKerberosName.getRealm());
+            KerberosName serviceKerberosName = new KerberosName(
+                    servicePrincipal + "@" + serverRealm);
+            final String serviceName = serviceKerberosName.getServiceName();
+            final String serviceHostname = serviceKerberosName.getHostName();
+            final String clientPrincipalName = clientKerberosName.toString();
+            try {
+                saslClient = Subject.doAs(subject,
+                        new PrivilegedExceptionAction<SaslClient>() {
+                            public SaslClient run() throws SaslException {
+                                LOG.info("{} will use GSSAPI as SASL mechanism.", entity);
+                                String[] mechs = { "GSSAPI" };
+                                LOG.debug("creating sasl client: {}={};service={};serviceHostname={}",
+                                        new Object[] { entity, clientPrincipalName, serviceName, serviceHostname });
+                                SaslClient saslClient = Sasl.createSaslClient(
+                                        mechs, clientPrincipalName, serviceName,
+                                        serviceHostname, null,
+                                        new SaslClientCallbackHandler(null, entity));
+                                return saslClient;
+                            }
+                        });
+                return saslClient;
+            } catch (Exception e) {
+                LOG.error("Exception while trying to create SASL client", e);
+                return null;
+            }
+        }
+    }
+
+    /**
+     * Create an instance of a SaslServer. It will return null if there is an exception.
+     *
+     * @param subject subject
+     * @param protocol protocol
+     * @param serverName server name
+     * @param callbackHandler login callback handler
+     * @param LOG logger
+     * @return sasl server object
+     */
+    public static SaslServer createSaslServer(final Subject subject,
+            final String protocol, final String serverName,
+            final CallbackHandler callbackHandler, final Logger LOG) {
+        if (subject != null) {
+            // server is using a JAAS-authenticated subject: determine service
+            // principal name and hostname from zk server's subject.
+            if (subject.getPrincipals().size() > 0) {
+                try {
+                    final Object[] principals = subject.getPrincipals()
+                            .toArray();
+                    final Principal servicePrincipal = (Principal) principals[0];
+
+                    // e.g. servicePrincipalNameAndHostname :=
+                    // "zookeeper/myhost.foo.com@FOO.COM"
+                    final String servicePrincipalNameAndHostname = servicePrincipal
+                            .getName();
+
+                    int indexOf = servicePrincipalNameAndHostname.indexOf("/");
+
+                    // e.g. servicePrincipalName := "zookeeper"
+                    final String servicePrincipalName = servicePrincipalNameAndHostname
+                            .substring(0, indexOf);
+
+                    // e.g. serviceHostnameAndKerbDomain :=
+                    // "myhost.foo.com@FOO.COM"
+                    final String serviceHostnameAndKerbDomain = servicePrincipalNameAndHostname
+                            .substring(indexOf + 1,
+                                    servicePrincipalNameAndHostname.length());
+
+                    indexOf = serviceHostnameAndKerbDomain.indexOf("@");
+                    // e.g. serviceHostname := "myhost.foo.com"
+                    final String serviceHostname = serviceHostnameAndKerbDomain
+                            .substring(0, indexOf);
+
+                    // TODO: should depend on zoo.cfg specified mechs, but if
+                    // subject is non-null, it can be assumed to be GSSAPI.
+                    final String mech = "GSSAPI";
+
+                    LOG.debug("serviceHostname is '" + serviceHostname + "'");
+                    LOG.debug("servicePrincipalName is '" + servicePrincipalName
+                            + "'");
+                    LOG.debug("SASL mechanism(mech) is '" + mech + "'");
+
+                    boolean usingNativeJgss = Boolean
+                            .getBoolean("sun.security.jgss.native");
+                    if (usingNativeJgss) {
+                        // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/jgss-features.html
+                        // """
+                        // In addition, when performing operations as a
+                        // particular
+                        // Subject, e.g. Subject.doAs(...) or
+                        // Subject.doAsPrivileged(...), the to-be-used
+                        // GSSCredential should be added to Subject's
+                        // private credential set. Otherwise, the GSS operations
+                        // will fail since no credential is found.
+                        // """
+                        try {
+                            GSSManager manager = GSSManager.getInstance();
+                            Oid krb5Mechanism = new Oid("1.2.840.113554.1.2.2");
+                            GSSName gssName = manager.createName(
+                                    servicePrincipalName + "@"
+                                            + serviceHostname,
+                                    GSSName.NT_HOSTBASED_SERVICE);
+                            GSSCredential cred = manager.createCredential(
+                                    gssName, GSSContext.DEFAULT_LIFETIME,
+                                    krb5Mechanism, GSSCredential.ACCEPT_ONLY);
+                            subject.getPrivateCredentials().add(cred);
+                            LOG.debug("Added private credential to service principal name: '{}',"
+                                            + " GSSCredential name: {}", servicePrincipalName, cred.getName());
+                        } catch (GSSException ex) {
+                            LOG.warn("Cannot add private credential to subject; "
+                                            + "clients authentication may fail", ex);
+                        }
+                    }
+                    try {
+                        return Subject.doAs(subject,
+                                new PrivilegedExceptionAction<SaslServer>() {
+                                    public SaslServer run() {
+                                        try {
+                                            SaslServer saslServer;
+                                            saslServer = Sasl.createSaslServer(
+                                                    mech, servicePrincipalName,
+                                                    serviceHostname, null,
+                                                    callbackHandler);
+                                            return saslServer;
+                                        } catch (SaslException e) {
+                                            LOG.error("Zookeeper Server failed to create a SaslServer to interact with a client during session initiation: ", e);
+                                            return null;
+                                        }
+                                    }
+                                });
+                    } catch (PrivilegedActionException e) {
+                        // TODO: exit server at this point(?)
+                        LOG.error("Zookeeper Quorum member experienced a PrivilegedActionException exception while creating a SaslServer using a JAAS principal context:", e);
+                    }
+                } catch (IndexOutOfBoundsException e) {
+                    LOG.error("server principal name/hostname determination error: ", e);
+                }
+            } else {
+                // JAAS non-GSSAPI authentication: assuming and supporting only
+                // DIGEST-MD5 mechanism for now.
+                // TODO: use 'authMech=' value in zoo.cfg.
+                try {
+                    SaslServer saslServer = Sasl.createSaslServer("DIGEST-MD5",
+                            protocol, serverName, null, callbackHandler);
+                    return saslServer;
+                } catch (SaslException e) {
+                    LOG.error("Zookeeper Quorum member failed to create a SaslServer to interact with a client during session initiation", e);
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Convert Kerberos principal name pattern to valid Kerberos principal name.
+     * If the principal name contains hostname pattern "_HOST" then it replaces
+     * with the given hostname, which should be fully-qualified domain name.
+     *
+     * @param principalConfig
+     *            the Kerberos principal name conf value to convert
+     * @param hostname
+     *            the fully-qualified domain name used for substitution
+     * @return converted Kerberos principal name
+     */
+    public static String getServerPrincipal(String principalConfig,
+            String hostname) {
+        String[] components = getComponents(principalConfig);
+        if (components == null || components.length != 2
+                || !components[1].equals(QUORUM_HOSTNAME_PATTERN)) {
+            return principalConfig;
+        } else {
+            return replacePattern(components, hostname);
+        }
+    }
+
+    private static String[] getComponents(String principalConfig) {
+        if (principalConfig == null)
+            return null;
+        return principalConfig.split("[/]");
+    }
+
+    private static String replacePattern(String[] components, String hostname) {
+        return components[0] + "/" + hostname.toLowerCase();
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/config/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/src/java/test/config/findbugsExcludeFile.xml b/src/java/test/config/findbugsExcludeFile.xml
index 60f2366..5c29155 100644
--- a/src/java/test/config/findbugsExcludeFile.xml
+++ b/src/java/test/config/findbugsExcludeFile.xml
@@ -67,6 +67,11 @@
   </Match>
 
   <Match>
+      <Class name="org.apache.zookeeper.server.quorum.QuorumAuthPacket" />
+      <Bug code="EI2, EI" />
+  </Match>
+
+  <Match>
     <Class name="org.apache.zookeeper.ClientCnxn"/>
       <Bug code="EI, EI2" />
   </Match>

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/data/kerberos/minikdc-krb5.conf
----------------------------------------------------------------------
diff --git a/src/java/test/data/kerberos/minikdc-krb5.conf b/src/java/test/data/kerberos/minikdc-krb5.conf
new file mode 100644
index 0000000..43ec7c4
--- /dev/null
+++ b/src/java/test/data/kerberos/minikdc-krb5.conf
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This resource is originally from HDFS, see the similarly named files there
+# in case of bug fixing, history, etc.
+# Branch : trunk
+# Github Revision: 1d1ab587e4e92ce3aea4cb144811f69145cb3b33
+#
+[libdefaults]
+    default_realm = {0}
+    udp_preference_limit = 1
+
+[realms]
+    {0} = '{'
+        kdc = {1}:{2}
+    '}'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/data/kerberos/minikdc.ldiff
----------------------------------------------------------------------
diff --git a/src/java/test/data/kerberos/minikdc.ldiff b/src/java/test/data/kerberos/minikdc.ldiff
new file mode 100644
index 0000000..20c8d77
--- /dev/null
+++ b/src/java/test/data/kerberos/minikdc.ldiff
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This resource is originally from HDFS, see the similarly named files there
+# in case of bug fixing, history, etc.
+# Branch : trunk
+# Github Revision: 1d1ab587e4e92ce3aea4cb144811f69145cb3b33
+#
+dn: ou=users,dc=${0},dc=${1}
+objectClass: organizationalUnit
+objectClass: top
+ou: users
+
+dn: uid=krbtgt,ou=users,dc=${0},dc=${1}
+objectClass: top
+objectClass: person
+objectClass: inetOrgPerson
+objectClass: krb5principal
+objectClass: krb5kdcentry
+cn: KDC Service
+sn: Service
+uid: krbtgt
+userPassword: secret
+krb5PrincipalName: krbtgt/${2}.${3}@${2}.${3}
+krb5KeyVersionNumber: 0
+
+dn: uid=ldap,ou=users,dc=${0},dc=${1}
+objectClass: top
+objectClass: person
+objectClass: inetOrgPerson
+objectClass: krb5principal
+objectClass: krb5kdcentry
+cn: LDAP
+sn: Service
+uid: ldap
+userPassword: secret
+krb5PrincipalName: ldap/${4}@${2}.${3}
+krb5KeyVersionNumber: 0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java b/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
index 4ec9a07..9edb4be 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/EphemeralNodeDeletionTest.java
@@ -38,6 +38,8 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.security.sasl.SaslException;
+
 public class EphemeralNodeDeletionTest extends QuorumPeerTestBase {
     private static int SERVER_COUNT = 3;
     private MainThread[] mt = new MainThread[SERVER_COUNT];
@@ -181,6 +183,10 @@ public class EphemeralNodeDeletionTest extends QuorumPeerTestBase {
     static class CustomQuorumPeer extends QuorumPeer {
         private boolean injectError = false;
 
+        public CustomQuorumPeer() throws SaslException {
+
+        }
+
         @Override
         protected Follower makeFollower(FileTxnSnapLog logFactory)
                 throws IOException {
@@ -215,7 +221,7 @@ public class EphemeralNodeDeletionTest extends QuorumPeerTestBase {
 
     static class MockTestQPMain extends TestQPMain {
         @Override
-        protected QuorumPeer getQuorumPeer() {
+        protected QuorumPeer getQuorumPeer() throws SaslException {
             return new CustomQuorumPeer();
         }
     }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
index 9b7cc56..f50d0bb 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
@@ -115,7 +115,7 @@ public class FLEBackwardElectionRoundTest extends ZKTestCase {
          * Start mock server 1
          */
         QuorumPeer mockPeer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2);
-        cnxManagers[0] = new QuorumCnxManager(mockPeer);
+        cnxManagers[0] = mockPeer.createCnxnManager();
         cnxManagers[0].listener.start();
 
         cnxManagers[0].toSend(0l, initialMsg0);
@@ -124,7 +124,7 @@ public class FLEBackwardElectionRoundTest extends ZKTestCase {
          * Start mock server 2
          */
         mockPeer = new QuorumPeer(peers, tmpdir[2], tmpdir[2], port[2], 3, 2, 1000, 2, 2);
-        cnxManagers[1] = new QuorumCnxManager(mockPeer);
+        cnxManagers[1] = mockPeer.createCnxnManager();
         cnxManagers[1].listener.start();
 
         cnxManagers[1].toSend(0l, initialMsg1);

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
index cc44243..6583f90 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
@@ -95,7 +95,7 @@ public class FLELostMessageTest extends ZKTestCase {
 
     void mockServer() throws InterruptedException, IOException {
         QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
-        cnxManager = new QuorumCnxManager(peer);
+        cnxManager = peer.createCnxnManager();
         cnxManager.listener.start();
 
         cnxManager.toSend(1l, FLETestUtils.createMsg(ServerState.LOOKING.ordinal(), 0, 0, 0));

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java b/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
index c4ea646..f238971 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.net.Socket;
 import java.util.Iterator;
@@ -55,7 +56,7 @@ public class LearnerHandlerTest extends ZKTestCase {
         boolean threadStarted = false;
 
         MockLearnerHandler(Socket sock, Leader leader) throws IOException {
-            super(sock, leader);
+            super(sock, new BufferedInputStream(sock.getInputStream()), leader);
         }
 
         protected void startSendingPackets() {

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java b/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java
index 85284f6..1793550 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java
@@ -110,7 +110,7 @@ public class LearnerTest extends ZKTestCase {
         InetSocketAddress addr = new InetSocketAddress(1111);
 
         // we expect this to throw an IOException since we're faking socket connect errors every time
-        learner.connectToLeader(addr);
+        learner.connectToLeader(addr, "");
     }
     @Test
     public void connectionInitLimitTimeoutTest() throws Exception {
@@ -130,7 +130,7 @@ public class LearnerTest extends ZKTestCase {
 
         // we expect this to throw an IOException since we're faking socket connect errors every time
         try {
-            learner.connectToLeader(addr);
+            learner.connectToLeader(addr, "");
             Assert.fail("should have thrown IOException!");
         } catch (IOException e) {
             //good, wanted to see that, let's make sure we ran out of time

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
index 43e7002..21aa819 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
@@ -26,6 +26,10 @@ import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.Properties;
 
 import org.slf4j.Logger;
@@ -47,7 +51,7 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
     protected static final Logger LOG = LoggerFactory
             .getLogger(QuorumPeerTestBase.class);
 
-    public static final int TIMEOUT = 3000;
+    public static final int TIMEOUT = 5000;
 
     public void process(WatchedEvent event) {
         // ignore for this test
@@ -72,6 +76,60 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
 
         volatile TestQPMain main;
 
+        File baseDir;
+        private int myid;
+        private int clientPort;
+        private String quorumCfgSection;
+        private Map<String, String> otherConfigs;
+
+        public MainThread(int myid, int clientPort, String quorumCfgSection,
+                          Map<String, String> otherConfigs) throws IOException {
+            baseDir = ClientBase.createTmpDir();
+            this.myid = myid;
+            this.clientPort = clientPort;
+            this.quorumCfgSection = quorumCfgSection;
+            this.otherConfigs = otherConfigs;
+            LOG.info("id = " + myid + " tmpDir = " + baseDir + " clientPort = "
+                    + clientPort);
+            confFile = new File(baseDir, "zoo.cfg");
+
+            FileWriter fwriter = new FileWriter(confFile);
+            fwriter.write("tickTime=4000\n");
+            fwriter.write("initLimit=10\n");
+            fwriter.write("syncLimit=5\n");
+
+            tmpDir = new File(baseDir, "data");
+            if (!tmpDir.mkdir()) {
+                throw new IOException("Unable to mkdir " + tmpDir);
+            }
+
+            // Convert windows path to UNIX to avoid problems with "\"
+            String dir = tmpDir.toString();
+            String osname = java.lang.System.getProperty("os.name");
+            if (osname.toLowerCase().contains("windows")) {
+                dir = dir.replace('\\', '/');
+            }
+            fwriter.write("dataDir=" + dir + "\n");
+
+            fwriter.write("clientPort=" + clientPort + "\n");
+
+            // write extra configurations
+            Set<Entry<String, String>> entrySet = otherConfigs.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                fwriter.write(entry.getKey() + "=" + entry.getValue() + "\n");
+            }
+
+            fwriter.write(quorumCfgSection + "\n");
+            fwriter.flush();
+            fwriter.close();
+
+            File myidFile = new File(tmpDir, "myid");
+            fwriter = new FileWriter(myidFile);
+            fwriter.write(Integer.toString(myid));
+            fwriter.flush();
+            fwriter.close();
+        }
+
         public MainThread(int myid, String quorumCfgSection) throws IOException {
             this(myid, quorumCfgSection, true);
         }
@@ -87,11 +145,6 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
             this(myid, UNSET_STATIC_CLIENTPORT, quorumCfgSection, writeDynamicConfigFile);
         }
 
-        public MainThread(int myid, int clientPort, String quorumCfgSection)
-                throws IOException {
-            this(myid, clientPort, JettyAdminServer.DEFAULT_PORT, quorumCfgSection, null, null, true);
-        }
-
         public MainThread(int myid, int clientPort, String quorumCfgSection, boolean writeDynamicConfigFile)
                 throws IOException {
             this(myid, clientPort, JettyAdminServer.DEFAULT_PORT, quorumCfgSection, null, null, writeDynamicConfigFile);
@@ -237,6 +290,12 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
             fwriter.close();
         }
 
+        public MainThread(int myid, int clientPort, String quorumCfgSection)
+                throws IOException {
+            this(myid, clientPort, quorumCfgSection,
+                    new HashMap<String, String>());
+        }
+
         Thread currentThread;
 
         synchronized public void start() {
@@ -301,5 +360,31 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
         public QuorumPeer getQuorumPeer() {
             return main.quorumPeer;
         }
+
+
+        public void deleteBaseDir() {
+            ClientBase.recursiveDelete(baseDir);
+        }
+
+        public int getMyid() {
+            return myid;
+        }
+
+        public int getClientPort() {
+            return clientPort;
+        }
+
+        public String getQuorumCfgSection() {
+            return quorumCfgSection;
+        }
+
+        public Map<String, String> getOtherConfigs() {
+            return otherConfigs;
+        }
+
+        public File getConfFile() {
+            return confFile;
+        }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java b/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java
index c65a794..e96d273 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java
@@ -42,6 +42,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.security.sasl.SaslException;
+
 /**
  * This test class contains test cases related to race condition in complete
  * ZooKeeper
@@ -156,6 +158,9 @@ public class RaceConditionTest extends QuorumPeerTestBase {
     private static class CustomQuorumPeer extends QuorumPeer {
         private boolean stopPing;
 
+        public CustomQuorumPeer() throws SaslException {
+        }
+
         public void setStopPing(boolean stopPing) {
             this.stopPing = stopPing;
         }
@@ -239,7 +244,7 @@ public class RaceConditionTest extends QuorumPeerTestBase {
     private static class MockTestQPMain extends TestQPMain {
 
         @Override
-        protected QuorumPeer getQuorumPeer() {
+        protected QuorumPeer getQuorumPeer() throws SaslException {
             return new CustomQuorumPeer();
         }
     }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
index acac87e..10ec1ea 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -386,7 +387,9 @@ public class Zab1_0Test extends ZKTestCase {
                 Thread.sleep(20);
             }
             
-            LearnerHandler lh = new LearnerHandler(leaderSocket, leader);
+            LearnerHandler lh = new LearnerHandler(leaderSocket,
+                    new BufferedInputStream(leaderSocket.getInputStream()),
+                    leader);
             lh.start();
             leaderSocket.setSoTimeout(4000);
 
@@ -454,8 +457,10 @@ public class Zab1_0Test extends ZKTestCase {
             while(leader.cnxAcceptor == null || !leader.cnxAcceptor.isAlive()) {
                 Thread.sleep(20);
             }
-            
-            LearnerHandler lh = new LearnerHandler(leaderSocket, leader);
+
+            LearnerHandler lh = new LearnerHandler(leaderSocket,
+                    new BufferedInputStream(leaderSocket.getInputStream()),
+                    leader);
             lh.start();
             leaderSocket.setSoTimeout(4000);
 
@@ -477,7 +482,6 @@ public class Zab1_0Test extends ZKTestCase {
         }
     }
     
-    
     public void testFollowerConversation(FollowerConversation conversation) throws Exception {
         File tmpDir = File.createTempFile("test", "dir", testData);
         tmpDir.delete();
@@ -491,8 +495,10 @@ public class Zab1_0Test extends ZKTestCase {
             peer.follower = follower;
             
             ServerSocket ss =
-                new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
-            follower.setLeaderSocketAddress((InetSocketAddress)ss.getLocalSocketAddress());
+                    new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
+            QuorumServer leaderQS = new QuorumServer(1,
+                    (InetSocketAddress) ss.getLocalSocketAddress());
+            follower.setLeaderQuorumServer(leaderQS);
             final Follower followerForThread = follower;
             
             followerThread = new Thread() {
@@ -545,7 +551,8 @@ public class Zab1_0Test extends ZKTestCase {
 
             ServerSocket ss =
                 new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
-            observer.setLeaderSocketAddress((InetSocketAddress)ss.getLocalSocketAddress());
+            QuorumServer leaderQS = new QuorumServer(1, (InetSocketAddress) ss.getLocalSocketAddress());
+            observer.setLeaderQuorumServer(leaderQS);
             final Observer observerForThread = observer;
 
             observerThread = new Thread() {
@@ -1272,14 +1279,14 @@ public class Zab1_0Test extends ZKTestCase {
             super(self, zk);
         }
 
-        InetSocketAddress leaderAddr;
-        public void setLeaderSocketAddress(InetSocketAddress addr) {
-            leaderAddr = addr;
+        QuorumServer leaderQuorumServer;
+        public void setLeaderQuorumServer(QuorumServer quorumServer) {
+            leaderQuorumServer = quorumServer;
         }
         
         @Override
-        protected InetSocketAddress findLeader() {
-            return leaderAddr;
+        protected QuorumServer findLeader() {
+            return leaderQuorumServer;
         }
     }
     private ConversableFollower createFollower(File tmpDir, QuorumPeer peer)
@@ -1298,14 +1305,14 @@ public class Zab1_0Test extends ZKTestCase {
             super(self, zk);
         }
 
-        InetSocketAddress leaderAddr;
-        public void setLeaderSocketAddress(InetSocketAddress addr) {
-            leaderAddr = addr;
+        QuorumServer leaderQuorumServer;
+        public void setLeaderQuorumServer(QuorumServer quorumServer) {
+            leaderQuorumServer = quorumServer;
         }
 
         @Override
-        protected InetSocketAddress findLeader() {
-            return leaderAddr;
+        protected QuorumServer findLeader() {
+            return leaderQuorumServer;
         }
     }
 
@@ -1321,7 +1328,7 @@ public class Zab1_0Test extends ZKTestCase {
 
     private QuorumPeer createQuorumPeer(File tmpDir) throws IOException, FileNotFoundException {
         HashMap<Long, QuorumServer> peers = new HashMap<Long, QuorumServer>();
-        QuorumPeer peer = new QuorumPeer();
+        QuorumPeer peer = QuorumPeer.testingQuorumPeer();
         peer.syncLimit = SYNC_LIMIT;
         peer.initLimit = 2;
         peer.tickTime = 2000;
@@ -1374,7 +1381,7 @@ public class Zab1_0Test extends ZKTestCase {
                     new ErrorTxn(1), zxid));
             logFactory.commit();
             ZKDatabase zkDb = new ZKDatabase(logFactory);
-            QuorumPeer peer = new QuorumPeer();
+            QuorumPeer peer = QuorumPeer.testingQuorumPeer();
             peer.setZKDatabase(zkDb);
             peer.setTxnFactory(logFactory);
             peer.getLastLoggedZxid();

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/org/apache/zookeeper/server/quorum/auth/KerberosSecurityTestcase.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/auth/KerberosSecurityTestcase.java b/src/java/test/org/apache/zookeeper/server/quorum/auth/KerberosSecurityTestcase.java
new file mode 100644
index 0000000..9617c70
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/quorum/auth/KerberosSecurityTestcase.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum.auth;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+/*
+ * This code is originally from HDFS, see the similarly named file there
+ * in case of bug fixing, history, etc.
+ *
+ * Branch : trunk
+ * Github Revision: 1d1ab587e4e92ce3aea4cb144811f69145cb3b33
+ */
+
+/**
+ * KerberosSecurityTestcase provides a base class for using MiniKdc with other
+ * test cases. KerberosSecurityTestcase starts the MiniKdc (@Before) before
+ * running tests, and stop the MiniKdc (@After) after the testcases, using
+ * default settings (working dir and kdc configurations).
+ * <p>
+ * Users can directly inherit this class and implement their own test functions
+ * using the default settings, or override functions getTestDir() and
+ * createMiniKdcConf() to provide new settings.
+ */
+public class KerberosSecurityTestcase extends QuorumAuthTestBase {
+    private static MiniKdc kdc;
+    private static File workDir;
+    private static Properties conf;
+
+    @BeforeClass
+    public static void setUpSasl() throws Exception {
+        startMiniKdc();
+    }
+
+    @AfterClass
+    public static void tearDownSasl() throws Exception {
+        stopMiniKdc();
+        FileUtils.deleteQuietly(workDir);
+    }
+
+    public static void startMiniKdc() throws Exception {
+        createTestDir();
+        createMiniKdcConf();
+
+        kdc = new MiniKdc(conf, workDir);
+        kdc.start();
+    }
+
+    /**
+     * Create a working directory, it should be the build directory. Under this
+     * directory an ApacheDS working directory will be created, this directory
+     * will be deleted when the MiniKdc stops.
+     *
+     * @throws IOException
+     */
+    public static void createTestDir() throws IOException {
+        workDir = createTmpDir(
+                new File(System.getProperty("build.test.dir", "build")));
+    }
+
+    static File createTmpDir(File parentDir) throws IOException {
+        File tmpFile = File.createTempFile("test", ".junit", parentDir);
+        // don't delete tmpFile - this ensures we don't attempt to create
+        // a tmpDir with a duplicate name
+        File tmpDir = new File(tmpFile + ".dir");
+        // never true if tmpfile does it's job
+        Assert.assertFalse(tmpDir.exists());
+        Assert.assertTrue(tmpDir.mkdirs());
+        return tmpDir;
+    }
+
+    /**
+     * Create a Kdc configuration
+     */
+    public static void createMiniKdcConf() {
+        conf = MiniKdc.createConf();
+    }
+
+    public static void stopMiniKdc() {
+        if (kdc != null) {
+            kdc.stop();
+        }
+    }
+
+    public static MiniKdc getKdc() {
+        return kdc;
+    }
+
+    public static File getWorkDir() {
+        return workDir;
+    }
+
+    public static Properties getConf() {
+        return conf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/75411ab3/src/java/test/org/apache/zookeeper/server/quorum/auth/KerberosTestUtils.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/auth/KerberosTestUtils.java b/src/java/test/org/apache/zookeeper/server/quorum/auth/KerberosTestUtils.java
new file mode 100644
index 0000000..4a75f83
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/quorum/auth/KerberosTestUtils.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.auth;
+
+import java.io.File;
+import java.util.UUID;
+
+import org.apache.zookeeper.util.SecurityUtils;
+
+public class KerberosTestUtils {
+    private static String keytabFile = new File(System.getProperty("test.dir", "build"), UUID.randomUUID().toString())
+            .getAbsolutePath();
+
+    public static String getRealm() {
+        return "EXAMPLE.COM";
+    }
+
+    public static String getLearnerPrincipal() {
+        return "learner@EXAMPLE.COM";
+    }
+
+    public static String getServerPrincipal() {
+        return "zkquorum/localhost@EXAMPLE.COM";
+    }
+
+    public static String getHostLearnerPrincipal() {
+        return "learner/_HOST@EXAMPLE.COM";
+    }
+
+    public static String getHostServerPrincipal() {
+        return "zkquorum/_HOST@EXAMPLE.COM";
+    }
+
+    public static String getHostNamedLearnerPrincipal(String myHostname) {
+        return "learner/" + myHostname + "@EXAMPLE.COM";
+    }
+
+    public static String getKeytabFile() {
+        return keytabFile;
+    }
+
+    public static String replaceHostPattern(String principal) {
+        String[] components = principal.split("[/@]");
+        if (components == null || components.length < 2
+                || !components[1].equals(SecurityUtils.QUORUM_HOSTNAME_PATTERN)) {
+            return principal;
+        } else {
+            return replacePattern(components, "localhost");
+        }
+    }
+
+    public static String replacePattern(String[] components, String hostname) {
+        if (components.length == 3) {
+            return components[0] + "/" + hostname.toLowerCase() + "@"
+                    + components[2];
+        } else {
+            return components[0] + "/" + hostname.toLowerCase();
+        }
+    }
+}