You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by eo...@apache.org on 2019/07/24 16:00:17 UTC

[zookeeper] branch master updated: ZOOKEEPER-1634: hardening security by teaching server to enforce client authentication.

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 513df3d  ZOOKEEPER-1634: hardening security by teaching server to enforce client authentication.
513df3d is described below

commit 513df3da671c9937417bb7d92a55402520ba1292
Author: Michael Han <lh...@twitter.com>
AuthorDate: Wed Jul 24 18:00:02 2019 +0200

    ZOOKEEPER-1634: hardening security by teaching server to enforce client authentication.
    
    ## Motivation
    Previously ZooKeeper server is open to the world as the server does not enforce client authentication - anonymous clients are allowed to establish session with server. This behavior raises a couple of issues from the perspective of performance and security for example:
    * It is easy to launch a DDoS attack to server, by having a fleet of anonymous clients connect to the ensemble, as each session would consume valuable resources (socket, memory, etc) from server.
    * It is cumbersome to enforce certain security models with the presence of anonymous clients login - for example as clients are not trusted the root ACL has to be disabled for writes to world, among other configurations an admin has to do to secure a cluster in a multi-tenant environment.
    
    So the goal here is to address such issue by hardening ZooKeeper security to provide a more confined access option that user could opt-in, which in addition to the existing ACLs together could lead to more secured / resource optimal ensemble.
    
    ## Design Abstract
    * Introduce a new server side Java property that if set, ZooKeeper server will only accept connections and requests from clients that have authenticated with server via SASL.
    * Clients that are not configured with SASL authentication, or configured with SASL but fail authentication (i.e. with invalid credential) will not be able to establish a session with server. A typed error code (-124) will be delivered in such case, both Java and C client will close the session with server thereafter, without further attempts on retrying to reconnect.
    * This feature overrules the server property "zookeeper.allowSaslFailedClients". So even if server is configured to allow clients that fail SASL authentication to login, client will not be able to establish a session with server if this feature is enabled.
    * Only support SASL because only SASL authentication has the property that no operations will happen until SASL authentication process finished. Thus, the decision of whether to close the session or not can be quickly made on server side upon receiving a client connection request. We could later add support for other auth scheme via add_auth_info if that's desired (if we do, then a session has to be maintained until add_auth_info is invoked.).
    * As a side benefit, this PR fixes an issue mentioned in ZOOKEEPER-2346 by correctly propagate events from server to client side so a SASL auth failure will manifest as an auth / config failure rather than generic ConnectionLoss event.
    
    JIRA: https://issues.apache.org/jira/browse/ZOOKEEPER-1634
    The PR also covers (or part of):
    https://issues.apache.org/jira/browse/ZOOKEEPER-2462
    https://issues.apache.org/jira/browse/ZOOKEEPER-2526
    https://issues.apache.org/jira/browse/ZOOKEEPER-2346
    
    Author: Michael Han <lh...@twitter.com>
    
    Reviewers: Enrico Olivelli <eo...@apache.org>, Andor Molnar <an...@apache.org>
    
    Closes #118 from hanm/ZOOKEEPER-1634
---
 zookeeper-client/zookeeper-client-c/Makefile.am    |   1 +
 .../zookeeper-client-c/include/zookeeper.h         |   3 +-
 .../tests/TestServerRequireClientSASLAuth.cc       | 109 +++++++++++++++++++++
 .../zookeeper-client-c/tests/zkServer.sh           |  16 ++-
 .../src/main/resources/markdown/zookeeperAdmin.md  |  17 ++++
 .../java/org/apache/zookeeper/KeeperException.java |  20 +++-
 .../apache/zookeeper/server/ZooKeeperServer.java   |  76 ++++++++++----
 .../test/SaslAuthRequiredFailNoSASLTest.java       |  62 ++++++++++++
 .../test/SaslAuthRequiredFailWrongSASLTest.java    |  64 ++++++++++++
 .../zookeeper/test/SaslAuthRequiredTest.java       |  62 ++++++++++++
 .../org/apache/zookeeper/test/SaslTestUtil.java    |  61 ++++++++++++
 11 files changed, 469 insertions(+), 22 deletions(-)

diff --git a/zookeeper-client/zookeeper-client-c/Makefile.am b/zookeeper-client/zookeeper-client-c/Makefile.am
index 059b22e..a5312a8 100644
--- a/zookeeper-client/zookeeper-client-c/Makefile.am
+++ b/zookeeper-client/zookeeper-client-c/Makefile.am
@@ -107,6 +107,7 @@ TEST_SOURCES = \
 	tests/ZooKeeperQuorumServer.h \
 	tests/TestReadOnlyClient.cc \
 	tests/TestLogClientEnv.cc \
+        tests/TestServerRequireClientSASLAuth.cc \
 	$(NULL)
 
 if SOLARIS
diff --git a/zookeeper-client/zookeeper-client-c/include/zookeeper.h b/zookeeper-client/zookeeper-client-c/include/zookeeper.h
index 801027d..304dafe 100644
--- a/zookeeper-client/zookeeper-client-c/include/zookeeper.h
+++ b/zookeeper-client/zookeeper-client-c/include/zookeeper.h
@@ -130,7 +130,8 @@ enum ZOO_ERRORS {
   ZNOTREADONLY = -119, /*!< state-changing request is passed to read-only server */
   ZEPHEMERALONLOCALSESSION = -120, /*!< Attempt to create ephemeral node on a local session */
   ZNOWATCHER = -121, /*!< The watcher couldn't be found */
-  ZRECONFIGDISABLED = -123 /*!< Attempts to perform a reconfiguration operation when reconfiguration feature is disabled */
+  ZRECONFIGDISABLED = -123, /*!< Attempts to perform a reconfiguration operation when reconfiguration feature is disabled */
+  ZSESSIONCLOSEDREQUIRESASLAUTH = -124 /*!< The session has been closed by server because server requires client to do SASL authentication, but client is not configured with SASL authentication or configuted with SASL but failed (i.e. wrong credential used.). */
 };
 
 #ifdef __cplusplus
diff --git a/zookeeper-client/zookeeper-client-c/tests/TestServerRequireClientSASLAuth.cc b/zookeeper-client/zookeeper-client-c/tests/TestServerRequireClientSASLAuth.cc
new file mode 100644
index 0000000..2c5290d
--- /dev/null
+++ b/zookeeper-client/zookeeper-client-c/tests/TestServerRequireClientSASLAuth.cc
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cppunit/extensions/HelperMacros.h>
+#include "CppAssertHelper.h"
+
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include <zookeeper.h>
+
+#include "Util.h"
+#include "WatchUtil.h"
+
+ZOOAPI int zoo_create2(zhandle_t *zh, const char *path, const char *value,
+        int valuelen, const struct ACL_vector *acl, int mode,
+        char *path_buffer, int path_buffer_len, struct Stat *stat);
+
+class Zookeeper_serverRequireClientSASL : public CPPUNIT_NS::TestFixture {
+    CPPUNIT_TEST_SUITE(Zookeeper_serverRequireClientSASL);
+#ifdef THREADED
+    CPPUNIT_TEST(testServerRequireClientSASL);
+#endif
+    CPPUNIT_TEST_SUITE_END();
+    FILE *logfile;
+    static const char hostPorts[];
+    static void watcher(zhandle_t *, int type, int state, const char *path,void*v){
+        watchctx_t *ctx = (watchctx_t*)v;
+
+        if (state == ZOO_CONNECTED_STATE) {
+            ctx->connected = true;
+        } else {
+            ctx->connected = false;
+        }
+        if (type != ZOO_SESSION_EVENT) {
+            evt_t evt;
+            evt.path = path;
+            evt.type = type;
+            ctx->putEvent(evt);
+        }
+    }
+
+public:
+    Zookeeper_serverRequireClientSASL() {
+      logfile = openlogfile("Zookeeper_serverRequireClientSASL");
+    }
+
+    ~Zookeeper_serverRequireClientSASL() {
+      if (logfile) {
+        fflush(logfile);
+        fclose(logfile);
+        logfile = 0;
+      }
+    }
+
+    void setUp() {
+        zoo_set_log_stream(logfile);
+    }
+
+    void startServer() {
+        char cmd[1024];
+        sprintf(cmd, "%s startRequireSASLAuth", ZKSERVER_CMD);
+        CPPUNIT_ASSERT(system(cmd) == 0);
+    }
+
+    void stopServer() {
+        char cmd[1024];
+        sprintf(cmd, "%s stop", ZKSERVER_CMD);
+        CPPUNIT_ASSERT(system(cmd) == 0);
+    }
+
+    void testServerRequireClientSASL() {
+        startServer();
+
+        watchctx_t ctx;
+        int rc = 0;
+        zhandle_t *zk = zookeeper_init(hostPorts, watcher, 10000, 0, &ctx, 0);
+        ctx.zh = zk;
+        CPPUNIT_ASSERT(zk);
+
+        char pathbuf[80];
+        struct Stat stat_a = {0};
+
+        rc = zoo_create2(zk, "/serverRequireClientSASL", "", 0,
+                         &ZOO_OPEN_ACL_UNSAFE, 0, pathbuf, sizeof(pathbuf), &stat_a);
+        CPPUNIT_ASSERT_EQUAL((int)ZSESSIONCLOSEDREQUIRESASLAUTH, rc);
+
+        stopServer();
+    }
+};
+
+const char Zookeeper_serverRequireClientSASL::hostPorts[] = "127.0.0.1:23456";
+
+CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_serverRequireClientSASL);
diff --git a/zookeeper-client/zookeeper-client-c/tests/zkServer.sh b/zookeeper-client/zookeeper-client-c/tests/zkServer.sh
index af558ea..ebc3df4 100755
--- a/zookeeper-client/zookeeper-client-c/tests/zkServer.sh
+++ b/zookeeper-client/zookeeper-client-c/tests/zkServer.sh
@@ -21,7 +21,7 @@ ZOOPORT=22181
 
 if [ "x$1" == "x" ]
 then
-    echo "USAGE: $0 startClean|start|startReadOnly|stop hostPorts"
+    echo "USAGE: $0 startClean|start|startReadOnly|startRequireSASLAuth|stop hostPorts"
     exit 2
 fi
 
@@ -180,6 +180,20 @@ startReadOnly)
     fi
 
     ;;
+startRequireSASLAuth)
+    if [ "x${base_dir}" == "x" ]
+    then
+        echo "this target is for unit tests only"
+        exit 2
+    else
+        mkdir -p "${base_dir}/build/tmp/zkdata"
+        java -cp "$CLASSPATH" -Dzookeeper.sessionRequireClientSASLAuth=true org.apache.zookeeper.server.ZooKeeperServerMain 23456 "${base_dir}/build/tmp/zkdata" 3000 $ZKMAXCNXNS &> "${base_dir}/build/tmp/zk.log" &
+        pid=$!
+        echo -n $pid > "${base_dir}/build/tmp/zk.pid"
+        sleep 3 # wait until server is up.
+    fi
+
+    ;;
 stop)
     # Already killed above
     ;;
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index c887f36..cb79a9f 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -1146,6 +1146,23 @@ encryption/authentication/authorization performed by the service.
     If the credential is not in the list, the connection request will be refused.
     This prevents a client accidentally connecting to a wrong ensemble.
 
+* *zookeeper.sessionRequireClientSASLAuth* :
+    (Java system property only: **zookeeper.sessionRequireClientSASLAuth**)
+    **New in 3.6.0:**
+    When set to **true**, ZooKeeper server will only accept connections and requests from clients
+    that have authenticated with server via SASL. Clients that are not configured with SASL
+    authentication, or configured with SASL but failed authentication (i.e. with invalid credential)
+    will not be able to establish a session with server. A typed error code (-124) will be delivered
+    in such case, both Java and C client will close the session with server thereafter,
+    without further attempts on retrying to reconnect.
+
+    By default, this feature is disabled. Users who would like to opt-in can enable the feature
+    by setting **zookeeper.sessionRequireClientSASLAuth** to **true**.
+
+    This feature overrules the <emphasis role="bold">zookeeper.allowSaslFailedClients</emphasis> option, so even if server is
+    configured to allow clients that fail SASL authentication to login, client will not be able to
+    establish a session with server if this feature is enabled.
+
 * *sslQuorum* :
     (Java system property: **zookeeper.sslQuorum**)
     **New in 3.5.5:**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java b/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java
index f797bb0..42f7a33 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java
@@ -144,6 +144,8 @@ public abstract class KeeperException extends Exception {
                 return new NoWatcherException();
             case RECONFIGDISABLED:
                 return new ReconfigDisabledException();
+            case SESSIONCLOSEDREQUIRESASLAUTH:
+                return new SessionClosedRequireAuthException();
             case REQUESTTIMEOUT:
                 return new RequestTimeoutException();
             case OK:
@@ -397,7 +399,11 @@ public abstract class KeeperException extends Exception {
         /** Request not completed within max allowed time.*/
         REQUESTTIMEOUT (-122),
         /** Attempts to perform a reconfiguration operation when reconfiguration feature is disabled. */
-        RECONFIGDISABLED(-123);
+        RECONFIGDISABLED(-123),
+        /** The session has been closed by server because server requires client to do SASL authentication,
+         *  but client is not configured with SASL authentication or configuted with SASL but failed
+         *  (i.e. wrong credential used.). */
+        SESSIONCLOSEDREQUIRESASLAUTH(-124);
 
         private static final Map<Integer,Code> lookup
             = new HashMap<Integer,Code>();
@@ -484,6 +490,8 @@ public abstract class KeeperException extends Exception {
                 return "No such watcher";
             case RECONFIGDISABLED:
                 return "Reconfig is disabled";
+            case SESSIONCLOSEDREQUIRESASLAUTH:
+                return "Session closed because client failed to authenticate";
             default:
                 return "Unknown error " + code;
         }
@@ -849,6 +857,16 @@ public abstract class KeeperException extends Exception {
     }
 
     /**
+     * @see Code#SESSIONCLOSEDREQUIRESASLAUTH
+     */
+    public static class SessionClosedRequireAuthException extends KeeperException {
+        public SessionClosedRequireAuthException() { super(Code.SESSIONCLOSEDREQUIRESASLAUTH); }
+        public SessionClosedRequireAuthException(String path) {
+            super(Code.SESSIONCLOSEDREQUIRESASLAUTH, path);
+        }
+    }
+
+    /**
      * @see Code#REQUESTTIMEOUT
      */
     public static class RequestTimeoutException extends KeeperException {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 4258916..f818fa1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -89,6 +89,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     protected static final Logger LOG;
 
     public static final String GLOBAL_OUTSTANDING_LIMIT = "zookeeper.globalOutstandingLimit";
+    public static final String ALLOW_SASL_FAILED_CLIENTS = "zookeeper.allowSaslFailedClients";
+    public static final String SESSION_REQUIRE_CLIENT_SASL_AUTH = "zookeeper.sessionRequireClientSASLAuth";
+    public static final String SASL_AUTH_SCHEME = "sasl";
 
     static {
         LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
@@ -1379,23 +1382,45 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             }
             return;
         } else if (h.getType() == OpCode.sasl) {
-            Record rsp = processSasl(incomingBuffer,cnxn);
-            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
-            cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
-            return;
+            processSasl(incomingBuffer,cnxn, h);
         } else {
+          if (shouldRequireClientSaslAuth() && !hasCnxSASLAuthenticated(cnxn)) {
+            ReplyHeader replyHeader = new ReplyHeader(h.getXid(), 0,
+                Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue());
+            cnxn.sendResponse(replyHeader, null, "response");
+            cnxn.sendCloseSession();
+            cnxn.disableRecv();
+          } else {
             Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
-              h.getType(), incomingBuffer, cnxn.getAuthInfo());
+                h.getType(), incomingBuffer, cnxn.getAuthInfo());
             si.setOwner(ServerCnxn.me);
             // Always treat packet from the client as a possible
             // local request.
             setLocalSessionFlag(si);
             submitRequest(si);
-            return;
+          }
         }
     }
 
-    private Record processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn) throws IOException {
+  private static boolean shouldAllowSaslFailedClientsConnect() {
+    return Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS);
+  }
+
+  private static boolean shouldRequireClientSaslAuth() {
+    return Boolean.getBoolean(SESSION_REQUIRE_CLIENT_SASL_AUTH);
+  }
+
+  private boolean hasCnxSASLAuthenticated(ServerCnxn cnxn) {
+    for (Id id : cnxn.getAuthInfo()) {
+      if (id.getScheme().equals(SASL_AUTH_SCHEME)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private void processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn,
+                           RequestHeader requestHeader) throws IOException {
         LOG.debug("Responding to client SASL token.");
         GetSASLRequest clientTokenRecord = new GetSASLRequest();
         ByteBufferInputStream.byteBuffer2Record(incomingBuffer,clientTokenRecord);
@@ -1418,27 +1443,40 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
                         cnxn.addAuthInfo(new Id("super", ""));
                     }
                 }
-            }
-            catch (SaslException e) {
-                LOG.warn("Client failed to SASL authenticate: " + e, e);
-                if ((System.getProperty("zookeeper.allowSaslFailedClients") != null)
-                  &&
-                  (System.getProperty("zookeeper.allowSaslFailedClients").equals("true"))) {
-                    LOG.warn("Maintaining client connection despite SASL authentication failure.");
+            } catch (SaslException e) {
+                LOG.warn("Client {} failed to SASL authenticate: {}",
+                    cnxn.getRemoteSocketAddress(), e);
+                if (shouldAllowSaslFailedClientsConnect() && !shouldRequireClientSaslAuth()) {
+                  LOG.warn("Maintaining client connection despite SASL authentication failure.");
                 } else {
+                  int error;
+                  if (shouldRequireClientSaslAuth()) {
+                    LOG.warn("Closing client connection due to server requires client SASL authenticaiton," +
+                        "but client SASL authentication has failed, or client is not configured with SASL " +
+                        "authentication.");
+                    error = Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue();
+                  } else {
                     LOG.warn("Closing client connection due to SASL authentication failure.");
-                    cnxn.close(ServerCnxn.DisconnectReason.SASL_AUTH_FAILURE);
+                    error = Code.AUTHFAILED.intValue();
+                  }
+
+                  ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, error);
+                  cnxn.sendResponse(replyHeader, new SetSASLResponse(null), "response");
+                  cnxn.sendCloseSession();
+                  cnxn.disableRecv();
+                  return;
                 }
             }
-        }
-        catch (NullPointerException e) {
+        } catch (NullPointerException e) {
             LOG.error("cnxn.saslServer is null: cnxn object did not initialize its saslServer properly.");
         }
         if (responseToken != null) {
             LOG.debug("Size of server SASL response: " + responseToken.length);
         }
-        // wrap SASL response token to client inside a Response object.
-        return new SetSASLResponse(responseToken);
+
+        ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, Code.OK.intValue());
+        Record record = new SetSASLResponse(responseToken);
+        cnxn.sendResponse(replyHeader, record, "response");
     }
 
     // entry point for quorum/Learner.java
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SaslAuthRequiredFailNoSASLTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SaslAuthRequiredFailNoSASLTest.java
new file mode 100644
index 0000000..1bb59d1
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SaslAuthRequiredFailNoSASLTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SaslAuthRequiredFailNoSASLTest extends ClientBase {
+  @Before
+  public void setup() {
+    System.setProperty(SaslTestUtil.requireSASLAuthProperty, "true");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    System.clearProperty(SaslTestUtil.requireSASLAuthProperty);
+  }
+
+  @Test
+  public void testClientOpWithoutSASLConfigured() throws Exception {
+    ZooKeeper zk = null;
+    CountdownWatcher watcher = new CountdownWatcher();
+    try {
+      zk = createClient(watcher);
+      zk.create("/foo", null, Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
+      Assert.fail("Client is not configured with SASL authentication, so zk.create operation should fail.");
+    } catch(KeeperException e) {
+      Assert.assertTrue(e.code() == KeeperException.Code.SESSIONCLOSEDREQUIRESASLAUTH);
+      // Verify that "eventually" (within the bound of timeouts)
+      // this client closes the connection between itself and the server.
+      watcher.waitForDisconnected(SaslTestUtil.CLIENT_DISCONNECT_TIMEOUT);
+    } finally {
+      if (zk != null) {
+        zk.close();
+      }
+    }
+  }
+
+}
+
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SaslAuthRequiredFailWrongSASLTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SaslAuthRequiredFailWrongSASLTest.java
new file mode 100644
index 0000000..f6c6e33
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SaslAuthRequiredFailWrongSASLTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class SaslAuthRequiredFailWrongSASLTest extends ClientBase {
+  @BeforeClass
+  public static void setUpBeforeClass() {
+    System.setProperty(SaslTestUtil.requireSASLAuthProperty, "true");
+    System.setProperty(SaslTestUtil.authProviderProperty, SaslTestUtil.authProvider);
+    System.setProperty(SaslTestUtil.jaasConfig,
+        SaslTestUtil.createJAASConfigFile("jaas_wrong.conf", "test1"));
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() {
+    System.clearProperty(SaslTestUtil.requireSASLAuthProperty);
+    System.clearProperty(SaslTestUtil.authProviderProperty);
+    System.clearProperty(SaslTestUtil.jaasConfig);
+  }
+
+  @Test
+  public void testClientOpWithFailedSASLAuth() throws Exception {
+    ZooKeeper zk = null;
+    CountdownWatcher watcher = new CountdownWatcher();
+    try {
+      zk = createClient(watcher);
+      zk.create("/bar", null, Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
+      Assert.fail("Client with wrong SASL config should not pass SASL authentication.");
+    } catch(KeeperException e) {
+      Assert.assertTrue(e.code() == KeeperException.Code.AUTHFAILED);
+      // Verify that "eventually" this client closes the connection between itself and the server.
+      watcher.waitForDisconnected(SaslTestUtil.CLIENT_DISCONNECT_TIMEOUT);
+    } finally {
+      if (zk != null) {
+        zk.close();
+      }
+    }
+  }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SaslAuthRequiredTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SaslAuthRequiredTest.java
new file mode 100644
index 0000000..54b0c58
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SaslAuthRequiredTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class SaslAuthRequiredTest extends ClientBase {
+  @BeforeClass
+  public static void setUpBeforeClass() {
+    System.setProperty(SaslTestUtil.requireSASLAuthProperty, "true");
+    System.setProperty(SaslTestUtil.authProviderProperty, SaslTestUtil.authProvider);
+    System.setProperty(SaslTestUtil.jaasConfig,
+        SaslTestUtil.createJAASConfigFile("jaas.conf", "test"));
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() {
+    System.clearProperty(SaslTestUtil.requireSASLAuthProperty);
+    System.clearProperty(SaslTestUtil.authProviderProperty);
+    System.clearProperty(SaslTestUtil.jaasConfig);
+  }
+
+  @Test
+  public void testClientOpWithValidSASLAuth() throws Exception {
+    ZooKeeper zk = null;
+    CountdownWatcher watcher = new CountdownWatcher();
+    try {
+      zk = createClient(watcher);
+      zk.create("/foobar", null, Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
+    } catch(KeeperException e) {
+      Assert.fail("Client operation should succeed with valid SASL configuration.");
+    } finally {
+      if (zk != null) {
+        zk.close();
+      }
+    }
+  }
+
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SaslTestUtil.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SaslTestUtil.java
new file mode 100644
index 0000000..4e0a337
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SaslTestUtil.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import org.junit.Assert;
+
+public class SaslTestUtil extends ClientBase {
+  // The maximum time (in milliseconds) a client should take to observe
+  // a disconnect event of the same client from server.
+  static Integer CLIENT_DISCONNECT_TIMEOUT = 3000;
+  static String requireSASLAuthProperty = "zookeeper.sessionRequireClientSASLAuth";
+  static String authProviderProperty = "zookeeper.authProvider.1";
+  static String authProvider = "org.apache.zookeeper.server.auth.SASLAuthenticationProvider";
+  static String digestLoginModule = "org.apache.zookeeper.server.auth.DigestLoginModule";
+  static String jaasConfig = "java.security.auth.login.config";
+
+  static String createJAASConfigFile(String fileName, String password) {
+    String ret = null;
+    try {
+      File tmpDir = createTmpDir();
+      File jaasFile = new File(tmpDir, fileName);
+      FileWriter fwriter = new FileWriter(jaasFile);
+      fwriter.write("" +
+          "Server {\n" +
+          "          " + digestLoginModule + " required\n" +
+          "          user_super=\"test\";\n" +
+          "};\n" +
+          "Client {\n" +
+          "       " + digestLoginModule + " required\n" +
+          "       username=\"super\"\n" +
+          "       password=\"" + password + "\";\n" +
+          "};" + "\n");
+      fwriter.close();
+      ret = jaasFile.getAbsolutePath();
+    } catch (IOException e) {
+      Assert.fail("Unable to create JaaS configuration file!");
+    }
+
+    return ret;
+  }
+}