You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by nk...@apache.org on 2020/01/22 11:54:05 UTC

[zookeeper] branch master updated: ZOOKEEPER-1112: Add (Cyrus) SASL authentication support to C client library

This is an automated email from the ASF dual-hosted git repository.

nkalmar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new c234848  ZOOKEEPER-1112: Add (Cyrus) SASL authentication support to C client library
c234848 is described below

commit c234848caef0cc920f19befd1b0b14251b17da92
Author: Damien Diederen <dd...@crosstwine.com>
AuthorDate: Wed Jan 22 12:53:55 2020 +0100

    ZOOKEEPER-1112: Add (Cyrus) SASL authentication support to C client library
    
    This is a "respin" of https://github.com/apache/zookeeper/pull/1054, which I withdrew due to some annoying shortcomings.
    
    This changeset allows C clients to use SASL to authenticate with the ZooKeeper server.  It is loosely based on patches #1 and #2 by Tom Klonikowski, at https://reviews.apache.org/r/2252/, but the result has been extensively reworked to follow the semantics of the Java client:
    
      * No SASL operations are exposed through the API;
    
      * The configuration is provided, and stored, at "handle init time";
    
      * SASL authentication is automatically performed after each (re)connect.
    
    It introduces an optional dependency on the Cyrus SASL library, which can either be autodetected (default) or configured using the `--without-sasl`/`--with-sasl[=DIR]` flags, or -DWITH_CYRUS_SASL for CMake/Windows.
    
    `TestServerRequireClientSASLAuth.cc` has been renamed to `TestSASLAuth.cc`, and a test has been added which successfully (re)authenticates using the `DIGEST-MD5` mechanism.  The code has also been used to successfully authenticate clients via `GSSAPI`/Kerberos.
    
    This commit also adds SASL support to the `cli.c` client.
    
    Co-authored-by: Tom Klonikowski <klonik_tinformatik.haw-hamburg.de>
    
    Author: Damien Diederen <dd...@crosstwine.com>
    
    Reviewers: Mate Szalay-Beko <sz...@gmail.com>, Norbert Kalmar <nk...@apache.org>
    
    Closes #1134 from ztzg/ZOOKEEPER-1112-c-client-sasl-support-v2
---
 README_packaging.md                                |   8 +-
 dev/docker/Dockerfile                              |   2 +-
 pom.xml                                            |   3 +-
 tools/cmake/Modules/FindCyrusSASL.cmake            |  54 ++
 zookeeper-client/zookeeper-client-c/CMakeLists.txt |  25 +
 zookeeper-client/zookeeper-client-c/LICENSE        |  50 ++
 zookeeper-client/zookeeper-client-c/Makefile.am    |  22 +-
 zookeeper-client/zookeeper-client-c/README         |   2 +
 zookeeper-client/zookeeper-client-c/configure.ac   |  30 ++
 .../zookeeper-client-c/include/proto.h             |   1 +
 .../zookeeper-client-c/include/zookeeper.h         |  91 ++++
 zookeeper-client/zookeeper-client-c/pom.xml        |   1 +
 zookeeper-client/zookeeper-client-c/src/cli.c      | 110 ++++-
 .../zookeeper-client-c/src/zk_adaptor.h            |   7 +
 zookeeper-client/zookeeper-client-c/src/zk_sasl.c  | 541 +++++++++++++++++++++
 zookeeper-client/zookeeper-client-c/src/zk_sasl.h  | 154 ++++++
 .../zookeeper-client-c/src/zookeeper.c             | 195 +++++++-
 .../zookeeper-client-c/tests/TestSASLAuth.cc       | 243 +++++++++
 .../tests/TestServerRequireClientSASLAuth.cc       | 109 -----
 .../zookeeper-client-c/tests/zkServer.sh           |  16 +-
 20 files changed, 1509 insertions(+), 155 deletions(-)

diff --git a/README_packaging.md b/README_packaging.md
index b290dd9..5c9da96 100644
--- a/README_packaging.md
+++ b/README_packaging.md
@@ -9,7 +9,7 @@ http://bigtop.apache.org/
 ## Requirements
 
 - you need maven to build the java code
-- gcc, cppunit, openssl and python-setuptools are required to build C and python bindings. (only needed when using `-Pfull-build`)
+- gcc, cppunit, openssl and python-setuptools are required to build C and python bindings (only needed when using `-Pfull-build`).  Cyrus SASL is optional, but recommended for a maximally functional client.
 
 On RHEL machine:
 
@@ -17,6 +17,7 @@ On RHEL machine:
 yum install cppunit
 yum install python-setuptools
 yum install openssl openssl-devel
+yum install cyrus-sasl-md5 cyrus-sasl-gssapi cyrus-sasl-devel
 ```
 
 On Ubuntu:
@@ -25,6 +26,7 @@ On Ubuntu:
 apt-get install cppunit
 apt-get install python-setuptools
 apt-get install openssl libssl-dev
+apt-get install libsasl2-modules-gssapi-mit libsasl2-modules libsasl2-dev
 ```
 
 
@@ -63,7 +65,9 @@ Optional parameters you might consider when using maven:
                               Use `-Dc-client-openssl=no` to explicitly disable SSL feature in C client. Or use 
                               `-Dc-client-openssl=/path/to/openssl/` if you want to use a non-default / specific 
                               openssl library location.
-    
+-  `-Dc-client-sasl`      -   specify SASL support and Cyrus SASL 1.x library location.  Works similarly to the
+                              `c-client-openssl` flag above (`yes`, `no`, or path).
+
 Please note: if you don't provide the `-Pfull-build` parameter, then the C client will not be built, the C client tests
 will not be executed and the previous C client builds will no be cleaned up (e.g. with simply using `mvn clean`).
 
diff --git a/dev/docker/Dockerfile b/dev/docker/Dockerfile
index cead98a..8d148ee 100644
--- a/dev/docker/Dockerfile
+++ b/dev/docker/Dockerfile
@@ -20,4 +20,4 @@
 FROM maven:3.6.3-jdk-8
 
 RUN apt-get update
-RUN apt-get install -y g++ cmake autoconf libcppunit-dev libtool openssl libssl-dev
+RUN apt-get install -y g++ cmake autoconf libcppunit-dev libtool openssl libssl-dev libsasl2-modules-gssapi-mit libsasl2-modules libsasl2-dev
diff --git a/pom.xml b/pom.xml
index 9f397c6..c00701c 100755
--- a/pom.xml
+++ b/pom.xml
@@ -339,8 +339,9 @@
     <spotbugsannotations.version>3.1.9</spotbugsannotations.version>
     <checkstyle.version>8.17</checkstyle.version>
 
-    <!-- parameter to pass to C client build -->
+    <!-- parameters to pass to C client build -->
     <c-client-openssl>yes</c-client-openssl>
+    <c-client-sasl>yes</c-client-sasl>
 
   </properties>
 
diff --git a/tools/cmake/Modules/FindCyrusSASL.cmake b/tools/cmake/Modules/FindCyrusSASL.cmake
new file mode 100644
index 0000000..224a8ce
--- /dev/null
+++ b/tools/cmake/Modules/FindCyrusSASL.cmake
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# - Find Cyrus SASL (sasl.h, libsasl2.so)
+#
+# This module defines
+#  CYRUS_SASL_INCLUDE_DIR, directory containing headers
+#  CYRUS_SASL_SHARED_LIB, path to Cyrus SASL's shared library
+#  CYRUS_SASL_FOUND, whether Cyrus SASL and its plugins have been found
+#
+# It also defines the following IMPORTED targets:
+#  CyrusSASL
+#
+# Hints:
+#  Set CYRUS_SASL_ROOT_DIR to the root directory of a Cyrus SASL installation.
+#
+# The initial version of this file was extracted from
+# https://github.com/cloudera/kudu, at the following commit:
+#
+#  commit 9806863e78107505a622b44112a897189d9b3c24
+#  Author: Dan Burkert <da...@cloudera.com>
+#  Date:   Mon Nov 30 12:15:36 2015 -0800
+#
+#      Enable C++11
+
+find_path(CYRUS_SASL_INCLUDE_DIR sasl/sasl.h HINTS "${CYRUS_SASL_ROOT_DIR}/include")
+find_library(CYRUS_SASL_SHARED_LIB sasl2 HINTS "${CYRUS_SASL_ROOT_DIR}/lib")
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(CYRUS_SASL REQUIRED_VARS
+  CYRUS_SASL_SHARED_LIB CYRUS_SASL_INCLUDE_DIR)
+
+if(CYRUS_SASL_FOUND)
+  if(NOT TARGET CyrusSASL)
+    add_library(CyrusSASL UNKNOWN IMPORTED)
+    set_target_properties(CyrusSASL PROPERTIES
+      INTERFACE_INCLUDE_DIRECTORIES "${CYRUS_SASL_INCLUDE_DIR}"
+      IMPORTED_LINK_INTERFACE_LANGUAGES "C"
+      IMPORTED_LOCATION "${CYRUS_SASL_SHARED_LIB}")
+  endif()
+endif()
diff --git a/zookeeper-client/zookeeper-client-c/CMakeLists.txt b/zookeeper-client/zookeeper-client-c/CMakeLists.txt
index a8fe2fd..8400c10 100644
--- a/zookeeper-client/zookeeper-client-c/CMakeLists.txt
+++ b/zookeeper-client/zookeeper-client-c/CMakeLists.txt
@@ -20,6 +20,8 @@ project(zookeeper VERSION 3.7.0)
 set(email user@zookeeper.apache.org)
 set(description "zookeeper C client")
 
+list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/../../tools/cmake/Modules")
+
 # general options
 if(UNIX)
   add_compile_options(-Wall -fPIC)
@@ -61,6 +63,20 @@ if(WANT_SOCK_CLOEXEC AND HAVE_SOCK_CLOEXEC)
   set(SOCK_CLOEXEC_ENABLED 1)
 endif()
 
+# Cyrus SASL 2.x
+option(WITH_CYRUS_SASL "turn ON/OFF Cyrus SASL 2.x support, or define SASL library location (default: ON)" ON)
+message("-- using WITH_CYRUS_SASL=${WITH_CYRUS_SASL}")
+if(NOT WITH_CYRUS_SASL STREQUAL "OFF")
+  if(NOT WITH_CYRUS_SASL STREQUAL "ON")
+    set(CYRUS_SASL_ROOT_DIR "${WITH_CYRUS_SASL}")
+  endif()
+  find_package(CyrusSASL)
+  if(CYRUS_SASL_FOUND)
+    message("-- Cyrus SASL 2.x found! will build with SASL support.")
+  else()
+    message("-- WARNING: unable to find Cyrus SASL 2.x! will build without SASL support.")
+  endif()
+endif()
 
 # The function `to_have(in out)` converts a header name like `arpa/inet.h`
 # into an Autotools style preprocessor definition `HAVE_ARPA_INET_H`.
@@ -171,6 +187,10 @@ else()
   list(APPEND zookeeper_sources src/st_adaptor.c)
 endif()
 
+if(CYRUS_SASL_FOUND)
+  list(APPEND zookeeper_sources src/zk_sasl.c)
+endif()
+
 if(WIN32)
   list(APPEND zookeeper_sources src/winport.c)
 endif()
@@ -203,6 +223,11 @@ if(WANT_SYNCAPI AND NOT WIN32)
   target_link_libraries(zookeeper PUBLIC Threads::Threads)
 endif()
 
+if(CYRUS_SASL_FOUND)
+  target_compile_definitions(zookeeper PUBLIC HAVE_CYRUS_SASL_H)
+  target_link_libraries(zookeeper PUBLIC CyrusSASL)
+endif()
+
 # cli executable
 add_executable(cli src/cli.c)
 target_link_libraries(cli zookeeper)
diff --git a/zookeeper-client/zookeeper-client-c/LICENSE b/zookeeper-client/zookeeper-client-c/LICENSE
index 863a221..7df01ad 100644
--- a/zookeeper-client/zookeeper-client-c/LICENSE
+++ b/zookeeper-client/zookeeper-client-c/LICENSE
@@ -333,3 +333,53 @@
  * copied and put under another distribution licence
  * [including the GNU Public Licence.]
  */
+
+===========================================================================================
+===       The following part contains the license for the Cyrus SASL 2.x library        ===
+===                           used for optional SASL support                            ===
+===========================================================================================
+
+/* CMU libsasl
+ * Tim Martin
+ * Rob Earhart
+ * Rob Siemborski
+ */
+/* 
+ * Copyright (c) 1998-2003 Carnegie Mellon University.  All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer. 
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in
+ *    the documentation and/or other materials provided with the
+ *    distribution.
+ *
+ * 3. The name "Carnegie Mellon University" must not be used to
+ *    endorse or promote products derived from this software without
+ *    prior written permission. For permission or any other legal
+ *    details, please contact  
+ *      Office of Technology Transfer
+ *      Carnegie Mellon University
+ *      5000 Forbes Avenue
+ *      Pittsburgh, PA  15213-3890
+ *      (412) 268-4387, fax: (412) 268-7395
+ *      tech-transfer@andrew.cmu.edu
+ *
+ * 4. Redistributions of any form whatsoever must retain the following
+ *    acknowledgment:
+ *    "This product includes software developed by Computing Services
+ *     at Carnegie Mellon University (http://www.cmu.edu/computing/)."
+ *
+ * CARNEGIE MELLON UNIVERSITY DISCLAIMS ALL WARRANTIES WITH REGARD TO
+ * THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+ * AND FITNESS, IN NO EVENT SHALL CARNEGIE MELLON UNIVERSITY BE LIABLE
+ * FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN
+ * AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
+ * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
diff --git a/zookeeper-client/zookeeper-client-c/Makefile.am b/zookeeper-client/zookeeper-client-c/Makefile.am
index 34ef012..9c794a5 100644
--- a/zookeeper-client/zookeeper-client-c/Makefile.am
+++ b/zookeeper-client/zookeeper-client-c/Makefile.am
@@ -13,10 +13,16 @@ if WANT_OPENSSL
   OPENSSL_LIB_LDFLAGS = -lssl -lcrypto
 endif
 
-AM_CPPFLAGS = -I${srcdir}/include -I${srcdir}/tests -I${srcdir}/generated $(SOLARIS_CPPFLAGS) $(OPENSSL_CPPFLAGS)
+if WANT_SASL
+  SASL_CPPFLAGS = -DHAVE_CYRUS_SASL_H
+  SASL_LIB_LDFLAGS = -lsasl2
+  SASL_SRC = src/zk_sasl.c
+endif
+
+AM_CPPFLAGS = -I${srcdir}/include -I${srcdir}/tests -I${srcdir}/generated $(SOLARIS_CPPFLAGS) $(OPENSSL_CPPFLAGS) $(SASL_CPPFLAGS)
 AM_CFLAGS = -Wall -Werror -Wdeclaration-after-statement
 AM_CXXFLAGS = -Wall $(USEIPV6)
-LIB_LDFLAGS = -no-undefined -version-info 2 $(SOLARIS_LIB_LDFLAGS) $(OPENSSL_LIB_LDFLAGS)
+LIB_LDFLAGS = -no-undefined -version-info 2 $(SOLARIS_LIB_LDFLAGS) $(OPENSSL_LIB_LDFLAGS) $(SASL_LIB_LDFLAGS)
 
 # Additional flags for coverage testing (if enabled)
 if ENABLEGCOV
@@ -37,7 +43,7 @@ COMMON_SRC = src/zookeeper.c include/zookeeper.h include/zookeeper_version.h inc
     src/recordio.c include/recordio.h include/proto.h \
     src/zk_adaptor.h generated/zookeeper.jute.c \
     src/zk_log.c src/zk_hashtable.h src/zk_hashtable.c \
-	src/addrvec.h src/addrvec.c
+    src/addrvec.h src/addrvec.c $(SASL_SRC)
 
 # These are the symbols (classes, mostly) we want to export from our library.
 EXPORT_SYMBOLS = '(zoo_|zookeeper_|zhandle|Z|format_log_message|log_message|logLevel|deallocate_|allocate_|zerror|is_unrecoverable)'
@@ -67,13 +73,13 @@ endif
 bin_PROGRAMS = cli_st
 
 cli_st_SOURCES = src/cli.c
-cli_st_LDADD = libzookeeper_st.la
+cli_st_LDADD = libzookeeper_st.la $(SASL_LIB_LDFLAGS)
 
 if WANT_SYNCAPI
 bin_PROGRAMS += cli_mt load_gen
 
 cli_mt_SOURCES = src/cli.c
-cli_mt_LDADD = libzookeeper_mt.la
+cli_mt_LDADD = libzookeeper_mt.la $(SASL_LIB_LDFLAGS)
 cli_mt_CFLAGS = -DTHREADED
 
 load_gen_SOURCES = src/load_gen.c
@@ -113,7 +119,7 @@ TEST_SOURCES = \
 	tests/ZooKeeperQuorumServer.h \
 	tests/TestReadOnlyClient.cc \
 	tests/TestLogClientEnv.cc \
-    tests/TestServerRequireClientSASLAuth.cc \
+        tests/TestSASLAuth.cc \
 	$(NULL)
 
 if SOLARIS
@@ -127,14 +133,14 @@ check_PROGRAMS = zktest-st
 TESTS_ENVIRONMENT = ZKROOT=${srcdir}/../.. \
                     CLASSPATH=$$CLASSPATH:$$CLOVER_HOME/lib/clover*.jar
 nodist_zktest_st_SOURCES = $(TEST_SOURCES)
-zktest_st_LDADD = libzkst.la libhashtable.la $(CPPUNIT_LIBS) $(OPENSSL_LIB_LDFLAGS) -ldl
+zktest_st_LDADD = libzkst.la libhashtable.la $(CPPUNIT_LIBS) $(OPENSSL_LIB_LDFLAGS) $(SASL_LIB_LDFLAGS) -ldl
 zktest_st_CXXFLAGS = -DUSE_STATIC_LIB $(CPPUNIT_CFLAGS) $(USEIPV6) $(SOLARIS_CPPFLAGS)
 zktest_st_LDFLAGS = -shared $(SYMBOL_WRAPPERS) $(SOLARIS_LIB_LDFLAGS)
 
 if WANT_SYNCAPI
   check_PROGRAMS += zktest-mt
   nodist_zktest_mt_SOURCES = $(TEST_SOURCES) tests/PthreadMocks.cc
-  zktest_mt_LDADD = libzkmt.la libhashtable.la -lpthread $(CPPUNIT_LIBS) $(OPENSSL_LIB_LDFLAGS) -ldl
+  zktest_mt_LDADD = libzkmt.la libhashtable.la -lpthread $(CPPUNIT_LIBS) $(OPENSSL_LIB_LDFLAGS) $(SASL_LIB_LDFLAGS) -ldl
   zktest_mt_CXXFLAGS = -DUSE_STATIC_LIB -DTHREADED $(CPPUNIT_CFLAGS) $(USEIPV6)
 if SOLARIS
   SHELL_SYMBOL_WRAPPERS_MT = cat ${srcdir}/tests/wrappers-mt.opt
diff --git a/zookeeper-client/zookeeper-client-c/README b/zookeeper-client/zookeeper-client-c/README
index 0816f62..ade5ad9 100644
--- a/zookeeper-client/zookeeper-client-c/README
+++ b/zookeeper-client/zookeeper-client-c/README
@@ -83,6 +83,8 @@ Follow steps 1 and 2 above, and then continue here.
    -DWANT_CPPUNIT      ON except on Windows, OFF disables the tests
    -DWITH_OPENSSL      ON by default, OFF disables the SSL support. You can also
                        specify a custom path by -DWITH_OPENSSL=/path/to/openssl/
+   -DWITH_CYRUS_SASL   ON by default, OFF disables SASL support. You can also
+                       specify a custom path by -DWITH_CYRUS_SASL=/path/to/cyrus-sasl/
    -DBUILD_SHARED_LIBS not yet supported, only static libraries are built
    other CMake options see "cmake --help" for generic options, such as generator
 
diff --git a/zookeeper-client/zookeeper-client-c/configure.ac b/zookeeper-client/zookeeper-client-c/configure.ac
index 4657d2e..8e24d65 100644
--- a/zookeeper-client/zookeeper-client-c/configure.ac
+++ b/zookeeper-client/zookeeper-client-c/configure.ac
@@ -143,6 +143,36 @@ fi
 
 AM_CONDITIONAL([WANT_SYNCAPI],[test "x$with_syncapi" != xno])
 
+dnl Cyrus SASL 2.x
+AC_ARG_WITH(sasl,
+ [AC_HELP_STRING([--with-sasl[=DIR]], [build with SASL support via Cyrus SASL 2.x (default=auto)])],
+ [], [with_sasl=yes])
+if test "x$with_sasl" != "xno"; then
+    saved_CPPFLAGS="$CPPFLAGS"
+    saved_LDFLAGS="$LDFLAGS"
+    if test "x$with_sasl" != "xyes" ; then
+        CPPFLAGS="$CPPFLAGS -I$with_sasl/include"
+        LDFLAGS="$LDFLAGS -L$with_sasl/lib"
+    fi
+    have_sasl=no
+    AC_CHECK_HEADER(sasl/sasl.h, [
+     AC_CHECK_LIB(sasl2, sasl_client_init, [have_sasl=yes])])
+    if test "x$have_sasl" != "xyes"; then
+        CPPFLAGS="$saved_CPPFLAGS"
+        LDFLAGS="$saved_LDFLAGS"
+    fi
+fi
+if test "x$with_sasl" != xno && test "x$have_sasl" = xno; then
+    AC_MSG_WARN([cannot build SASL support -- sasl2 not found])
+    with_sasl=no
+fi
+if test "x$with_sasl" != xno; then
+    AC_MSG_NOTICE([building with SASL support])
+else
+    AC_MSG_NOTICE([building without SASL support])
+fi
+AM_CONDITIONAL([WANT_SASL],[test "x$with_sasl" != xno])
+
 # Checks for header files.
 AC_HEADER_STDC
 AC_CHECK_HEADERS([arpa/inet.h fcntl.h netdb.h netinet/in.h stdlib.h string.h sys/socket.h sys/time.h unistd.h sys/utsname.h])
diff --git a/zookeeper-client/zookeeper-client-c/include/proto.h b/zookeeper-client/zookeeper-client-c/include/proto.h
index 88774ff..65afde9 100644
--- a/zookeeper-client/zookeeper-client-c/include/proto.h
+++ b/zookeeper-client/zookeeper-client-c/include/proto.h
@@ -46,6 +46,7 @@ extern "C" {
 #define ZOO_CLOSE_OP -11
 #define ZOO_SETAUTH_OP 100
 #define ZOO_SETWATCHES_OP 101
+#define ZOO_SASL_OP 102
 
 #ifdef __cplusplus
 }
diff --git a/zookeeper-client/zookeeper-client-c/include/zookeeper.h b/zookeeper-client/zookeeper-client-c/include/zookeeper.h
index 3a6a689..243fac2 100644
--- a/zookeeper-client/zookeeper-client-c/include/zookeeper.h
+++ b/zookeeper-client/zookeeper-client-c/include/zookeeper.h
@@ -39,6 +39,10 @@
 #include <stdio.h>
 #include <ctype.h>
 
+#ifdef HAVE_CYRUS_SASL_H
+#include <sasl/sasl.h>
+#endif /* HAVE_CYRUS_SASL_H */
+
 #include "proto.h"
 #include "zookeeper_version.h"
 #include "recordio.h"
@@ -575,6 +579,93 @@ ZOOAPI zhandle_t *zookeeper_init2(const char *host, watcher_fn fn,
   int recv_timeout, const clientid_t *clientid, void *context, int flags,
   log_callback_fn log_callback);
 
+#ifdef HAVE_CYRUS_SASL_H
+
+/**
+ * \brief zoo_sasl_params structure.
+ *
+ * This structure holds the SASL parameters for the connection.
+ *
+ * Its \c service, \c host and \c callbacks fields are used with Cyrus
+ * SASL's \c sasl_client_new; its \c mechlist field with \c
+ * sasl_client_start.  Please refer to these functions for precise
+ * semantics.
+ *
+ * Note while "string" parameters are copied into the ZooKeeper
+ * client, the callbacks array is simply referenced: its lifetime must
+ * therefore cover that of the handle.
+ */
+typedef struct zoo_sasl_params {
+  const char *service;          /*!< The service name, usually "zookeeper" */
+  const char *host;             /*!< The server name, e.g. "zk-sasl-md5" */
+  const char *mechlist;         /*!< Mechanisms to try, e.g. "DIGEST-MD5" */
+  const sasl_callback_t *callbacks;  /*!< List of callbacks */
+} zoo_sasl_params_t;
+
+/**
+ * \brief create a handle to communicate with zookeeper.
+ *
+ * This function is identical to \ref zookeeper_init2 except that it
+ * allows specifying optional SASL connection parameters.  It is only
+ * available if the client library was configured to link against the
+ * Cyrus SASL library, and only visible when \c HAVE_CYRUS_SASL_H is defined.
+ *
+ * This method creates a new handle and a zookeeper session that corresponds
+ * to that handle. Session establishment is asynchronous, meaning that the
+ * session should not be considered established until (and unless) an
+ * event of state ZOO_CONNECTED_STATE is received.
+ * \param host comma separated host:port pairs, each corresponding to a zk
+ *   server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+ * \param fn the global watcher callback function. When notifications are
+ *   triggered this function will be invoked.
+ * \param clientid the id of a previously established session that this
+ *   client will be reconnecting to. Pass 0 if not reconnecting to a previous
+ *   session. Clients can access the session id of an established, valid,
+ *   connection by calling \ref zoo_client_id. If the session corresponding to
+ *   the specified clientid has expired, or if the clientid is invalid for
+ *   any reason, the returned zhandle_t will be invalid -- the zhandle_t
+ *   state will indicate the reason for failure (typically
+ *   ZOO_EXPIRED_SESSION_STATE).
+ * \param context the handback object that will be associated with this instance
+ *   of zhandle_t. Application can access it (for example, in the watcher
+ *   callback) using \ref zoo_get_context. The object is not used by zookeeper
+ *   internally and can be null.
+ * \param flags reserved for future use. Should be set to zero.
+ * \param log_callback All log messages will be passed to this callback function.
+ *   For more details see \ref zoo_get_log_callback and \ref zoo_set_log_callback.
+ * \param sasl_params a pointer to a \ref zoo_sasl_params_t structure
+ *   specifying SASL connection parameters, or NULL to skip SASL
+ *   authentication
+ * \return a pointer to the opaque zhandle structure. If it fails to create
+ * a new zhandle the function returns NULL and the errno variable
+ * indicates the reason.
+ */
+ZOOAPI zhandle_t *zookeeper_init_sasl(const char *host, watcher_fn fn,
+  int recv_timeout, const clientid_t *clientid, void *context, int flags,
+  log_callback_fn log_callback, zoo_sasl_params_t *sasl_params);
+
+/**
+ * \brief allocates and initializes a basic array of Cyrus SASL callbacks.
+ *
+ * This small helper function makes it easy to pass "static"
+ * parameters to Cyrus SASL's underlying callback-based API.  Its use
+ * is not mandatory; you can still implement interactive dialogs by
+ * defining your own callbacks.
+ *
+ * \param user the "canned" response to \c SASL_CB_USER and \c SASL_CB_AUTHNAME,
+ *   or NULL for none
+ * \param realm the "canned" response to \c SASL_CB_GETREALM, or NULL for none
+ * \param password_file the name of a file whose first line is read in
+ *   response to \c SASL_CB_PASS, or NULL for none
+ * \return the freshly-malloc()ed callbacks array, or NULL if allocation
+ *   failed.  Deallocate with free(), but only after the corresponding
+ *   ZooKeeper handle is closed.
+ */
+ZOOAPI sasl_callback_t *zoo_sasl_make_basic_callbacks(const char *user,
+  const char *realm, const char* password_file);
+
+#endif /* HAVE_CYRUS_SASL_H */
+
 /**
  * \brief update the list of servers this client will connect to.
  *
diff --git a/zookeeper-client/zookeeper-client-c/pom.xml b/zookeeper-client/zookeeper-client-c/pom.xml
index bff945a..e8ea1aa 100755
--- a/zookeeper-client/zookeeper-client-c/pom.xml
+++ b/zookeeper-client/zookeeper-client-c/pom.xml
@@ -88,6 +88,7 @@
               </environmentVariables>
               <arguments>
                 <argument>--with-openssl=${c-client-openssl}</argument>
+                <argument>--with-sasl=${c-client-sasl}</argument>
                 <argument>--prefix=${project.build.directory}/c</argument>
                 <argument>${c-test-coverage-arg}</argument>
               </arguments>
diff --git a/zookeeper-client/zookeeper-client-c/src/cli.c b/zookeeper-client/zookeeper-client-c/src/cli.c
index 34f2b99..1864e56 100644
--- a/zookeeper-client/zookeeper-client-c/src/cli.c
+++ b/zookeeper-client/zookeeper-client-c/src/cli.c
@@ -757,6 +757,15 @@ int main(int argc, char **argv) {
             {"cmd",      required_argument, NULL, 'c'}, //cmd
             {"readonly", no_argument, NULL, 'r'}, //read-only
             {"debug",    no_argument, NULL, 'd'}, //set log level to DEBUG from the beginning
+#ifdef HAVE_CYRUS_SASL_H
+            // Parameters for SASL authentication.
+            {"service",       required_argument, NULL, 'z'},
+            {"server-fqdn",   required_argument, NULL, 'o'}, //Host used for SASL auth
+            {"mechlist",      required_argument, NULL, 'n'}, //SASL mechanism list
+            {"user",          required_argument, NULL, 'u'}, //SASL user
+            {"realm",         required_argument, NULL, 'l'}, //SASL realm
+            {"password-file", required_argument, NULL, 'p'},
+#endif /* HAVE_CYRUS_SASL_H */
             {NULL,      0,                 NULL, 0},
     };
 #ifndef THREADED
@@ -772,6 +781,14 @@ int main(int argc, char **argv) {
     int bufoff = 0;
     int flags;
     FILE *fh;
+#ifdef HAVE_CYRUS_SASL_H
+    char *service = "zookeeper";
+    char *serverFQDN = NULL;
+    char *mechlist = NULL;
+    char *user = NULL;
+    char *realm = NULL;
+    char *passwordFile = NULL;
+#endif /* HAVE_CYRUS_SASL_H */
 
     int opt;
     int option_index = 0;
@@ -780,7 +797,7 @@ int main(int argc, char **argv) {
     zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
 
     flags = 0;
-    while ((opt = getopt_long(argc, argv, "h:s:m:c:rd", long_options, &option_index)) != -1) {
+    while ((opt = getopt_long(argc, argv, "h:s:m:c:rdz:o:n:u:l:p:", long_options, &option_index)) != -1) {
         switch (opt) {
             case 'h':
                 hostPort = optarg;
@@ -804,6 +821,26 @@ int main(int argc, char **argv) {
                 zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG);
                 fprintf(stderr, "logging level set to DEBUG\n");
                 break;
+#ifdef HAVE_CYRUS_SASL_H
+            case 'z':
+                service = optarg;
+                break;
+            case 'o':
+                serverFQDN = optarg;
+                break;
+            case 'n':
+                mechlist = optarg;
+                break;
+            case 'u':
+                user = optarg;
+                break;
+            case 'l':
+                realm = optarg;
+                break;
+            case 'p':
+                passwordFile = optarg;
+                break;
+#endif /* HAVE_CYRUS_SASL_H */
             case '?':
                 if (optopt == 'h') {
                     fprintf (stderr, "Option -%c requires host list.\n", optopt);
@@ -853,10 +890,31 @@ int main(int argc, char **argv) {
                 "-s, --ssl <ssl params>         Comma separated parameters to initiate SSL connection\n"
                 "                                 e.g.: server_cert.crt,client_cert.crt,client_priv_key.pem,passwd\n"
 #endif
+#ifdef HAVE_CYRUS_SASL_H
+                "-u, --user <user>              SASL user name\n"
+                "-n, --mechlist <mechlist>      Comma separated list of SASL mechanisms (GSSAPI and/or DIGEST-MD5)\n"
+                "-o, --server-fqdn <fqdn>       SASL server name ('zk-sasl-md5' for DIGEST-MD5; default: reverse DNS lookup)\n"
+                "-p, --password-file <file>     File containing the password (recommended for SASL/DIGEST-MD5)\n"
+                "-l, --realm <realm>            Realm (for SASL/GSSAPI)\n"
+                "-z, --service <service>        SASL service parameter (default: 'zookeeper')\n"
+#endif /* HAVE_CYRUS_SASL_H */
                 "-r, --readonly                 Connect in read-only mode\n"
                 "-d, --debug                    Activate debug logs right from the beginning (you can also use the \n"
                 "                                 command 'verbose' later to activate debug logs in the cli shell)\n\n",
                 argv[0]);
+#ifdef HAVE_CYRUS_SASL_H
+        fprintf(stderr,
+                "SASL EXAMPLES:\n"
+                "$ %s --mechlist DIGEST-MD5 --user bob --password-file bob.secret --server-fqdn zk-sasl-md5 -h ...\n"
+                "$ %s --mechlist GSSAPI --user bob --realm BOBINC.COM -h ...\n"
+                "Notes:\n"
+                "  * SASL and SSL support are currently incompatible (ZOOKEEPER-3482);\n"
+                "  * SASL parameters map to Cyrus SASL's _new/_start APIs and callbacks;\n"
+                "  * DIGEST-MD5 requires '--server-fqdn zk-sasl-md5' for historical reasons.\n"
+                "  * Passwords are obtained via the obsolete 'getpass()' if not provided via '--password-file'.\n"
+                "\n",
+                argv[0], argv[0]);
+#endif /* HAVE_CYRUS_SASL_H */
         fprintf(stderr,
                 "Version: ZooKeeper cli (c client) version %s\n",
                 ZOO_VERSION);
@@ -889,18 +947,52 @@ int main(int argc, char **argv) {
 #endif
     zoo_deterministic_conn_order(1); // enable deterministic order
 
-#ifdef HAVE_OPENSSL_H
-    if (!cert) {
-        zh = zookeeper_init(hostPort, watcher, 30000, &myid, NULL, flags);
-    } else {
-        zh = zookeeper_init_ssl(hostPort, cert, watcher, 30000, &myid, NULL, flags);
+#ifdef HAVE_CYRUS_SASL_H
+    if (mechlist) {
+        zoo_sasl_params_t sasl_params = { 0 };
+        int sr;
+
+        if (cert) {
+            fprintf(stderr, "SASL and SSL support are currently incompatible (ZOOKEEPER-3482); exiting.\n");
+            return 1;
+        }
+
+        sr = sasl_client_init(NULL);
+        if (sr != SASL_OK) {
+            fprintf(stderr, "Unable to initialize SASL library: %s\n",
+                    sasl_errstring(sr, NULL, NULL));
+            return 1;
+        }
+
+        sasl_params.service = service;
+        sasl_params.host = serverFQDN;
+        sasl_params.mechlist = mechlist;
+        sasl_params.callbacks = zoo_sasl_make_basic_callbacks(user, realm,
+            passwordFile);
+
+        zh = zookeeper_init_sasl(hostPort, watcher, 30000, &myid, NULL, flags,
+            NULL, &sasl_params);
+
+        if (!zh) {
+            return errno;
+        }
     }
+#endif /* HAVE_CYRUS_SASL_H */
+
+    if (!zh) {
+#ifdef HAVE_OPENSSL_H
+        if (!cert) {
+            zh = zookeeper_init(hostPort, watcher, 30000, &myid, NULL, flags);
+        } else {
+            zh = zookeeper_init_ssl(hostPort, cert, watcher, 30000, &myid, NULL, flags);
+        }
 #else
-    zh = zookeeper_init(hostPort, watcher, 30000, &myid, NULL, flags);
+        zh = zookeeper_init(hostPort, watcher, 30000, &myid, NULL, flags);
 #endif
 
-    if (!zh) {
-        return errno;
+        if (!zh) {
+            return errno;
+        }
     }
 
 #ifdef YCA
diff --git a/zookeeper-client/zookeeper-client-c/src/zk_adaptor.h b/zookeeper-client/zookeeper-client-c/src/zk_adaptor.h
index 8157472..57696d4 100644
--- a/zookeeper-client/zookeeper-client-c/src/zk_adaptor.h
+++ b/zookeeper-client/zookeeper-client-c/src/zk_adaptor.h
@@ -18,6 +18,7 @@
 
 #ifndef ZK_ADAPTOR_H_
 #define ZK_ADAPTOR_H_
+
 #include <zookeeper.jute.h>
 #ifdef THREADED
 #ifndef WIN32
@@ -182,6 +183,8 @@ typedef struct _auth_list_head {
 #endif
 } auth_list_head_t;
 
+typedef struct _zoo_sasl_client zoo_sasl_client_t;
+
 /**
  * This structure represents the connection to zookeeper.
  */
@@ -261,6 +264,10 @@ struct _zhandle {
     /** used for chroot path at the client side **/
     char *chroot;
 
+#ifdef HAVE_CYRUS_SASL_H
+    zoo_sasl_client_t *sasl_client;
+#endif /* HAVE_CYRUS_SASL_H */
+
     /** Indicates if this client is allowed to go to r/o mode */
     char allow_read_only;
     /** Indicates if we connected to a majority server before */
diff --git a/zookeeper-client/zookeeper-client-c/src/zk_sasl.c b/zookeeper-client/zookeeper-client-c/src/zk_sasl.c
new file mode 100644
index 0000000..e0ccfb3
--- /dev/null
+++ b/zookeeper-client/zookeeper-client-c/src/zk_sasl.c
@@ -0,0 +1,541 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+
+#ifdef HAVE_SYS_SOCKET_H
+#include <sys/socket.h>
+#endif
+
+#ifdef HAVE_NETINET_IN_H
+#include <netinet/in.h>
+#endif
+
+#ifdef HAVE_ARPA_INET_H
+#include <arpa/inet.h>
+#endif
+
+#ifdef HAVE_NETDB_H
+#include <netdb.h>
+#endif
+
+#ifdef HAVE_STRING_H
+#include <string.h>
+#endif
+
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+
+#include <zookeeper.h>
+#include "zk_sasl.h"
+#include "zk_adaptor.h"
+#include "zookeeper_log.h"
+
+/*
+ * Store a duplicate of src, or NULL, into *target.  Returns
+ * ZSYSTEMERROR if no memory could be allocated, ZOK otherwise.
+ */
+static int _zsasl_strdup(const char **target, const char *src)
+{
+    if (src) {
+        *target = strdup(src);
+        if (!*target) {
+            return ZSYSTEMERROR;
+        }
+    }
+    return ZOK;
+}
+
+/*
+ * Free the malloc'ed memory referenced by *location, setting
+ * *location to NULL.
+ */
+static void _zsasl_free(const char **location)
+{
+    if (*location) {
+        free((void*)*location);
+        *location = NULL;
+    }
+}
+
+zoo_sasl_client_t *zoo_sasl_client_create(zoo_sasl_params_t *sasl_params)
+{
+    zoo_sasl_client_t *sc = calloc(1, sizeof(*sc));
+    int rc = ZOK;
+
+    if (!sc) {
+        return NULL;
+    }
+
+    sc->state = ZOO_SASL_INITIAL;
+
+    rc = rc < 0 ? rc : _zsasl_strdup(&sc->params.service, sasl_params->service);
+    rc = rc < 0 ? rc : _zsasl_strdup(&sc->params.host, sasl_params->host);
+    rc = rc < 0 ? rc : _zsasl_strdup(&sc->params.mechlist, sasl_params->mechlist);
+
+    sc->params.callbacks = sasl_params->callbacks;
+
+    if (rc != ZOK) {
+        zoo_sasl_client_destroy(sc);
+        return NULL;
+    }
+
+    return sc;
+}
+
+void zoo_sasl_client_destroy(zoo_sasl_client_t *sc)
+{
+    if (!sc) {
+        return;
+    }
+
+    if (sc->conn) {
+        sasl_dispose(&sc->conn);
+    }
+
+    sc->params.callbacks = NULL;
+
+    _zsasl_free(&sc->params.service);
+    _zsasl_free(&sc->params.host);
+    _zsasl_free(&sc->params.mechlist);
+
+    sc->state = ZOO_SASL_FAILED;
+}
+
+void zoo_sasl_mark_failed(zhandle_t *zh)
+{
+    if (zh->sasl_client) {
+        zh->sasl_client->state = ZOO_SASL_FAILED;
+    }
+    zh->state = ZOO_AUTH_FAILED_STATE;
+}
+
+/*
+ * Put the handle and SASL client in failed state if rc is not ZOK.
+ * Returns rc.
+ */
+static int _zsasl_fail(zhandle_t *zh, int rc)
+{
+    if (rc != ZOK) {
+        zoo_sasl_mark_failed(zh);
+        LOG_ERROR(LOGCALLBACK(zh), "SASL authentication failed. rc=%d", rc);
+    }
+    return rc;
+}
+
+/*
+ * Get and format the host and port associated with a socket address
+ * into Cyrus SASL format.  Optionally also store the host name in
+ * provided buffer.
+ *
+ * \param addr the socket address
+ * \param salen the length of addr
+ * \param ipport_buf the formatted output buffer, of size
+ *   NI_MAXHOST + NI_MAXSERV
+ * \param opt_host_buf the host name buffer, of size NI_MAXHOST, or
+ *   NULL for none
+ * \return ZOK if successful
+ */
+static int _zsasl_getipport(zhandle_t *zh,
+                            const struct sockaddr *addr, socklen_t salen,
+                            char *ipport_buf, char *opt_host_buf)
+{
+    char hbuf[NI_MAXHOST], pbuf[NI_MAXSERV];
+    int niflags, error, written;
+
+    niflags = (NI_NUMERICHOST | NI_NUMERICSERV);
+#ifdef NI_WITHSCOPEID
+    if (addr->sa_family == AF_INET6) {
+        niflags |= NI_WITHSCOPEID;
+    }
+#endif
+    error = getnameinfo(addr, salen, hbuf, sizeof(hbuf), pbuf, sizeof(pbuf),
+                        niflags);
+    if (error != 0) {
+        LOG_ERROR(LOGCALLBACK(zh), "getnameinfo: %s\n", gai_strerror(error));
+        return ZSYSTEMERROR;
+    }
+
+    written = sprintf(ipport_buf, "%s;%s", hbuf, pbuf);
+    if (written < 0) {
+        return ZSYSTEMERROR;
+    }
+
+    if (opt_host_buf) {
+        memcpy(opt_host_buf, hbuf, sizeof(hbuf));
+    }
+
+    return ZOK;
+}
+
+int zoo_sasl_connect(zhandle_t *zh)
+{
+    zoo_sasl_client_t *sc = zh->sasl_client;
+    char iplocalport[NI_MAXHOST + NI_MAXSERV];
+    char ipremoteport[NI_MAXHOST + NI_MAXSERV];
+    char host[NI_MAXHOST];
+    int rc, sr;
+    socklen_t salen;
+    struct sockaddr_storage local_ip, remote_ip;
+
+    if (!sc) {
+        return _zsasl_fail(zh, ZINVALIDSTATE);
+    }
+
+    if (sc->conn) {
+        sasl_dispose(&sc->conn);
+    }
+
+    sc->state = ZOO_SASL_INITIAL;
+
+    /* set ip addresses */
+    salen = sizeof(local_ip);
+    if (getsockname(zh->fd->sock, (struct sockaddr *)&local_ip, &salen) < 0) {
+        LOG_ERROR(LOGCALLBACK(zh), "getsockname");
+        return _zsasl_fail(zh, ZSYSTEMERROR);
+    }
+
+    rc = _zsasl_getipport(zh, (const struct sockaddr *)&local_ip, salen,
+        iplocalport, NULL);
+    if (rc < 0) {
+        return _zsasl_fail(zh, rc);
+    }
+
+    salen = sizeof(remote_ip);
+    if (getpeername(zh->fd->sock, (struct sockaddr *)&remote_ip, &salen) < 0) {
+        LOG_ERROR(LOGCALLBACK(zh), "getpeername");
+        return _zsasl_fail(zh, ZSYSTEMERROR);
+    }
+
+    rc = _zsasl_getipport(zh, (const struct sockaddr *)&remote_ip, salen,
+        ipremoteport, host);
+    if (rc < 0) {
+        return _zsasl_fail(zh, rc);
+    }
+
+    LOG_DEBUG(LOGCALLBACK(zh),
+        "Zookeeper Host: %s %s", iplocalport, ipremoteport);
+
+    /* client new connection */
+    sr = sasl_client_new(
+        sc->params.service,
+        sc->params.host ? sc->params.host : host,
+        iplocalport,
+        ipremoteport,
+        sc->params.callbacks,
+        /*secflags*/0,
+        &sc->conn);
+
+    if (sr != SASL_OK) {
+        LOG_ERROR(LOGCALLBACK(zh),
+            "allocating SASL connection state: %s",
+            sasl_errstring(sr, NULL, NULL));
+        return _zsasl_fail(zh, ZSYSTEMERROR);
+    }
+
+    return ZOK;
+}
+
+int zoo_sasl_client_start(zhandle_t *zh)
+{
+    zoo_sasl_client_t *sc = zh->sasl_client;
+    const char *chosenmech;
+    const char *client_data;
+    unsigned client_data_len;
+    int sr, rc = ZOK;
+
+    if (!sc || sc->state != ZOO_SASL_INITIAL) {
+        return _zsasl_fail(zh, ZINVALIDSTATE);
+    }
+
+    sc->state = ZOO_SASL_INTERMEDIATE;
+
+    sr = sasl_client_start(sc->conn, sc->params.mechlist,
+                           NULL, &client_data, &client_data_len, &chosenmech);
+
+    if (sr != SASL_OK && sr != SASL_CONTINUE) {
+        LOG_ERROR(LOGCALLBACK(zh),
+                  "Starting SASL negotiation: %s %s",
+                  sasl_errstring(sr, NULL, NULL),
+                  sasl_errdetail(sc->conn));
+        return _zsasl_fail(zh, ZSYSTEMERROR);
+    }
+
+    LOG_DEBUG(LOGCALLBACK(zh),
+              "SASL start sr:%d mech:%s client_data_len:%d",
+              sr, chosenmech, (int)client_data_len);
+
+    /*
+     * HACK: Without this, the SASL client is unable to reauthenticate
+     * with the ZooKeeper ensemble after a disconnect.  This is due to
+     * a bug in the JDK's implementation of SASL DIGEST-MD5; the
+     * upstream issue is:
+     *
+     *     JDK-6682540, Incorrect SASL DIGEST-MD5 behavior
+     *     https://bugs.openjdk.java.net/browse/JDK-6682540
+     *
+     * A patch has been committed to the JDK in May 2019, but it will
+     * take a while to appear in production:
+     *
+     *     http://hg.openjdk.java.net/jdk/jdk/rev/0627b8ad33c1
+     *
+     * As a workaround, we just "empty" the client start in DIGEST-MD5
+     * mode, forcing the server to proceed with initial (re)authentication.
+     */
+    if (client_data_len > 0 && strcmp(chosenmech, "DIGEST-MD5") == 0) {
+        LOG_DEBUG(LOGCALLBACK(zh),
+                  "SASL start %s: refusing reauthenticate",
+                  chosenmech);
+
+        client_data = NULL;
+        client_data_len = 0;
+    }
+
+    /*
+     * ZooKeeperSaslClient.java:285 says:
+     *
+     *     GSSAPI: server sends a final packet after authentication
+     *     succeeds or fails.
+     *
+     * so we need to keep track of that.
+     */
+    if (strcmp(chosenmech, "GSSAPI") == 0) {
+        sc->is_gssapi = 1;
+    }
+
+    if (sr == SASL_CONTINUE || client_data_len > 0) {
+        rc = queue_sasl_request(zh, client_data, client_data_len);
+        if (rc < 0) {
+            return _zsasl_fail(zh, rc);
+        }
+    }
+
+    return rc;
+}
+
+int zoo_sasl_client_step(zhandle_t *zh, const char *server_data,
+                         int server_data_len)
+{
+    zoo_sasl_client_t *sc = zh->sasl_client;
+    const char *client_data;
+    unsigned client_data_len;
+    int sr, rc = ZOK;
+
+    if (!sc || sc->state != ZOO_SASL_INTERMEDIATE) {
+        return _zsasl_fail(zh, ZINVALIDSTATE);
+    }
+
+    LOG_DEBUG(LOGCALLBACK(zh),
+              "SASL intermediate server_data_len:%d", server_data_len);
+
+    if (sc->is_gssapi && sc->is_last_packet) {
+        /* See note in zoo_sasl_client_start. */
+        sc->is_last_packet = 0;
+        sc->state = ZOO_SASL_COMPLETE;
+        return rc;
+    }
+
+    sr = sasl_client_step(sc->conn, server_data, server_data_len,
+            NULL, &client_data, &client_data_len);
+
+    LOG_DEBUG(LOGCALLBACK(zh),
+              "SASL intermediate sr:%d client_data_len:%d",
+              sr, (int)client_data_len);
+
+    if (sr != SASL_OK && sr != SASL_CONTINUE) {
+        LOG_ERROR(LOGCALLBACK(zh),
+                  "During SASL negotiation: %s %s",
+                  sasl_errstring(sr, NULL, NULL),
+                  sasl_errdetail(sc->conn));
+        return _zsasl_fail(zh, ZSYSTEMERROR);
+    }
+
+    if (sr == SASL_CONTINUE || client_data_len > 0) {
+        rc = queue_sasl_request(zh, client_data, client_data_len);
+        if (rc < 0) {
+            return _zsasl_fail(zh, rc);
+        }
+    }
+
+    if (sr != SASL_CONTINUE) {
+        if (sc->is_gssapi) {
+            /* See note in zoo_sasl_client_start. */
+            sc->is_last_packet = 1;
+        } else {
+            sc->state = ZOO_SASL_COMPLETE;
+        }
+    }
+
+    return rc;
+}
+
+/*
+ * Cyrus SASL callback for SASL_CB_GETREALM
+ */
+static int _zsasl_getrealm(void *context, int id, const char **availrealms,
+                           const char **result)
+{
+    const char *realm = (const char*)context;
+    *result = realm;
+    return SASL_OK;
+}
+
+/*
+ * Cyrus SASL callback for SASL_CB_USER or SASL_CB_AUTHNAME
+ */
+static int _zsasl_simple(void *context, int id, const char **result,
+                         unsigned *len)
+{
+    const char *user = (const char*)context;
+
+    /* paranoia check */
+    if (!result)
+        return SASL_BADPARAM;
+
+    switch (id) {
+    case SASL_CB_USER:
+        *result = user;
+        break;
+    case SASL_CB_AUTHNAME:
+        *result = user;
+        break;
+    default:
+        return SASL_BADPARAM;
+    }
+
+    return SASL_OK;
+}
+
+#ifndef HAVE_GETPASSPHRASE
+static char *
+getpassphrase(const char *prompt) {
+    return getpass(prompt);
+}
+#endif /* ! HAVE_GETPASSPHRASE */
+
+struct zsasl_secret_ctx {
+    const char *password_file;
+    sasl_secret_t *secret;
+};
+
+/*
+ * Cyrus SASL callback for SASL_CB_PASS
+ */
+static int _zsasl_getsecret(sasl_conn_t *conn, void *context, int id,
+                            sasl_secret_t **psecret)
+{
+    struct zsasl_secret_ctx *secret_ctx = (struct zsasl_secret_ctx *)context;
+    char buf[1024];
+    char *password;
+    size_t len;
+    sasl_secret_t *x;
+
+    /* paranoia check */
+    if (!conn || !psecret || id != SASL_CB_PASS)
+        return SASL_BADPARAM;
+
+    if (secret_ctx->password_file) {
+        char *p;
+        FILE *fh = fopen(secret_ctx->password_file, "rt");
+        if (!fh)
+            return SASL_FAIL;
+
+        if (!fgets(buf, sizeof(buf), fh)) {
+            fclose(fh);
+            return SASL_FAIL;
+        }
+
+        fclose(fh);
+
+        p = strrchr(buf, '\n');
+        if (p)
+            *p = '\0';
+
+        password = buf;
+    } else {
+        password = getpassphrase("Password: ");
+
+        if (!password)
+            return SASL_FAIL;
+    }
+
+    len = strlen(password);
+
+    x = secret_ctx->secret = (sasl_secret_t *)realloc(
+        secret_ctx->secret, sizeof(sasl_secret_t) + len);
+
+    if (!x) {
+        memset(password, 0, len);
+        return SASL_NOMEM;
+    }
+
+    x->len = len;
+    strcpy((char *) x->data, password);
+    memset(password, 0, len);
+
+    *psecret = x;
+    return SASL_OK;
+}
+
+typedef int (* sasl_callback_fn_t)(void);
+
+sasl_callback_t *zoo_sasl_make_basic_callbacks(const char *user,
+                                               const char *realm,
+                                               const char* password_file)
+{
+    struct zsasl_secret_ctx *secret_ctx;
+    const char *user_ctx = NULL;
+    const char *realm_ctx = NULL;
+    int rc;
+
+    secret_ctx = (struct zsasl_secret_ctx *)calloc(
+        1, sizeof(struct zsasl_secret_ctx));
+    rc = secret_ctx ? ZOK : ZSYSTEMERROR;
+
+    rc = rc < 0 ? rc : _zsasl_strdup(&user_ctx, user);
+    rc = rc < 0 ? rc : _zsasl_strdup(&realm_ctx, realm);
+    rc = rc < 0 ? rc : _zsasl_strdup(&secret_ctx->password_file, password_file);
+
+    {
+        sasl_callback_t callbacks[] = {
+            { SASL_CB_GETREALM, (sasl_callback_fn_t)&_zsasl_getrealm, (void*)realm_ctx },
+            { SASL_CB_USER, (sasl_callback_fn_t)&_zsasl_simple, (void*)user_ctx },
+            { SASL_CB_AUTHNAME, (sasl_callback_fn_t)&_zsasl_simple, (void*)user_ctx },
+            { SASL_CB_PASS, (sasl_callback_fn_t)&_zsasl_getsecret, (void*)secret_ctx },
+            { SASL_CB_LIST_END, NULL, NULL }
+        };
+
+        sasl_callback_t *xcallbacks = rc < 0 ? NULL : malloc(sizeof(callbacks));
+
+        if (rc < 0 || !xcallbacks) {
+            if (secret_ctx) {
+                _zsasl_free(&secret_ctx->password_file);
+                free(secret_ctx);
+                secret_ctx = NULL;
+            }
+            _zsasl_free(&realm_ctx);
+            _zsasl_free(&user_ctx);
+            return NULL;
+        }
+
+        memcpy(xcallbacks, callbacks, sizeof(callbacks));
+
+        return xcallbacks;
+    }
+}
diff --git a/zookeeper-client/zookeeper-client-c/src/zk_sasl.h b/zookeeper-client/zookeeper-client-c/src/zk_sasl.h
new file mode 100644
index 0000000..cb840f1
--- /dev/null
+++ b/zookeeper-client/zookeeper-client-c/src/zk_sasl.h
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ZK_SASL_H_
+#define ZK_SASL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * \brief enumerates SASL authentication states.  Corresponds to
+ * org.apache.zookeeper.client.ZooKeeperSaslClient.SaslState.
+ */
+typedef enum {
+    ZOO_SASL_INITIAL,
+    ZOO_SASL_INTERMEDIATE,
+    ZOO_SASL_COMPLETE,
+    ZOO_SASL_FAILED
+} ZooSaslState;
+
+/**
+ * \brief zoo_sasl_client_t structure.
+ *
+ * This structure holds (a copy of) the original SASL parameters, the
+ * Cyrus SASL client "object," and the current authentication state.
+ *
+ * See \ref zoo_sasl_client_create and \ref zoo_sasl_client_destroy.
+ */
+typedef struct _zoo_sasl_client {
+    zoo_sasl_params_t params;
+    sasl_conn_t *conn;
+    ZooSaslState state;
+    unsigned char is_gssapi;
+    unsigned char is_last_packet;
+} zoo_sasl_client_t;
+
+/**
+ * \brief allocates a \ref zoo_sasl_client_t "object."
+ *
+ * \param sasl_params The SASL parameters to use.  Note while "string"
+ *   parameters are copied, the callbacks array is simply referenced:
+ *   its lifetime must therefore cover that of the handle.
+ * \return the client object, or NULL on failure
+ */
+zoo_sasl_client_t *zoo_sasl_client_create(zoo_sasl_params_t *sasl_params);
+
+/**
+ * \brief "destroys" a \ref zoo_sasl_client_t "object" allocated by
+ * \ref zoo_sasl_client_create.
+ *
+ * \param sasl_client the client "object"
+ */
+void zoo_sasl_client_destroy(zoo_sasl_client_t *sasl_client);
+
+/**
+ * \brief put the handle and SASL client in failed state.
+ *
+ * This sets the SASL client in \ref ZOO_SASL_FAILED state and the
+ * ZooKeeper handle in \ref ZOO_AUTH_FAILED_STATE state.
+ *
+ * \param zh the ZooKeeper handle to mark
+ */
+void zoo_sasl_mark_failed(zhandle_t *zh);
+
+/**
+ * \brief prepares the SASL connection object for the (connecting)
+ * ZooKeeper handle.
+ *
+ * The client is switched to \ref ZOO_SASL_INITIAL state, or \ref
+ * ZOO_SASL_FAILED in case of error.
+ *
+ * \param zh the ZooKeeper handle in \ref ZOO_CONNECTING_STATE state
+ * \return ZOK on success, or one of the following on failure:
+ *   ZINVALIDSTATE - no SASL client present
+ *   ZSYSTEMERROR - SASL library error
+ */
+int zoo_sasl_connect(zhandle_t *zh);
+
+/**
+ * \brief queues an encoded SASL request to ZooKeeper.
+ *
+ * Note that such packets are added to the front of the queue,
+ * pre-empting "normal" communications.
+ *
+ * \param zh the ZooKeeper handle
+ * \param client_data the encoded SASL data, ready to send
+ * \param client_data_len the length of \c client_data
+ * \return ZOK on success, or ZMARSHALLINGERROR if something went wrong
+ */
+int queue_sasl_request(zhandle_t *zh, const char *client_data,
+     int client_data_len);
+
+/**
+ * \brief starts a new SASL authentication session using the
+ * parameters provided to \ref zoo_sasl_client_create
+ *
+ * On entry, the client must be in \ref ZOO_SASL_INITIAL state; this
+ * call switches it to \ref ZOO_SASL_INTERMEDIATE state or \ref
+ * ZOO_SASL_FAILED in case of error.
+ *
+ * Note that this is not a "normal" ZK client function; the
+ * corresponding packets are added to the front of the queue,
+ * pre-empting other requests.
+ *
+ * \param zh the ZooKeeper handle, with the SASL client in
+ *   \ref ZOO_SASL_INITIAL state
+ * \return ZOK on success, or one of the following on failure:
+ *   ZINVALIDSTATE - client not in expected state
+ *   ZSYSTEMERROR - SASL library error
+ *   ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+int zoo_sasl_client_start(zhandle_t *zh);
+
+/**
+ * \brief performs a step in the SASL authentication process.
+ *
+ * On entry, the client must be in \ref ZOO_SASL_INTERMEDIATE
+ * state. This call switches it to \ref ZOO_SASL_COMPLETE state if
+ * (and only if) the process is complete--or to \ref ZOO_SASL_FAILED
+ * in case of error.
+ *
+ * \param zh the ZooKeeper handle, with the SASL client in
+ *   \ref ZOO_SASL_INTERMEDIATE state
+ * \param server_data SASL data from the ZooKeeper server
+ * \param server_data_len length of \c server_data
+ * \return ZOK on success, or one of the following on failure:
+ *   ZINVALIDSTATE - client not in expected state
+ *   ZSYSTEMERROR - SASL library error
+ *   ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+int zoo_sasl_client_step(zhandle_t *zh, const char *server_data,
+    int server_data_len);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /*ZK_SASL_H_*/
diff --git a/zookeeper-client/zookeeper-client-c/src/zookeeper.c b/zookeeper-client/zookeeper-client-c/src/zookeeper.c
index 7ab5eed..db42d6c 100644
--- a/zookeeper-client/zookeeper-client-c/src/zookeeper.c
+++ b/zookeeper-client/zookeeper-client-c/src/zookeeper.c
@@ -32,6 +32,10 @@
 #include "zookeeper_log.h"
 #include "zk_hashtable.h"
 
+#ifdef HAVE_CYRUS_SASL_H
+#include "zk_sasl.h"
+#endif /* HAVE_CYRUS_SASL_H */
+
 #include <stdlib.h>
 #include <stdio.h>
 #include <string.h>
@@ -326,6 +330,17 @@ static void zookeeper_set_sock_noblock(zhandle_t *, socket_t);
 static void zookeeper_set_sock_timeout(zhandle_t *, socket_t, int);
 static socket_t zookeeper_connect(zhandle_t *, struct sockaddr_storage *, socket_t);
 
+/*
+ * return 1 if zh has a SASL client performing authentication, 0 otherwise.
+ */
+static int is_sasl_auth_in_progress(zhandle_t* zh)
+{
+#ifdef HAVE_CYRUS_SASL_H
+    return zh->sasl_client && zh->sasl_client->state == ZOO_SASL_INTERMEDIATE;
+#else /* !HAVE_CYRUS_SASL_H */
+    return 0;
+#endif /* HAVE_CYRUS_SASL_H */
+}
 
 /*
  * abort due to the use of a sync api in a singlethreaded environment
@@ -639,6 +654,13 @@ static void destroy(zhandle_t *zh)
     destroy_zk_hashtable(zh->active_child_watchers);
     addrvec_free(&zh->addrs_old);
     addrvec_free(&zh->addrs_new);
+
+#ifdef HAVE_CYRUS_SASL_H
+    if (zh->sasl_client) {
+        zoo_sasl_client_destroy(zh->sasl_client);
+        zh->sasl_client = NULL;
+    }
+#endif /* HAVE_CYRUS_SASL_H */
 }
 
 static void setup_random()
@@ -1197,7 +1219,7 @@ static void log_env(zhandle_t *zh) {
  */
 static zhandle_t *zookeeper_init_internal(const char *host, watcher_fn watcher,
         int recv_timeout, const clientid_t *clientid, void *context, int flags,
-        log_callback_fn log_callback, zcert_t *cert)
+        log_callback_fn log_callback, zcert_t *cert, void *sasl_params)
 {
     int errnosave = 0;
     zhandle_t *zh = NULL;
@@ -1308,6 +1330,16 @@ static zhandle_t *zookeeper_init_internal(const char *host, watcher_fn watcher,
     zh->active_child_watchers=create_zk_hashtable();
     zh->disable_reconnection_attempt = 0;
 
+#ifdef HAVE_CYRUS_SASL_H
+    if (sasl_params) {
+        zh->sasl_client = zoo_sasl_client_create(
+            (zoo_sasl_params_t*)sasl_params);
+        if (!zh->sasl_client) {
+            goto abort;
+        }
+    }
+#endif /* HAVE_CYRUS_SASL_H */
+
     if (adaptor_init(zh) == -1) {
         goto abort;
     }
@@ -1325,14 +1357,14 @@ abort:
 zhandle_t *zookeeper_init(const char *host, watcher_fn watcher,
         int recv_timeout, const clientid_t *clientid, void *context, int flags)
 {
-    return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, NULL, NULL);
+    return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, NULL, NULL, NULL);
 }
 
 zhandle_t *zookeeper_init2(const char *host, watcher_fn watcher,
         int recv_timeout, const clientid_t *clientid, void *context, int flags,
         log_callback_fn log_callback)
 {
-    return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, log_callback, NULL);
+    return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, log_callback, NULL, NULL);
 }
 
 #ifdef HAVE_OPENSSL_H
@@ -1345,10 +1377,19 @@ zhandle_t *zookeeper_init_ssl(const char *host, const char *cert, watcher_fn wat
     zcert.cert = strtok(NULL, ",");
     zcert.key = strtok(NULL, ",");
     zcert.passwd = strtok(NULL, ",");       
-    return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, NULL, &zcert);
+    return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, NULL, &zcert, NULL);
 }
 #endif
 
+#ifdef HAVE_CYRUS_SASL_H
+zhandle_t *zookeeper_init_sasl(const char *host, watcher_fn watcher,
+        int recv_timeout, const clientid_t *clientid, void *context, int flags,
+        log_callback_fn log_callback, zoo_sasl_params_t *sasl_params)
+{
+    return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, log_callback, NULL, sasl_params);
+}
+#endif /* HAVE_CYRUS_SASL_H */
+
 /**
  * Set a new list of zk servers to connect to.  Disconnect will occur if
  * current connection endpoint is not in the list.
@@ -2540,7 +2581,7 @@ int zookeeper_interest(zhandle_t *zh, socket_t *fd, int *interest,
         *interest = ZOOKEEPER_READ;
         /* we are interested in a write if we are connected and have something
          * to send, or we are waiting for a connect to finish. */
-        if ((zh->to_send.head && is_connected(zh))
+        if ((zh->to_send.head && (is_connected(zh) || is_sasl_auth_in_progress(zh)))
             || zh->state == ZOO_CONNECTING_STATE
             || zh->state == ZOO_SSL_CONNECTING_STATE) {
             *interest |= ZOOKEEPER_WRITE;
@@ -2691,6 +2732,91 @@ static int init_ssl_for_socket(zsock_t *fd, zhandle_t *zh, int fail_on_error) {
 
 #endif
 
+/*
+ * the "bottom half" of the session establishment procedure, executed
+ * either after receiving the "prime response," or after SASL
+ * authentication is complete
+ */
+static void finalize_session_establishment(zhandle_t *zh) {
+    zh->state = zh->primer_storage.readOnly ?
+        ZOO_READONLY_STATE : ZOO_CONNECTED_STATE;
+    zh->reconfig = 0;
+    LOG_INFO(LOGCALLBACK(zh),
+             "session establishment complete on server %s, sessionId=%#llx, negotiated timeout=%d %s",
+             format_endpoint_info(&zh->addr_cur),
+             zh->client_id.client_id, zh->recv_timeout,
+             zh->primer_storage.readOnly ? "(READ-ONLY mode)" : "");
+    /* we want the auth to be sent for, but since both call push to front
+       we need to call send_watch_set first */
+    send_set_watches(zh);
+    /* send the authentication packet now */
+    send_auth_info(zh);
+    LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE");
+    zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again
+    PROCESS_SESSION_EVENT(zh, zh->state);
+}
+
+#ifdef HAVE_CYRUS_SASL_H
+
+/*
+ * queue an encoded SASL request to ZooKeeper.  The packet is added to
+ * the front of the queue.
+ *
+ * \param zh the ZooKeeper handle
+ * \param client_data the encoded SASL data, ready to send
+ * \param client_data_len the length of \c client_data
+ * \return ZOK on success, or ZMARSHALLINGERROR if something went wrong
+ */
+int queue_sasl_request(zhandle_t *zh, const char *client_data, int client_data_len)
+{
+    struct oarchive *oa;
+    int rc;
+
+    /* Java client use normal xid, too. */
+    struct RequestHeader h = { get_xid(), ZOO_SASL_OP };
+    struct GetSASLRequest req = { { client_data_len, client_data_len>0 ? (char *) client_data : "" } };
+
+    oa = create_buffer_oarchive();
+    rc = serialize_RequestHeader(oa, "header", &h);
+    rc = rc < 0 ? rc : serialize_GetSASLRequest(oa, "req", &req);
+    rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa),
+         get_buffer_len(oa));
+    close_buffer_oarchive(&oa, 0);
+
+    LOG_DEBUG(LOGCALLBACK(zh),
+        "SASL: Queued request len=%d rc=%d", client_data_len, rc);
+
+    return (rc < 0) ? ZMARSHALLINGERROR : ZOK;
+}
+
+/*
+ * decode an expected SASL response and perform the corresponding
+ * authentication step
+ */
+static int process_sasl_response(zhandle_t *zh, char *buffer, int len)
+{
+    struct iarchive *ia = create_buffer_iarchive(buffer, len);
+    struct ReplyHeader hdr;
+    struct SetSASLResponse res;
+    int rc;
+
+    rc = ia ? ZOK : ZSYSTEMERROR;
+    rc = rc < 0 ? rc : deserialize_ReplyHeader(ia, "hdr", &hdr);
+    rc = rc < 0 ? rc : deserialize_SetSASLResponse(ia, "reply", &res);
+    rc = rc < 0 ? rc : zoo_sasl_client_step(zh, res.token.buff, res.token.len);
+    deallocate_SetSASLResponse(&res);
+    if (ia) {
+        close_buffer_iarchive(&ia);
+    }
+
+    LOG_DEBUG(LOGCALLBACK(zh),
+        "SASL: Processed response len=%d rc=%d", len, rc);
+
+    return rc;
+}
+
+#endif /* HAVE_CYRUS_SASL_H */
+
 static int check_events(zhandle_t *zh, int events)
 {
     if (zh->fd->sock == -1)
@@ -2759,7 +2885,24 @@ static int check_events(zhandle_t *zh, int events)
         if (rc > 0) {
             get_system_time(&zh->last_recv);
             if (zh->input_buffer != &zh->primer_buffer) {
-                queue_buffer(&zh->to_process, zh->input_buffer, 0);
+                if (is_connected(zh) || !is_sasl_auth_in_progress(zh)) {
+                    queue_buffer(&zh->to_process, zh->input_buffer, 0);
+#ifdef HAVE_CYRUS_SASL_H
+                } else {
+                    rc = process_sasl_response(zh, zh->input_buffer->buffer, zh->input_buffer->curr_offset);
+                    free_buffer(zh->input_buffer);
+                    if (rc < 0) {
+                        zoo_sasl_mark_failed(zh);
+                        return rc;
+                    } else if (zh->sasl_client->state == ZOO_SASL_COMPLETE) {
+                        /*
+                         * SASL authentication just completed; send
+                         * watches, auth. info, etc. now.
+                         */
+                        finalize_session_establishment(zh);
+                    }
+#endif /* HAVE_CYRUS_SASL_H */
+                }
             } else  {
                 int64_t oldid, newid;
                 //deserialize
@@ -2780,22 +2923,28 @@ static int check_events(zhandle_t *zh, int events)
 
                     memcpy(zh->client_id.passwd, &zh->primer_storage.passwd,
                            sizeof(zh->client_id.passwd));
-                    zh->state = zh->primer_storage.readOnly ?
-                      ZOO_READONLY_STATE : ZOO_CONNECTED_STATE;
-                    zh->reconfig = 0;
-                    LOG_INFO(LOGCALLBACK(zh),
-                             "session establishment complete on server %s, sessionId=%#llx, negotiated timeout=%d %s",
-                             format_endpoint_info(&zh->addr_cur),
-                             newid, zh->recv_timeout,
-                             zh->primer_storage.readOnly ? "(READ-ONLY mode)" : "");
-                    /* we want the auth to be sent for, but since both call push to front
-                       we need to call send_watch_set first */
-                    send_set_watches(zh);
-                    /* send the authentication packet now */
-                    send_auth_info(zh);
-                    LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE");
-                    zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again
-                    PROCESS_SESSION_EVENT(zh, zh->state);
+
+#ifdef HAVE_CYRUS_SASL_H
+                    if (zh->sasl_client) {
+                        /*
+                         * Start a SASL authentication session.
+                         * Watches, auth. info, etc. will be sent
+                         * after it completes.
+                         */
+                        rc = zoo_sasl_connect(zh);
+                        rc = rc < 0 ? rc : zoo_sasl_client_start(zh);
+                        if (rc < 0) {
+                            zoo_sasl_mark_failed(zh);
+                            return rc;
+                        }
+                    } else {
+                        /* Can send watches, auth. info, etc. immediately. */
+                        finalize_session_establishment(zh);
+                    }
+#else /* HAVE_CYRUS_SASL_H */
+                    /* Can send watches, auth. info, etc. immediately. */
+                    finalize_session_establishment(zh);
+#endif /* HAVE_CYRUS_SASL_H */
                 }
             }
             zh->input_buffer = 0;
@@ -4593,7 +4742,7 @@ int flush_send_queue(zhandle_t*zh, int timeout)
     // we use a recursive lock instead and only dequeue the buffer if a send was
     // successful
     lock_buffer_list(&zh->to_send);
-    while (zh->to_send.head != 0 && is_connected(zh)) {
+    while (zh->to_send.head != 0 && (is_connected(zh) || is_sasl_auth_in_progress(zh))) {
         if(timeout!=0){
 #ifndef _WIN32
             struct pollfd fds;
diff --git a/zookeeper-client/zookeeper-client-c/tests/TestSASLAuth.cc b/zookeeper-client/zookeeper-client-c/tests/TestSASLAuth.cc
new file mode 100644
index 0000000..080649d
--- /dev/null
+++ b/zookeeper-client/zookeeper-client-c/tests/TestSASLAuth.cc
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef THREADED
+
+#include <cppunit/extensions/HelperMacros.h>
+#include "CppAssertHelper.h"
+
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include <zookeeper.h>
+
+#include "Util.h"
+#include "WatchUtil.h"
+
+class Zookeeper_SASLAuth : public CPPUNIT_NS::TestFixture {
+    CPPUNIT_TEST_SUITE(Zookeeper_SASLAuth);
+    CPPUNIT_TEST(testServerRequireClientSASL);
+#ifdef HAVE_CYRUS_SASL_H
+    CPPUNIT_TEST(testClientSASL);
+#ifdef ZOO_IPV6_ENABLED
+    CPPUNIT_TEST(testClientSASLOverIPv6);
+#endif/* ZOO_IPV6_ENABLED */
+    CPPUNIT_TEST(testClientSASLReadOnly);
+#endif /* HAVE_CYRUS_SASL_H */
+    CPPUNIT_TEST_SUITE_END();
+    FILE *logfile;
+    static const char hostPorts[];
+    static const char jaasConf[];
+    static void watcher(zhandle_t *, int type, int state, const char *path,void*v){
+        watchctx_t *ctx = (watchctx_t*)v;
+
+        if (state == ZOO_CONNECTED_STATE || state == ZOO_READONLY_STATE) {
+            ctx->connected = true;
+        } else {
+            ctx->connected = false;
+        }
+        if (type != ZOO_SESSION_EVENT) {
+            evt_t evt;
+            evt.path = path;
+            evt.type = type;
+            ctx->putEvent(evt);
+        }
+    }
+
+public:
+    Zookeeper_SASLAuth() {
+      logfile = openlogfile("Zookeeper_SASLAuth");
+    }
+
+    ~Zookeeper_SASLAuth() {
+      if (logfile) {
+        fflush(logfile);
+        fclose(logfile);
+        logfile = 0;
+      }
+    }
+
+    void setUp() {
+        zoo_set_log_stream(logfile);
+
+        // Create SASL configuration file for server.
+        FILE *conff = fopen("Zookeeper_SASLAuth.jaas.conf", "wt");
+        CPPUNIT_ASSERT(conff);
+        size_t confLen = strlen(jaasConf);
+        CPPUNIT_ASSERT_EQUAL(fwrite(jaasConf, 1, confLen, conff), confLen);
+        CPPUNIT_ASSERT_EQUAL(fclose(conff), 0);
+        conff = NULL;
+
+        // Create password file for client.
+        FILE *passf = fopen("Zookeeper_SASLAuth.password", "wt");
+        CPPUNIT_ASSERT(passf);
+        CPPUNIT_ASSERT(fputs("mypassword", passf) > 0);
+        CPPUNIT_ASSERT_EQUAL(fclose(passf), 0);
+        passf = NULL;
+    }
+
+    void startServer(bool useJaasConf = true, bool readOnly = false) {
+        char cmd[1024];
+        sprintf(cmd, "%s startRequireSASLAuth %s %s",
+                ZKSERVER_CMD,
+                useJaasConf ? "Zookeeper_SASLAuth.jaas.conf" : "",
+                readOnly ? "true" : "");
+        CPPUNIT_ASSERT(system(cmd) == 0);
+    }
+
+    void stopServer() {
+        char cmd[1024];
+        sprintf(cmd, "%s stop", ZKSERVER_CMD);
+        CPPUNIT_ASSERT(system(cmd) == 0);
+    }
+
+    void testServerRequireClientSASL() {
+        startServer(false);
+
+        watchctx_t ctx;
+        int rc = 0;
+        zhandle_t *zk = zookeeper_init(hostPorts, watcher, 10000, 0, &ctx, 0);
+        ctx.zh = zk;
+        CPPUNIT_ASSERT(zk);
+
+        // Wait for handle to be connected.
+        CPPUNIT_ASSERT(ctx.waitForConnected(zk));
+
+        char pathbuf[80];
+        struct Stat stat_a = {0};
+
+        rc = zoo_create2(zk, "/serverRequireClientSASL", "", 0,
+                         &ZOO_OPEN_ACL_UNSAFE, 0, pathbuf, sizeof(pathbuf), &stat_a);
+        CPPUNIT_ASSERT_EQUAL((int)ZSESSIONCLOSEDREQUIRESASLAUTH, rc);
+
+        stopServer();
+    }
+
+#ifdef HAVE_CYRUS_SASL_H
+    void testClientSASLHelper(const char *hostPorts, const char *path) {
+        startServer();
+
+        // Initialize Cyrus SASL.
+        CPPUNIT_ASSERT_EQUAL(sasl_client_init(NULL), SASL_OK);
+
+        // Initialize SASL parameters.
+        zoo_sasl_params_t sasl_params = { 0 };
+
+        sasl_params.service = "zookeeper";
+        sasl_params.host = "zk-sasl-md5";
+        sasl_params.mechlist = "DIGEST-MD5";
+        sasl_params.callbacks = zoo_sasl_make_basic_callbacks(
+            "myuser", NULL, "Zookeeper_SASLAuth.password");
+
+        // Connect.
+        watchctx_t ctx;
+        int rc = 0;
+        zhandle_t *zk = zookeeper_init_sasl(hostPorts, watcher, 10000, NULL,
+            &ctx, /*flags*/0, /*log_callback*/NULL, &sasl_params);
+        ctx.zh = zk;
+        CPPUNIT_ASSERT(zk);
+
+        // Wait for SASL auth to complete and handle to be connected.
+        CPPUNIT_ASSERT(ctx.waitForConnected(zk));
+
+        // Leave mark.
+        char pathbuf[80];
+        struct Stat stat_a = {0};
+        rc = zoo_create2(zk, path, "", 0,
+            &ZOO_OPEN_ACL_UNSAFE, 0, pathbuf, sizeof(pathbuf), &stat_a);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
+
+        // Stop and restart the server to test automatic reconnect & re-auth.
+        stopServer();
+        CPPUNIT_ASSERT(ctx.waitForDisconnected(zk));
+        startServer();
+
+        // Wait for automatic SASL re-auth to complete.
+        CPPUNIT_ASSERT(ctx.waitForConnected(zk));
+
+        // Check mark left above.
+        rc = zoo_exists(zk, path, /*watch*/false, &stat_a);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
+
+        stopServer();
+    }
+
+    void testClientSASL() {
+        testClientSASLHelper(hostPorts, "/clientSASL");
+    }
+
+    void testClientSASLOverIPv6() {
+        const char *ipAndPort = "::1:22181";
+
+        testClientSASLHelper(ipAndPort, "/clientSASLOverIPv6");
+    }
+
+    void testClientSASLReadOnly() {
+        startServer(/*useJaasConf*/ true, /*readOnly*/ true);
+
+        // Initialize Cyrus SASL.
+        CPPUNIT_ASSERT_EQUAL(sasl_client_init(NULL), SASL_OK);
+
+        // Initialize SASL parameters.
+        zoo_sasl_params_t sasl_params = { 0 };
+
+        sasl_params.service = "zookeeper";
+        sasl_params.host = "zk-sasl-md5";
+        sasl_params.mechlist = "DIGEST-MD5";
+        sasl_params.callbacks = zoo_sasl_make_basic_callbacks(
+            "myuser", NULL, "Zookeeper_SASLAuth.password");
+
+        // Connect.
+        watchctx_t ctx;
+        int rc = 0;
+        zhandle_t *zk = zookeeper_init_sasl(hostPorts, watcher, 10000, NULL,
+            &ctx, /*flags*/ZOO_READONLY, /*log_callback*/NULL, &sasl_params);
+        ctx.zh = zk;
+        CPPUNIT_ASSERT(zk);
+
+        // Wait for SASL auth to complete and handle to be connected.
+        CPPUNIT_ASSERT(ctx.waitForConnected(zk));
+
+        // Assert can read.
+        char buf[1024];
+        int len = sizeof(buf);
+        rc = zoo_get(zk, "/", 0, buf, &len, 0);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
+
+        // Assert can not write.
+        char path[1024];
+        rc = zoo_create(zk, "/test", "hello", 5, &ZOO_OPEN_ACL_UNSAFE, 0, path, sizeof(path));
+        CPPUNIT_ASSERT_EQUAL((int)ZNOTREADONLY, rc);
+
+        stopServer();
+    }
+
+#endif /* HAVE_CYRUS_SASL_H */
+};
+
+const char Zookeeper_SASLAuth::hostPorts[] = "127.0.0.1:22181";
+
+const char Zookeeper_SASLAuth::jaasConf[] =
+  "Server {\n"
+  "  org.apache.zookeeper.server.auth.DigestLoginModule required\n"
+  "    user_myuser=\"mypassword\";\n"
+  "};\n";
+
+CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_SASLAuth);
+
+#endif /* THREADED */
diff --git a/zookeeper-client/zookeeper-client-c/tests/TestServerRequireClientSASLAuth.cc b/zookeeper-client/zookeeper-client-c/tests/TestServerRequireClientSASLAuth.cc
deleted file mode 100644
index 2943888..0000000
--- a/zookeeper-client/zookeeper-client-c/tests/TestServerRequireClientSASLAuth.cc
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <cppunit/extensions/HelperMacros.h>
-#include "CppAssertHelper.h"
-
-#include <sys/socket.h>
-#include <unistd.h>
-
-#include <zookeeper.h>
-
-#include "Util.h"
-#include "WatchUtil.h"
-
-ZOOAPI int zoo_create2(zhandle_t *zh, const char *path, const char *value,
-        int valuelen, const struct ACL_vector *acl, int mode,
-        char *path_buffer, int path_buffer_len, struct Stat *stat);
-
-class Zookeeper_serverRequireClientSASL : public CPPUNIT_NS::TestFixture {
-    CPPUNIT_TEST_SUITE(Zookeeper_serverRequireClientSASL);
-#ifdef THREADED
-    CPPUNIT_TEST(testServerRequireClientSASL);
-#endif
-    CPPUNIT_TEST_SUITE_END();
-    FILE *logfile;
-    static const char hostPorts[];
-    static void watcher(zhandle_t *, int type, int state, const char *path,void*v){
-        watchctx_t *ctx = (watchctx_t*)v;
-
-        if (state == ZOO_CONNECTED_STATE) {
-            ctx->connected = true;
-        } else {
-            ctx->connected = false;
-        }
-        if (type != ZOO_SESSION_EVENT) {
-            evt_t evt;
-            evt.path = path;
-            evt.type = type;
-            ctx->putEvent(evt);
-        }
-    }
-
-public:
-    Zookeeper_serverRequireClientSASL() {
-      logfile = openlogfile("Zookeeper_serverRequireClientSASL");
-    }
-
-    ~Zookeeper_serverRequireClientSASL() {
-      if (logfile) {
-        fflush(logfile);
-        fclose(logfile);
-        logfile = 0;
-      }
-    }
-
-    void setUp() {
-        zoo_set_log_stream(logfile);
-    }
-
-    void startServer() {
-        char cmd[1024];
-        sprintf(cmd, "%s startRequireSASLAuth", ZKSERVER_CMD);
-        CPPUNIT_ASSERT(system(cmd) == 0);
-    }
-
-    void stopServer() {
-        char cmd[1024];
-        sprintf(cmd, "%s stop", ZKSERVER_CMD);
-        CPPUNIT_ASSERT(system(cmd) == 0);
-    }
-
-    void testServerRequireClientSASL() {
-        startServer();
-
-        watchctx_t ctx;
-        int rc = 0;
-        zhandle_t *zk = zookeeper_init(hostPorts, watcher, 10000, 0, &ctx, 0);
-        ctx.zh = zk;
-        CPPUNIT_ASSERT(zk);
-
-        char pathbuf[80];
-        struct Stat stat_a = {0};
-
-        rc = zoo_create2(zk, "/serverRequireClientSASL", "", 0,
-                         &ZOO_OPEN_ACL_UNSAFE, 0, pathbuf, sizeof(pathbuf), &stat_a);
-        CPPUNIT_ASSERT_EQUAL((int)ZSESSIONCLOSEDREQUIRESASLAUTH, rc);
-
-        stopServer();
-    }
-};
-
-const char Zookeeper_serverRequireClientSASL::hostPorts[] = "127.0.0.1:22181";
-
-CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_serverRequireClientSASL);
diff --git a/zookeeper-client/zookeeper-client-c/tests/zkServer.sh b/zookeeper-client/zookeeper-client-c/tests/zkServer.sh
index 432786c..99b716a 100755
--- a/zookeeper-client/zookeeper-client-c/tests/zkServer.sh
+++ b/zookeeper-client/zookeeper-client-c/tests/zkServer.sh
@@ -26,7 +26,7 @@ EXTRA_JVM_ARGS=${EXTRA_JVM_ARGS:-""}
 
 if [ "x$1" == "x" ]
 then
-    echo "USAGE: $0 startClean|start|startCleanReadOnly|startRequireSASLAuth|stop"
+    echo "USAGE: $0 startClean|start|startCleanReadOnly|startRequireSASLAuth [jaasConf] [readOnly]|stop"
     exit 2
 fi
 
@@ -123,14 +123,26 @@ fi
 # ===== initialize JVM arguments
 # =====
 
+read_only=
 PROPERTIES="$EXTRA_JVM_ARGS -Dzookeeper.extendedTypesEnabled=true -Dznode.container.checkIntervalMs=100"
 if [ "x$1" == "xstartRequireSASLAuth" ]
 then
     PROPERTIES="-Dzookeeper.sessionRequireClientSASLAuth=true $PROPERTIES"
+    if [ "x$2" != "x" ]
+    then
+        PROPERTIES="$PROPERTIES -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider"
+        PROPERTIES="$PROPERTIES -Djava.security.auth.login.config=$2"
+    fi
+    if [ "x$3" != "x" ]
+    then
+        PROPERTIES="-Dreadonlymode.enabled=true $PROPERTIES"
+        read_only=true
+    fi
 fi
 if [ "x$1" == "xstartCleanReadOnly" ]
 then
     PROPERTIES="-Dreadonlymode.enabled=true $PROPERTIES"
+    read_only=true
 fi
 
 
@@ -177,7 +189,7 @@ start|startClean|startRequireSASLAuth|startCleanReadOnly)
 
     # ===== prepare the configs
     sed "s#TMPDIR#${tmp_dir}#g;s#CERTDIR#${certs_dir}#g;s#MAXCLIENTCONNECTIONS#${ZKMAXCNXNS}#g;s#CLIENTPORT#${ZOOPORT}#g" ${tests_dir}/zoo.cfg > "${tmp_dir}/zoo.cfg"
-    if [ "x$1" == "xstartCleanReadOnly" ]
+    if [ "x$read_only" != "x" ]
     then
         # we can put the new server to read-only mode by starting only a single instance of a three node server
         echo "server.1=localhost:22881:33881" >> ${tmp_dir}/zoo.cfg