You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2017/05/25 11:31:21 UTC

bookkeeper git commit: BOOKKEEPER-391: Support Kerberos authentication of bookkeeper

Repository: bookkeeper
Updated Branches:
  refs/heads/master 7da9ed293 -> 667390d1a


BOOKKEEPER-391: Support Kerberos authentication of bookkeeper

This patch contains a very basic AuthProvider which uses JAAS and so enables the usage or GSSAPI/Kerberos for BookKeeper authentication

Author: eolivelli <eo...@apache.org>
Author: eolivelli <eo...@gmail.com>

Reviewers: Robert (Bobby) Evans <None>, Sijie Guo <None>

Closes #110 from eolivelli/BOOKKEEPER-391-kerberos


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/667390d1
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/667390d1
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/667390d1

Branch: refs/heads/master
Commit: 667390d1a6305c31608130929ba0d68a0b5a4763
Parents: 7da9ed2
Author: Enrico Olivelli <eo...@apache.org>
Authored: Thu May 25 13:31:04 2017 +0200
Committer: eolivelli <eo...@apache.org>
Committed: Thu May 25 13:31:04 2017 +0200

----------------------------------------------------------------------
 bookkeeper-server/pom.xml                       |  16 +-
 .../sasl/JAASCredentialsContainer.java          |  42 +++
 .../bookkeeper/sasl/SASLBookieAuthProvider.java |  73 +++++
 .../sasl/SASLBookieAuthProviderFactory.java     | 217 +++++++++++++++
 .../bookkeeper/sasl/SASLClientAuthProvider.java | 101 +++++++
 .../sasl/SASLClientProviderFactory.java         | 156 +++++++++++
 .../apache/bookkeeper/sasl/SaslClientState.java | 180 ++++++++++++
 .../apache/bookkeeper/sasl/SaslConstants.java   |  88 ++++++
 .../apache/bookkeeper/sasl/SaslServerState.java | 238 ++++++++++++++++
 .../bookkeeper/sasl/TGTRefreshThread.java       | 272 +++++++++++++++++++
 .../bookkeeper/sasl/GSSAPIBookKeeperTest.java   | 256 +++++++++++++++++
 .../sasl/MD5DigestBookKeeperTest.java           | 142 ++++++++++
 .../src/test/resources/jaas_md5.conf            |  30 ++
 13 files changed, 1810 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/667390d1/bookkeeper-server/pom.xml
----------------------------------------------------------------------
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index e5de842..36a9b57 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -139,7 +139,7 @@
     <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
-      <version>2.1</version>
+      <version>2.4</version>
     </dependency>
     <dependency>
       <groupId>net.java.dev.jna</groupId>
@@ -216,10 +216,24 @@
       <artifactId>netty-all</artifactId>
       <version>${netty.version}</version>
     </dependency>
+    <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-minikdc</artifactId>
+        <version>2.7.3</version>
+        <scope>test</scope>
+    </dependency>          
   </dependencies>
   <build>
     <plugins>
       <plugin>
+            <!-- for mini-kdc -->
+            <groupId>org.apache.felix</groupId>
+            <artifactId>maven-bundle-plugin</artifactId>
+            <version>3.2.0</version>
+            <inherited>true</inherited>
+            <extensions>true</extensions>
+      </plugin>
+      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
         <version>2.4.3</version>

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/667390d1/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/JAASCredentialsContainer.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/JAASCredentialsContainer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/JAASCredentialsContainer.java
new file mode 100644
index 0000000..791f106
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/JAASCredentialsContainer.java
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.sasl;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginContext;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+
+public interface JAASCredentialsContainer {
+
+    Subject getSubject();
+
+    LoginContext getLogin();
+
+    void setLogin(LoginContext login);
+
+    boolean isUsingTicketCache();
+
+    String getPrincipal();
+
+    AbstractConfiguration getConfiguration();
+
+    String getLoginContextName();
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/667390d1/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SASLBookieAuthProvider.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SASLBookieAuthProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SASLBookieAuthProvider.java
new file mode 100644
index 0000000..488e1f6
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SASLBookieAuthProvider.java
@@ -0,0 +1,73 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.sasl;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.SaslException;
+import org.apache.bookkeeper.auth.AuthCallbacks;
+import org.apache.bookkeeper.auth.AuthToken;
+import org.apache.bookkeeper.auth.BookieAuthProvider;
+import org.apache.bookkeeper.bookie.BookieConnectionPeer;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SASLBookieAuthProvider implements BookieAuthProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SASLBookieAuthProvider.class);
+
+    private SaslServerState server;
+    private final AuthCallbacks.GenericCallback<Void> completeCb;
+    
+    SASLBookieAuthProvider(BookieConnectionPeer addr, AuthCallbacks.GenericCallback<Void> completeCb,
+        ServerConfiguration serverConfiguration, Subject subject, Pattern allowedIdsPattern) {
+        this.completeCb = completeCb;
+        try {
+            server = new SaslServerState(serverConfiguration, subject, allowedIdsPattern);
+        } catch (IOException | LoginException error) {
+            LOG.error("Error while booting SASL server", error);
+            completeCb.operationComplete(BKException.Code.UnauthorizedAccessException, null);
+        }
+    }
+
+    @Override
+    public void process(AuthToken m, AuthCallbacks.GenericCallback<AuthToken> cb) {
+        try {
+            byte[] clientSideToken = m.getData();
+            byte[] response = server.response(clientSideToken);
+            if (response != null) {
+                cb.operationComplete(BKException.Code.OK, AuthToken.wrap(response));
+            }
+            if (server.isComplete()) {
+                completeCb.operationComplete(BKException.Code.OK, null);
+            }
+        } catch (SaslException err) {
+            LOG.debug("SASL error", err);
+            completeCb.operationComplete(BKException.Code.UnauthorizedAccessException, null);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/667390d1/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SASLBookieAuthProviderFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SASLBookieAuthProviderFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SASLBookieAuthProviderFactory.java
new file mode 100644
index 0000000..23a29a2
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SASLBookieAuthProviderFactory.java
@@ -0,0 +1,217 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.sasl;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.SaslException;
+import org.apache.bookkeeper.auth.AuthCallbacks;
+import org.apache.bookkeeper.bookie.BookieConnectionPeer;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BookieAuthProvider which uses JDK-bundled SASL
+ */
+public class SASLBookieAuthProviderFactory implements org.apache.bookkeeper.auth.BookieAuthProvider.Factory,
+    JAASCredentialsContainer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SASLBookieAuthProviderFactory.class);
+
+    private Pattern allowedIdsPattern;
+    private ServerConfiguration serverConfiguration;
+    private Subject subject;
+    private boolean isKrbTicket;
+    private boolean isUsingTicketCache;
+    private String principal;
+    private String loginContextName;
+    private LoginContext login;
+    private TGTRefreshThread ticketRefreshThread;
+
+    @Override
+    public void init(ServerConfiguration conf) throws IOException {
+        this.serverConfiguration = conf;
+
+        final String allowedIdsPatternRegExp = conf.getString(SaslConstants.JAAS_CLIENT_ALLOWED_IDS,
+            SaslConstants.JAAS_CLIENT_ALLOWED_IDS_DEFAULT);
+        try {
+            this.allowedIdsPattern = Pattern.compile(allowedIdsPatternRegExp);
+        } catch (PatternSyntaxException error) {
+            LOG.error("Invalid regular expression " + allowedIdsPatternRegExp, error);
+            throw new IOException(error);
+        }
+
+        try {
+            loginContextName = serverConfiguration.getString(SaslConstants.JAAS_BOOKIE_SECTION_NAME,
+                SaslConstants.JAAS_DEFAULT_BOOKIE_SECTION_NAME);
+
+            this.login = loginServer();
+            this.subject = login.getSubject();
+            this.isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
+            if (isKrbTicket) {
+                this.isUsingTicketCache = SaslConstants.isUsingTicketCache(loginContextName);
+                this.principal = SaslConstants.getPrincipal(loginContextName);
+                this.ticketRefreshThread = new TGTRefreshThread(this);
+                ticketRefreshThread.start();
+            }
+        } catch (SaslException | LoginException error) {
+            throw new IOException(error);
+        }
+    }
+
+    @Override
+    public org.apache.bookkeeper.auth.BookieAuthProvider newProvider(BookieConnectionPeer addr,
+        AuthCallbacks.GenericCallback<Void> completeCb) {
+        return new SASLBookieAuthProvider(addr, completeCb, serverConfiguration,
+            subject, allowedIdsPattern);
+    }
+
+    @Override
+    public String getPluginName() {
+        return SaslConstants.PLUGIN_NAME;
+    }
+
+    @Override
+    public void close() {
+        if (ticketRefreshThread != null) {
+            ticketRefreshThread.interrupt();
+            try {
+                ticketRefreshThread.join(10000);
+            } catch (InterruptedException exit) {
+                LOG.debug("interrupted while waiting for TGT reresh thread to stop", exit);
+            }
+        }
+    }
+
+    @Override
+    public Subject getSubject() {
+        return subject;
+    }
+
+    @Override
+    public LoginContext getLogin() {
+        return login;
+    }
+
+    @Override
+    public void setLogin(LoginContext login) {
+        this.login = login;
+    }
+
+    @Override
+    public boolean isUsingTicketCache() {
+        return isUsingTicketCache;
+    }
+
+    @Override
+    public String getPrincipal() {
+        return principal;
+    }
+
+    @Override
+    public AbstractConfiguration getConfiguration() {
+        return serverConfiguration;
+    }
+
+    @Override
+    public String getLoginContextName() {
+        return loginContextName;
+    }
+
+    private LoginContext loginServer() throws SaslException, LoginException {
+
+        AppConfigurationEntry[] entries = Configuration.getConfiguration()
+            .getAppConfigurationEntry(loginContextName);
+        if (entries == null) {
+            LOG.info("JAAS not configured or no "
+                + loginContextName + " present in JAAS Configuration file");
+            return null;
+        }
+        LoginContext loginContext = new LoginContext(loginContextName, new ClientCallbackHandler(null));
+        loginContext.login();
+        return loginContext;
+
+    }
+
+    private static class ClientCallbackHandler implements CallbackHandler {
+
+        private String password = null;
+
+        public ClientCallbackHandler(String password) {
+            this.password = password;
+        }
+
+        @Override
+        public void handle(Callback[] callbacks) throws
+            UnsupportedCallbackException {
+            for (Callback callback : callbacks) {
+                if (callback instanceof NameCallback) {
+                    NameCallback nc = (NameCallback) callback;
+                    nc.setName(nc.getDefaultName());
+                } else {
+                    if (callback instanceof PasswordCallback) {
+                        PasswordCallback pc = (PasswordCallback) callback;
+                        if (password != null) {
+                            pc.setPassword(this.password.toCharArray());
+                        }
+                    } else {
+                        if (callback instanceof RealmCallback) {
+                            RealmCallback rc = (RealmCallback) callback;
+                            rc.setText(rc.getDefaultText());
+                        } else {
+                            if (callback instanceof AuthorizeCallback) {
+                                AuthorizeCallback ac = (AuthorizeCallback) callback;
+                                String authid = ac.getAuthenticationID();
+                                String authzid = ac.getAuthorizationID();
+                                if (authid.equals(authzid)) {
+                                    ac.setAuthorized(true);
+                                } else {
+                                    ac.setAuthorized(false);
+                                }
+                                if (ac.isAuthorized()) {
+                                    ac.setAuthorizedID(authzid);
+                                }
+                            } else {
+                                throw new UnsupportedCallbackException(callback, "Unrecognized SASL ClientCallback");
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/667390d1/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SASLClientAuthProvider.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SASLClientAuthProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SASLClientAuthProvider.java
new file mode 100644
index 0000000..48806dd
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SASLClientAuthProvider.java
@@ -0,0 +1,101 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.sasl;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import javax.security.auth.Subject;
+import javax.security.sasl.SaslException;
+import org.apache.bookkeeper.auth.AuthCallbacks;
+import org.apache.bookkeeper.auth.AuthToken;
+import org.apache.bookkeeper.auth.ClientAuthProvider;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.ClientConnectionPeer;
+import org.slf4j.LoggerFactory;
+
+public class SASLClientAuthProvider implements ClientAuthProvider {
+
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(SASLClientAuthProvider.class);
+
+    private SaslClientState client;
+    private final AuthCallbacks.GenericCallback<Void> completeCb;
+
+    SASLClientAuthProvider(ClientConnectionPeer addr, AuthCallbacks.GenericCallback<Void> completeCb,
+        Subject subject) {
+        this.completeCb = completeCb;
+        try {
+            SocketAddress remoteAddr = addr.getRemoteAddr();
+            String hostname;
+            if (remoteAddr instanceof InetSocketAddress) {
+                InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddr;
+                hostname = inetSocketAddress.getHostName();
+            } else {
+                hostname = InetAddress.getLocalHost().getHostName();
+            }
+            client = new SaslClientState(hostname, subject);
+            LOG.debug("SASLClientAuthProvider Boot " + client + " for " + hostname);
+        } catch (IOException error) {
+            LOG.error("Error while booting SASL client", error);
+            completeCb.operationComplete(BKException.Code.UnauthorizedAccessException, null);
+        }
+    }
+
+    @Override
+    public void init(AuthCallbacks.GenericCallback<AuthToken> cb) {
+        try {
+            if (client.hasInitialResponse()) {
+                byte[] response = client.evaluateChallenge(new byte[0]);
+                cb.operationComplete(BKException.Code.OK, AuthToken.wrap(response));
+            } else {
+                cb.operationComplete(BKException.Code.OK, AuthToken.wrap(new byte[0]));
+            }
+        } catch (SaslException err) {
+            LOG.error("Error on SASL client", err);
+            completeCb.operationComplete(BKException.Code.UnauthorizedAccessException, null);
+        }
+    }
+
+    @Override
+    public void process(AuthToken m, AuthCallbacks.GenericCallback<AuthToken> cb) {
+        if (client.isComplete()) {
+            completeCb.operationComplete(BKException.Code.OK, null);
+            return;
+        }
+        try {
+            byte[] responseToken = m.getData();
+            byte[] response = client.evaluateChallenge(responseToken);
+            if (response == null) {
+                response = new byte[0];
+            }
+            cb.operationComplete(BKException.Code.OK, AuthToken.wrap(response));
+            if (client.isComplete()) {
+                completeCb.operationComplete(BKException.Code.OK, null);
+            }
+        } catch (SaslException err) {
+            LOG.error("Error on SASL client", err);
+            completeCb.operationComplete(BKException.Code.UnauthorizedAccessException, null);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/667390d1/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SASLClientProviderFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SASLClientProviderFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SASLClientProviderFactory.java
new file mode 100644
index 0000000..5f20793
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SASLClientProviderFactory.java
@@ -0,0 +1,156 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.sasl;
+
+import java.io.IOException;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.SaslException;
+import org.apache.bookkeeper.auth.AuthCallbacks;
+import org.apache.bookkeeper.auth.ClientAuthProvider;
+import org.apache.bookkeeper.client.ClientConnectionPeer;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ClientAuthProvider which uses JDK-bundled SASL
+ */
+public class SASLClientProviderFactory implements
+    org.apache.bookkeeper.auth.ClientAuthProvider.Factory, JAASCredentialsContainer {
+
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(SASLClientProviderFactory.class);
+
+    private ClientConfiguration clientConfiguration;
+    private LoginContext login;
+    private Subject subject;
+    private String principal;
+    private boolean isKrbTicket;
+    private boolean isUsingTicketCache;
+    private String loginContextName;
+    private TGTRefreshThread ticketRefreshThread;
+
+    @Override
+    public void init(ClientConfiguration conf) throws IOException {
+        this.clientConfiguration = conf;
+        try {
+
+            this.login = loginClient();
+            this.subject = login.getSubject();
+            this.isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
+            boolean systemRole = ClientConfiguration.CLIENT_ROLE_SYSTEM.equals(clientConfiguration.getClientRole());
+            this.loginContextName = systemRole
+                ? clientConfiguration.getString(SaslConstants.JAAS_AUDITOR_SECTION_NAME, SaslConstants.JAAS_DEFAULT_AUDITOR_SECTION_NAME)
+                : clientConfiguration.getString(SaslConstants.JAAS_CLIENT_SECTION_NAME, SaslConstants.JAAS_DEFAULT_CLIENT_SECTION_NAME);
+            if (isKrbTicket) {
+                this.isUsingTicketCache = SaslConstants.isUsingTicketCache(loginContextName);
+                this.principal = SaslConstants.getPrincipal(loginContextName);
+                ticketRefreshThread = new TGTRefreshThread(this);
+                ticketRefreshThread.start();
+            }
+        } catch (SaslException | LoginException error) {
+            throw new IOException(error);
+        }
+    }
+
+    @Override
+    public ClientAuthProvider newProvider(ClientConnectionPeer addr, AuthCallbacks.GenericCallback<Void> completeCb) {
+        return new SASLClientAuthProvider(addr, completeCb, subject);
+    }
+
+    @Override
+    public String getPluginName() {
+        return SaslConstants.PLUGIN_NAME;
+    }
+
+    private LoginContext loginClient() throws SaslException, LoginException {
+        boolean systemRole = ClientConfiguration.CLIENT_ROLE_SYSTEM.equals(clientConfiguration.getClientRole());
+        String configurationEntry = systemRole
+            ? clientConfiguration.getString(SaslConstants.JAAS_AUDITOR_SECTION_NAME, SaslConstants.JAAS_DEFAULT_AUDITOR_SECTION_NAME)
+            : clientConfiguration.getString(SaslConstants.JAAS_CLIENT_SECTION_NAME, SaslConstants.JAAS_DEFAULT_CLIENT_SECTION_NAME);
+        AppConfigurationEntry[] entries = Configuration.getConfiguration()
+            .getAppConfigurationEntry(configurationEntry);
+        if (entries == null) {
+            LOG.info("No JAAS Configuration found with section BookKeeper");
+            return null;
+        }
+        try {
+            LoginContext loginContext = new LoginContext(configurationEntry, new SaslClientState.ClientCallbackHandler(null));
+            loginContext.login();
+            return loginContext;
+        } catch (LoginException error) {
+            LOG.error("Error JAAS Configuration subject", error);
+            return null;
+        }
+    }
+
+    @Override
+    public void close() {
+        if (ticketRefreshThread != null) {
+            ticketRefreshThread.interrupt();
+            try {
+                ticketRefreshThread.join(10000);
+            } catch (InterruptedException exit) {
+                LOG.debug("interrupted while waiting for TGT reresh thread to stop", exit);
+            }
+        }
+    }
+
+    @Override
+    public LoginContext getLogin() {
+        return login;
+    }
+
+    @Override
+    public void setLogin(LoginContext login) {
+        this.login = login;
+    }
+
+    @Override
+    public Subject getSubject() {
+        return subject;
+    }
+
+    @Override
+    public boolean isUsingTicketCache() {
+        return isUsingTicketCache;
+    }
+
+    @Override
+    public String getPrincipal() {
+        return principal;
+    }
+
+    @Override
+    public AbstractConfiguration getConfiguration() {
+        return clientConfiguration;
+    }
+
+    @Override
+    public String getLoginContextName() {
+        return loginContextName;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/667390d1/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SaslClientState.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SaslClientState.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SaslClientState.java
new file mode 100644
index 0000000..4121f87
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SaslClientState.java
@@ -0,0 +1,180 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.sasl;
+
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import org.apache.zookeeper.server.auth.KerberosName;
+import org.slf4j.LoggerFactory;
+
+public class SaslClientState {
+
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(SaslClientState.class);
+
+    private final SaslClient saslClient;
+    private final Subject clientSubject;
+    private String username;
+    private String password;
+
+    public SaslClientState(String serverHostname, Subject subject) throws SaslException {
+        String serverPrincipal = SaslConstants.SASL_BOOKKEEPER_PROTOCOL + "/" + serverHostname;
+        this.clientSubject = subject;
+        if (clientSubject == null) {
+            throw new SaslException("Cannot create JAAS Sujbect for SASL");
+        }
+        if (clientSubject.getPrincipals().isEmpty()) {
+            LOG.debug("Using JAAS/SASL/DIGEST-MD5 auth to connect to {}", serverPrincipal);
+            String[] mechs = {"DIGEST-MD5"};
+            username = (String) (clientSubject.getPublicCredentials().toArray()[0]);
+            password = (String) (clientSubject.getPrivateCredentials().toArray()[0]);
+            saslClient = Sasl.createSaslClient(mechs, username, SaslConstants.SASL_BOOKKEEPER_PROTOCOL,
+                SaslConstants.SASL_MD5_DUMMY_HOSTNAME, null, new ClientCallbackHandler(password));
+        } else { // GSSAPI/Kerberos
+            final Object[] principals = clientSubject.getPrincipals().toArray();
+            final Principal clientPrincipal = (Principal) principals[0];
+            final KerberosName clientKerberosName = new KerberosName(clientPrincipal.getName());
+            KerberosName serviceKerberosName = new KerberosName(serverPrincipal + "@" + clientKerberosName.getRealm());
+            final String serviceName = serviceKerberosName.getServiceName();
+            final String serviceHostname = serviceKerberosName.getHostName();
+            final String clientPrincipalName = clientKerberosName.toString();
+            LOG.debug("Using JAAS/SASL/GSSAPI auth to connect to server Principal {}", serverPrincipal);
+            try {
+                saslClient = Subject.doAs(clientSubject, new PrivilegedExceptionAction<SaslClient>() {
+                    @Override
+                    public SaslClient run() throws SaslException {
+                        String[] mechs = {"GSSAPI"};
+                        return Sasl.createSaslClient(mechs, clientPrincipalName, serviceName, serviceHostname, null,
+                            new ClientCallbackHandler(null));
+                    }
+                });
+            } catch (PrivilegedActionException err) {
+                LOG.debug("GSSAPI client error", err.getCause());
+                throw new SaslException("error while booting GSSAPI client", err.getCause());
+            }
+        }
+        if (saslClient == null) {
+            throw new SaslException("Cannot create JVM SASL Client");
+        }
+
+    }
+
+    public byte[] evaluateChallenge(final byte[] saslToken) throws SaslException {
+        if (saslToken == null) {
+            throw new SaslException("saslToken is null");
+        }
+        if (clientSubject != null) {
+            try {
+                final byte[] retval
+                    = Subject.doAs(clientSubject, new PrivilegedExceptionAction<byte[]>() {
+                        @Override
+                        public byte[] run() throws SaslException {
+                            return saslClient.evaluateChallenge(saslToken);
+                        }
+                    });
+                return retval;
+            } catch (PrivilegedActionException e) {
+                LOG.debug("SASL error", e.getCause());
+                throw new SaslException("SASL/JAAS error", e.getCause());
+            }
+        } else {
+            return saslClient.evaluateChallenge(saslToken);
+        }
+    }
+
+    public boolean hasInitialResponse() {
+        return saslClient.hasInitialResponse();
+    }
+
+    static class ClientCallbackHandler implements CallbackHandler {
+
+        private String password = null;
+
+        public ClientCallbackHandler(String password) {
+            this.password = password;
+        }
+
+        @Override
+        public void handle(Callback[] callbacks) throws
+            UnsupportedCallbackException {
+            for (Callback callback : callbacks) {
+                if (callback instanceof NameCallback) {
+                    NameCallback nc = (NameCallback) callback;
+                    nc.setName(nc.getDefaultName());
+                } else {
+                    if (callback instanceof PasswordCallback) {
+                        PasswordCallback pc = (PasswordCallback) callback;
+                        if (password != null) {
+                            pc.setPassword(this.password.toCharArray());
+                        }
+                    } else {
+                        if (callback instanceof RealmCallback) {
+                            RealmCallback rc = (RealmCallback) callback;
+                            rc.setText(rc.getDefaultText());
+                        } else {
+                            if (callback instanceof AuthorizeCallback) {
+                                AuthorizeCallback ac = (AuthorizeCallback) callback;
+                                String authid = ac.getAuthenticationID();
+                                String authzid = ac.getAuthorizationID();
+                                if (authid.equals(authzid)) {
+                                    ac.setAuthorized(true);
+                                } else {
+                                    ac.setAuthorized(false);
+                                }
+                                if (ac.isAuthorized()) {
+                                    ac.setAuthorizedID(authzid);
+                                }
+                            } else {
+                                throw new UnsupportedCallbackException(callback, "Unrecognized SASL ClientCallback");
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    public boolean isComplete() {
+        return saslClient.isComplete();
+    }
+
+    public byte[] saslResponse(byte[] saslTokenMessage) {
+        try {
+            byte[] retval = saslClient.evaluateChallenge(saslTokenMessage);
+            return retval;
+        } catch (SaslException e) {
+            LOG.debug("saslResponse: Failed to respond to SASL server's token:", e);
+            return null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/667390d1/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SaslConstants.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SaslConstants.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SaslConstants.java
new file mode 100644
index 0000000..9688b70
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SaslConstants.java
@@ -0,0 +1,88 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.sasl;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+
+public class SaslConstants {
+
+    static final String PLUGIN_NAME = "sasl";
+
+    public static final String JAAS_BOOKIE_SECTION_NAME = "saslJaasBookieSectionName";
+    public static final String JAAS_DEFAULT_BOOKIE_SECTION_NAME = "Bookie";
+
+    public static final String JAAS_AUDITOR_SECTION_NAME = "saslJaasAuditorSectionName";
+    public static final String JAAS_DEFAULT_AUDITOR_SECTION_NAME = "Auditor";
+
+    public static final String JAAS_CLIENT_SECTION_NAME = "saslJaasClientSectionName";
+    public static final String JAAS_DEFAULT_CLIENT_SECTION_NAME = "BookKeeper";
+
+    /**
+     * This is a regexp which limits the range of possible ids which can connect to the Bookie using SASL
+     * By default only clients whose id contains the word 'bookkeeper' are allowed to connect
+     */
+    public static final String JAAS_CLIENT_ALLOWED_IDS = "saslJaasClientAllowedIds";
+    public static final String JAAS_CLIENT_ALLOWED_IDS_DEFAULT = ".*bookkeeper.*";
+
+    static final String KINIT_COMMAND_DEFAULT = "/usr/bin/kinit";
+
+    static final String KINIT_COMMAND = "kerberos.kinit";
+
+    static final String SASL_BOOKKEEPER_PROTOCOL = "bookkeeper";
+    static final String SASL_BOOKKEEPER_REALM = "bookkeeper";
+
+    static final String SASL_MD5_DUMMY_HOSTNAME = "bookkeeper";
+
+    static boolean isUsingTicketCache(String configurationEntry) {
+
+        AppConfigurationEntry[] entries = Configuration.getConfiguration()
+            .getAppConfigurationEntry(configurationEntry);
+        if (entries == null) {
+            return false;
+        }
+        for (AppConfigurationEntry entry : entries) {
+            // there will only be a single entry, so this for() loop will only be iterated through once.
+            if (entry.getOptions().get("useTicketCache") != null) {
+                String val = (String) entry.getOptions().get("useTicketCache");
+                if (val.equals("true")) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    static String getPrincipal(String configurationEntry) {
+
+        AppConfigurationEntry[] entries = Configuration.getConfiguration()
+            .getAppConfigurationEntry(configurationEntry);
+        if (entries == null) {
+            return null;
+        }
+        for (AppConfigurationEntry entry : entries) {
+            if (entry.getOptions().get("principal") != null) {
+                return (String) entry.getOptions().get("principal");
+            }
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/667390d1/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SaslServerState.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SaslServerState.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SaslServerState.java
new file mode 100644
index 0000000..776bf3e
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/SaslServerState.java
@@ -0,0 +1,238 @@
+/*
+ Licensed to Diennea S.r.l. under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. Diennea S.r.l. licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+ */
+package org.apache.bookkeeper.sasl;
+
+import java.io.IOException;
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.zookeeper.server.auth.KerberosName;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Server side Sasl implementation
+ */
+public class SaslServerState {
+
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(SaslServerState.class);
+
+    private final SaslServer saslServer;
+    private final Pattern allowedIdsPattern;
+
+    public SaslServerState(
+        ServerConfiguration serverConfiguration, Subject subject, Pattern allowedIdsPattern)
+        throws IOException, SaslException, LoginException {
+        this.allowedIdsPattern = allowedIdsPattern;
+        saslServer = createSaslServer(subject, serverConfiguration);
+    }
+
+    private SaslServer createSaslServer(final Subject subject, ServerConfiguration serverConfiguration)
+        throws SaslException, IOException {
+
+        SaslServerCallbackHandler callbackHandler = new SaslServerCallbackHandler(Configuration.getConfiguration(),
+            serverConfiguration);
+        if (subject.getPrincipals().size() > 0) {
+            try {
+                final Object[] principals = subject.getPrincipals().toArray();
+                final Principal servicePrincipal = (Principal) principals[0];
+                LOG.debug("Authentication will use SASL/JAAS/Kerberos, servicePrincipal is {}", servicePrincipal);
+
+                final String servicePrincipalNameAndHostname = servicePrincipal.getName();
+                int indexOf = servicePrincipalNameAndHostname.indexOf("/");
+                final String serviceHostnameAndKerbDomain = servicePrincipalNameAndHostname.substring(indexOf + 1,
+                    servicePrincipalNameAndHostname.length());
+                int indexOfAt = serviceHostnameAndKerbDomain.indexOf("@");
+
+                final String servicePrincipalName, serviceHostname;
+                if (indexOf > 0) {
+                    servicePrincipalName = servicePrincipalNameAndHostname.substring(0, indexOf);
+                    serviceHostname = serviceHostnameAndKerbDomain.substring(0, indexOfAt);
+                } else {
+                    servicePrincipalName = servicePrincipalNameAndHostname.substring(0, indexOfAt);
+                    serviceHostname = null;
+                }
+
+                try {
+                    return Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
+                        @Override
+                        public SaslServer run() {
+                            try {
+                                SaslServer saslServer;
+                                saslServer = Sasl.createSaslServer("GSSAPI", servicePrincipalName, serviceHostname, null,
+                                    callbackHandler);
+                                return saslServer;
+                            } catch (SaslException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                    }
+                    );
+                } catch (PrivilegedActionException e) {
+                    throw new SaslException("error on GSSAPI boot", e.getCause());
+                }
+            } catch (IndexOutOfBoundsException e) {
+                throw new SaslException("error on GSSAPI boot", e);
+            }
+        } else {
+            LOG.debug("Authentication will use SASL/JAAS/DIGEST-MD5");
+            return Sasl.createSaslServer("DIGEST-MD5", SaslConstants.SASL_BOOKKEEPER_PROTOCOL,
+                SaslConstants.SASL_MD5_DUMMY_HOSTNAME, null, callbackHandler);
+        }
+    }
+
+    public boolean isComplete() {
+        return saslServer.isComplete();
+    }
+
+    public String getUserName() {
+        return saslServer.getAuthorizationID();
+    }
+
+    public byte[] response(byte[] token) throws SaslException {
+        try {
+            byte[] retval = saslServer.evaluateResponse(token);
+            return retval;
+        } catch (SaslException e) {
+            LOG.error("response: Failed to evaluate client token", e);
+            throw e;
+        }
+    }
+
+    private class SaslServerCallbackHandler implements CallbackHandler {
+
+        private static final String USER_PREFIX = "user_";
+
+        private String userName;
+        private final Map<String, String> credentials = new HashMap<>();
+
+        public SaslServerCallbackHandler(Configuration configuration, ServerConfiguration serverConfiguration) throws IOException {
+            String configurationEntry = serverConfiguration.getString(SaslConstants.JAAS_BOOKIE_SECTION_NAME,
+                SaslConstants.JAAS_DEFAULT_BOOKIE_SECTION_NAME);
+            AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(configurationEntry);
+
+            if (configurationEntries == null) {
+                String errorMessage = "Could not find a '" + configurationEntry + "' entry in this configuration: Server cannot start.";
+
+                throw new IOException(errorMessage);
+            }
+            credentials.clear();
+            for (AppConfigurationEntry entry : configurationEntries) {
+                Map<String, ?> options = entry.getOptions();
+                // Populate DIGEST-MD5 user -> password map with JAAS configuration entries from the "Server" section.
+                // Usernames are distinguished from other options by prefixing the username with a "user_" prefix.
+                for (Map.Entry<String, ?> pair : options.entrySet()) {
+                    String key = pair.getKey();
+                    if (key.startsWith(USER_PREFIX)) {
+                        String userName = key.substring(USER_PREFIX.length());
+                        credentials.put(userName, (String) pair.getValue());
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+            for (Callback callback : callbacks) {
+                if (callback instanceof NameCallback) {
+                    handleNameCallback((NameCallback) callback);
+                } else if (callback instanceof PasswordCallback) {
+                    handlePasswordCallback((PasswordCallback) callback);
+                } else if (callback instanceof RealmCallback) {
+                    handleRealmCallback((RealmCallback) callback);
+                } else if (callback instanceof AuthorizeCallback) {
+                    handleAuthorizeCallback((AuthorizeCallback) callback);
+                }
+            }
+        }
+
+        private void handleNameCallback(NameCallback nc) {
+            // check to see if this user is in the user password database.
+            if (credentials.get(nc.getDefaultName()) == null) {
+                LOG.error("User '" + nc.getDefaultName() + "' not found in list of JAAS DIGEST-MD5 users.");
+                return;
+            }
+            nc.setName(nc.getDefaultName());
+            userName = nc.getDefaultName();
+        }
+
+        private void handlePasswordCallback(PasswordCallback pc) {
+            if (credentials.containsKey(userName)) {
+                pc.setPassword(credentials.get(userName).toCharArray());
+            } else {
+                LOG.info("No password found for user: " + userName);
+            }
+        }
+
+        private void handleRealmCallback(RealmCallback rc) {
+            LOG.debug("client supplied realm: " + rc.getDefaultText());
+            rc.setText(rc.getDefaultText());
+        }
+
+        private void handleAuthorizeCallback(AuthorizeCallback ac) {
+            String authenticationID = ac.getAuthenticationID();
+            String authorizationID = ac.getAuthorizationID();
+            if (!authenticationID.equals(authorizationID)) {
+                ac.setAuthorized(false);
+                LOG.info("Forbidden access to client: authenticationID=" + authenticationID
+                    + " is different from authorizationID=" + authorizationID + ".");
+                return;
+            }
+            if (!allowedIdsPattern.matcher(authenticationID).matches()) {
+                ac.setAuthorized(false);
+                LOG.info("Forbidden access to client: authenticationID=" + authenticationID
+                    + " is not allowed (see " + SaslConstants.JAAS_CLIENT_ALLOWED_IDS + " property)");
+                return;
+            }
+            ac.setAuthorized(true);
+
+            LOG.debug("Successfully authenticated client: authenticationID=" + authenticationID
+                + ";  authorizationID=" + authorizationID + ".");
+
+            KerberosName kerberosName = new KerberosName(authenticationID);
+            try {
+                StringBuilder userNameBuilder = new StringBuilder(kerberosName.getShortName());
+                userNameBuilder.append("/").append(kerberosName.getHostName());
+                userNameBuilder.append("@").append(kerberosName.getRealm());
+                LOG.debug("Setting authorizedID: " + userNameBuilder);
+                ac.setAuthorizedID(userNameBuilder.toString());
+            } catch (IOException e) {
+                LOG.error("Failed to set name based on Kerberos authentication rules.");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/667390d1/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/TGTRefreshThread.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/TGTRefreshThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/TGTRefreshThread.java
new file mode 100644
index 0000000..e8fbaa0
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/sasl/TGTRefreshThread.java
@@ -0,0 +1,272 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.sasl;
+
+import java.util.Date;
+import java.util.Random;
+import java.util.Set;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import org.apache.zookeeper.Login;
+import org.apache.zookeeper.Shell;
+import org.apache.zookeeper.common.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// copied from Apache ZooKeeper TGT refresh logic
+class TGTRefreshThread extends Thread {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TGTRefreshThread.class);
+    private static final Random rng = new Random();
+
+    private long lastLogin;
+    private final JAASCredentialsContainer container;
+
+    public long getLastLogin() {
+        return lastLogin;
+    }
+
+    public void setLastLogin(long lastLogin) {
+        this.lastLogin = lastLogin;
+    }
+
+    public TGTRefreshThread(JAASCredentialsContainer container) {
+        this.container = container;
+        // Initialize 'lastLogin' to do a login at first time
+        this.lastLogin = System.currentTimeMillis() - MIN_TIME_BEFORE_RELOGIN;
+        setDaemon(true);
+        setName("bookkeeper-tgt-refresh-thread");
+    } // Initialize 'lastLogin' to do a login at first time
+
+    private synchronized KerberosTicket getTGT() {
+        Set<KerberosTicket> tickets = container.getSubject().getPrivateCredentials(KerberosTicket.class);
+        for (KerberosTicket ticket : tickets) {
+            KerberosPrincipal server = ticket.getServer();
+            if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + server.getRealm())) {
+                LOG.debug("Client principal is \"" + ticket.getClient().getName() + "\".");
+                LOG.debug("Server principal is \"" + ticket.getServer().getName() + "\".");
+                return ticket;
+            }
+        }
+        return null;
+    }
+    // LoginThread will sleep until 80% of time from last refresh to
+    // ticket's expiry has been reached, at which time it will wake
+    // and try to renew the ticket.
+    private static final float TICKET_RENEW_WINDOW = 0.80f;
+    /**
+     * Percentage of random jitter added to the renewal time
+     */
+    private static final float TICKET_RENEW_JITTER = 0.05f;
+    // Regardless of TICKET_RENEW_WINDOW setting above and the ticket expiry time,
+    // thread will not sleep between refresh attempts any less than 1 minute (60*1000 milliseconds = 1 minute).
+    // Change the '1' to e.g. 5, to change this to 5 minutes.
+    private static final long MIN_TIME_BEFORE_RELOGIN = 1 * 60 * 1000L;
+
+    private long getRefreshTime(KerberosTicket tgt) {
+        long start = tgt.getStartTime().getTime();
+        long expires = tgt.getEndTime().getTime();
+        LOG.info("TGT valid starting at:        {}", tgt.getStartTime().toString());
+        LOG.info("TGT expires:                  {}", tgt.getEndTime().toString());
+        long proposedRefresh = start
+            + (long) ((expires - start) * (TICKET_RENEW_WINDOW + (TICKET_RENEW_JITTER * rng.nextDouble())));
+        if (proposedRefresh > expires) {
+            // proposedRefresh is too far in the future: it's after ticket expires: simply return now.
+            return Time.currentWallTime();
+        } else {
+            return proposedRefresh;
+        }
+    }
+
+    @Override
+    public void run() {
+        LOG.info("TGT refresh thread started.");
+        while (true) {
+            // renewal thread's main loop. if it exits from here, thread will exit.
+            KerberosTicket tgt = getTGT();
+            long now = Time.currentWallTime();
+            long nextRefresh;
+            Date nextRefreshDate;
+            if (tgt == null) {
+                nextRefresh = now + MIN_TIME_BEFORE_RELOGIN;
+                nextRefreshDate = new Date(nextRefresh);
+                LOG.warn("No TGT found: will try again at {}", nextRefreshDate);
+            } else {
+                nextRefresh = getRefreshTime(tgt);
+                long expiry = tgt.getEndTime().getTime();
+                Date expiryDate = new Date(expiry);
+                if ((container.isUsingTicketCache()) && (tgt.getEndTime().equals(tgt.getRenewTill()))) {
+                    Object[] logPayload = {expiryDate, container.getPrincipal(), container.getPrincipal()};
+                    LOG.error("The TGT cannot be renewed beyond the next expiry date: {}."
+                        + "This process will not be able to authenticate new SASL connections after that "
+                        + "time (for example, it will not be authenticate a new connection with a Bookie "
+                        + ").  Ask your system administrator to either increase the "
+                        + "'renew until' time by doing : 'modprinc -maxrenewlife {}' within "
+                        + "kadmin, or instead, to generate a keytab for {}. Because the TGT's "
+                        + "expiry cannot be further extended by refreshing, exiting refresh thread now.", logPayload);
+                    return;
+                }
+                // determine how long to sleep from looking at ticket's expiry.
+                // We should not allow the ticket to expire, but we should take into consideration
+                // MIN_TIME_BEFORE_RELOGIN. Will not sleep less than MIN_TIME_BEFORE_RELOGIN, unless doing so
+                // would cause ticket expiration.
+                if ((nextRefresh > expiry) || ((now + MIN_TIME_BEFORE_RELOGIN) > expiry)) {
+                    // expiry is before next scheduled refresh).
+                    nextRefresh = now;
+                } else {
+                    if (nextRefresh < (now + MIN_TIME_BEFORE_RELOGIN)) {
+                        // next scheduled refresh is sooner than (now + MIN_TIME_BEFORE_LOGIN).
+                        Date until = new Date(nextRefresh);
+                        Date newuntil = new Date(now + MIN_TIME_BEFORE_RELOGIN);
+                        Object[] logPayload = {until, newuntil, MIN_TIME_BEFORE_RELOGIN / 1000};
+                        LOG.warn("TGT refresh thread time adjusted from : {} to : {} since "
+                            + "the former is sooner than the minimum refresh interval ("
+                            + "{} seconds) from now.", logPayload);
+                    }
+                    nextRefresh = Math.max(nextRefresh, now + MIN_TIME_BEFORE_RELOGIN);
+                }
+                nextRefreshDate = new Date(nextRefresh);
+                if (nextRefresh > expiry) {
+                    Object[] logPayload = {nextRefreshDate, expiryDate};
+                    LOG.error("next refresh: {} is later than expiry {}." + " This may indicate a clock skew problem."
+                        + "Check that this host and the KDC's " + "hosts' clocks are in sync. Exiting refresh thread.",
+                        logPayload);
+                    return;
+                }
+            }
+            if (now == nextRefresh) {
+                LOG.info("refreshing now because expiry is before next scheduled refresh time.");
+            } else if (now < nextRefresh) {
+                Date until = new Date(nextRefresh);
+                LOG.info("TGT refresh sleeping until: {}", until.toString());
+                try {
+                    Thread.sleep(nextRefresh - now);
+                } catch (InterruptedException ie) {
+                    LOG.warn("TGT renewal thread has been interrupted and will exit.");
+                    break;
+                }
+            } else {
+                LOG.error("nextRefresh:{} is in the past: exiting refresh thread. Check"
+                    + " clock sync between this host and KDC - (KDC's clock is likely ahead of this host)."
+                    + " Manual intervention will be required for this client to successfully authenticate."
+                    + " Exiting refresh thread.", nextRefreshDate);
+                break;
+            }
+            if (container.isUsingTicketCache()) {
+                String cmd = container.getConfiguration().getString(SaslConstants.KINIT_COMMAND, SaslConstants.KINIT_COMMAND_DEFAULT);
+                String kinitArgs = "-R";
+                int retry = 1;
+                while (retry >= 0) {
+                    try {
+                        LOG.debug("running ticket cache refresh command: {} {}", cmd, kinitArgs);
+                        Shell.execCommand(cmd, kinitArgs);
+                        break;
+                    } catch (Exception e) {
+                        if (retry > 0) {
+                            --retry;
+                            // sleep for 10 seconds
+                            try {
+                                Thread.sleep(10 * 1000);
+                            } catch (InterruptedException ie) {
+                                LOG.error("Interrupted while renewing TGT, exiting Login thread");
+                                return;
+                            }
+                        } else {
+                            Object[] logPayload = {cmd, kinitArgs, e.toString(), e};
+                            LOG.warn("Could not renew TGT due to problem running shell command: '{}"
+                                + " {}'; exception was:{}. Exiting refresh thread.", logPayload);
+                            return;
+                        }
+                    }
+                }
+            }
+            try {
+                int retry = 1;
+                while (retry >= 0) {
+                    try {
+                        reLogin();
+                        break;
+                    } catch (LoginException le) {
+                        if (retry > 0) {
+                            --retry;
+                            // sleep for 10 seconds.
+                            try {
+                                Thread.sleep(10 * 1000);
+                            } catch (InterruptedException e) {
+                                LOG.error("Interrupted during login retry after LoginException:", le);
+                                throw le;
+                            }
+                        } else {
+                            LOG.error("Could not refresh TGT for principal: {}.", container.getPrincipal(), le);
+                        }
+                    }
+                }
+            } catch (LoginException le) {
+                LOG.error("Failed to refresh TGT: refresh thread exiting now.", le);
+                break;
+            }
+        }
+    }
+
+    /**
+     * Re-login a principal. This method assumes that {@link #login(String)} has happened already.
+     *
+     * @throws javax.security.auth.login.LoginException on a failure
+     */
+    // c.f. HADOOP-6559
+    private synchronized void reLogin() throws LoginException {
+        LoginContext login = container.getLogin();
+        if (login == null) {
+            throw new LoginException("login must be done first");
+        }
+        if (!hasSufficientTimeElapsed()) {
+            return;
+        }
+        LOG.info("Initiating logout for {}", container.getPrincipal());
+        synchronized (Login.class) {
+            //clear up the kerberos state. But the tokens are not cleared! As per
+            //the Java kerberos login module code, only the kerberos credentials
+            //are cleared
+            login.logout();
+            //login and also update the subject field of this instance to
+            //have the new credentials (pass it to the LoginContext constructor)
+            login = new LoginContext(container.getLoginContextName(), container.getSubject());
+            LOG.info("Initiating re-login for {}", container.getPrincipal());
+            login.login();
+            container.setLogin(login);
+        }
+    }
+
+    private boolean hasSufficientTimeElapsed() {
+        long now = System.currentTimeMillis();
+        if (now - getLastLogin() < MIN_TIME_BEFORE_RELOGIN) {
+            LOG.warn("Not attempting to re-login since the last re-login was "
+                + "attempted less than {} seconds before.", MIN_TIME_BEFORE_RELOGIN / 1000);
+            return false;
+        }
+        // register most recent relogin attempt
+        setLastLogin(now);
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/667390d1/bookkeeper-server/src/test/java/org/apache/bookkeeper/sasl/GSSAPIBookKeeperTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/sasl/GSSAPIBookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/sasl/GSSAPIBookKeeperTest.java
new file mode 100644
index 0000000..07d31a8
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/sasl/GSSAPIBookKeeperTest.java
@@ -0,0 +1,256 @@
+package org.apache.bookkeeper.sasl;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Arrays;
+import org.apache.bookkeeper.client.*;
+import java.util.Enumeration;
+import java.util.Properties;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+ */
+import java.util.concurrent.atomic.AtomicLong;
+import javax.security.auth.login.Configuration;
+import org.apache.bookkeeper.client.BKException.BKUnauthorizedAccessException;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.AfterClass;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+public class GSSAPIBookKeeperTest extends BookKeeperClusterTestCase {
+
+    static final Logger LOG = LoggerFactory.getLogger(GSSAPIBookKeeperTest.class);
+
+    private static final byte[] PASSWD = "testPasswd".getBytes();
+    private static final byte[] ENTRY = "TestEntry".getBytes();
+
+    private MiniKdc kdc;
+    private Properties conf;
+
+    @Rule
+    public TemporaryFolder kdcDir = new TemporaryFolder();
+
+    @Rule
+    public TemporaryFolder kerberosWorkDir = new TemporaryFolder();
+
+    @Before
+    public void startMiniKdc() throws Exception {
+
+        conf = MiniKdc.createConf();
+        kdc = new MiniKdc(conf, kdcDir.getRoot());
+        kdc.start();
+
+        String localhostName = InetAddress.getLocalHost().getHostName();
+
+        String principalServerNoRealm = "bookkeeper/" + localhostName;
+        String principalServer = "bookkeeper/" + localhostName + "@" + kdc.getRealm();
+        LOG.info("principalServer: " + principalServer);
+        String principalClientNoRealm = "bookkeeperclient/" + localhostName;
+        String principalClient = principalClientNoRealm + "@" + kdc.getRealm();
+        LOG.info("principalClient: " + principalClient);
+
+        File keytabClient = new File(kerberosWorkDir.getRoot(), "bookkeeperclient.keytab");
+        kdc.createPrincipal(keytabClient, principalClientNoRealm);
+
+        File keytabServer = new File(kerberosWorkDir.getRoot(), "bookkeeperserver.keytab");
+        kdc.createPrincipal(keytabServer, principalServerNoRealm);
+
+        File jaas_file = new File(kerberosWorkDir.getRoot(), "jaas.conf");
+        try (FileWriter writer = new FileWriter(jaas_file)) {
+            writer.write("\n"
+                + "Bookie {\n"
+                + "  com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
+                + "  useKeyTab=true\n"
+                + "  keyTab=\"" + keytabServer.getAbsolutePath() + "\n"
+                + "  storeKey=true\n"
+                + "  useTicketCache=false\n" // won't test useTicketCache=true on JUnit tests
+                + "  principal=\"" + principalServer + "\";\n"
+                + "};\n"
+                + "\n"
+                + "\n"
+                + "\n"
+                + "BookKeeper {\n"
+                + "  com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
+                + "  useKeyTab=true\n"
+                + "  keyTab=\"" + keytabClient.getAbsolutePath() + "\n"
+                + "  storeKey=true\n"
+                + "  useTicketCache=false\n"
+                + "  principal=\"" + principalClient + "\";\n"
+                + "};\n"
+            );
+
+        }
+
+        File krb5file = new File(kerberosWorkDir.getRoot(), "krb5.conf");
+        try (FileWriter writer = new FileWriter(krb5file)) {
+            writer.write("[libdefaults]\n"
+                + " default_realm = " + kdc.getRealm() + "\n"
+                + "\n"
+                + "\n"
+                + "[realms]\n"
+                + " " + kdc.getRealm() + "  = {\n"
+                + "  kdc = " + kdc.getHost() + ":" + kdc.getPort() + "\n"
+                + " }"
+            );
+
+        }
+
+        System.setProperty("java.security.auth.login.config", jaas_file.getAbsolutePath());
+        System.setProperty("java.security.krb5.conf", krb5file.getAbsolutePath());
+        javax.security.auth.login.Configuration.getConfiguration().refresh();
+
+    }
+
+    @After
+    public void stopMiniKdc() {
+        System.clearProperty("java.security.auth.login.config");
+        System.clearProperty("java.security.krb5.conf");
+        if (kdc != null) {
+            kdc.stop();
+        }
+    }
+
+    public GSSAPIBookKeeperTest() {
+        super(0); // start them later when auth providers are configured
+    }
+
+    // we pass in ledgerId because the method may throw exceptions
+    private void connectAndWriteToBookie(ClientConfiguration conf, AtomicLong ledgerWritten)
+        throws BKException, InterruptedException, IOException, KeeperException {
+        LOG.info("Connecting to bookie");
+        try (BookKeeper bkc = new BookKeeper(conf, zkc)) {
+            LedgerHandle l = bkc.createLedger(1, 1, DigestType.CRC32,
+                PASSWD);
+            ledgerWritten.set(l.getId());
+            l.addEntry(ENTRY);
+            l.close();
+        }
+    }
+
+    /**
+     * check if the entry exists. Restart the bookie to allow access
+     */
+    private int entryCount(long ledgerId, ServerConfiguration bookieConf,
+        ClientConfiguration clientConf) throws Exception {
+        LOG.info("Counting entries in {}", ledgerId);
+        for (ServerConfiguration conf : bsConfs) {
+            bookieConf.setUseHostNameAsBookieID(true);
+            bookieConf.setBookieAuthProviderFactoryClass(
+                SASLBookieAuthProviderFactory.class.getName());
+        }
+        clientConf.setClientAuthProviderFactoryClass(
+            SASLClientProviderFactory.class.getName());
+
+        restartBookies();
+
+        try (BookKeeper bkc = new BookKeeper(clientConf, zkc);
+            LedgerHandle lh = bkc.openLedger(ledgerId, DigestType.CRC32,
+                PASSWD);) {
+            if (lh.getLastAddConfirmed() < 0) {
+                return 0;
+            }
+            Enumeration<LedgerEntry> e = lh.readEntries(0, lh.getLastAddConfirmed());
+            int count = 0;
+            while (e.hasMoreElements()) {
+                count++;
+                assertTrue("Should match what we wrote",
+                    Arrays.equals(e.nextElement().getEntry(), ENTRY));
+            }
+            return count;
+        }
+    }
+
+    /**
+     * Test an connection will authorize with a single message to the server and a single response.
+     */
+    @Test(timeout = 30000)
+    public void testSingleMessageAuth() throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setUseHostNameAsBookieID(true);
+        bookieConf.setBookieAuthProviderFactoryClass(
+            SASLBookieAuthProviderFactory.class.getName());
+
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(
+            SASLClientProviderFactory.class.getName());
+
+        startAndStoreBookie(bookieConf);
+
+        AtomicLong ledgerId = new AtomicLong(-1);
+        connectAndWriteToBookie(clientConf, ledgerId); // should succeed
+
+        assertFalse(ledgerId.get() == -1);
+        assertEquals("Should have entry", 1, entryCount(ledgerId.get(), bookieConf, clientConf));
+    }
+
+    @Test(timeout = 30000)
+    public void testNotAllowedClientId() throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setUseHostNameAsBookieID(true);
+        bookieConf.setBookieAuthProviderFactoryClass(
+            SASLBookieAuthProviderFactory.class.getName());
+        bookieConf.setProperty(SaslConstants.JAAS_CLIENT_ALLOWED_IDS, "nobody");
+
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(
+            SASLClientProviderFactory.class.getName());
+
+        startAndStoreBookie(bookieConf);
+
+        AtomicLong ledgerId = new AtomicLong(-1);
+        try {
+            connectAndWriteToBookie(clientConf, ledgerId);
+            fail("should not be able to access the bookie");
+        } catch (BKUnauthorizedAccessException err) {
+        }
+
+    }
+
+    BookieServer startAndStoreBookie(ServerConfiguration conf) throws Exception {
+        bsConfs.add(conf);
+        BookieServer s = startBookie(conf);
+        bs.add(s);
+        return s;
+    }
+
+    @AfterClass
+    public static void resetJAAS() {
+        System.clearProperty("java.security.auth.login.config");
+        Configuration.getConfiguration().refresh();
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/667390d1/bookkeeper-server/src/test/java/org/apache/bookkeeper/sasl/MD5DigestBookKeeperTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/sasl/MD5DigestBookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/sasl/MD5DigestBookKeeperTest.java
new file mode 100644
index 0000000..185c5cf
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/sasl/MD5DigestBookKeeperTest.java
@@ -0,0 +1,142 @@
+package org.apache.bookkeeper.sasl;
+
+import java.io.File;
+import java.util.Arrays;
+import org.apache.bookkeeper.client.*;
+import java.util.Enumeration;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+ */
+import java.util.concurrent.atomic.AtomicLong;
+import javax.security.auth.login.Configuration;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieServer;
+import static org.apache.bookkeeper.sasl.SaslConstants.JAAS_CLIENT_ALLOWED_IDS;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.AfterClass;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+public class MD5DigestBookKeeperTest extends BookKeeperClusterTestCase {
+
+    static final Logger LOG = LoggerFactory.getLogger(MD5DigestBookKeeperTest.class);
+
+    private static final byte[] PASSWD = "testPasswd".getBytes();
+    private static final byte[] ENTRY = "TestEntry".getBytes();
+
+    static {
+        System.setProperty("java.security.auth.login.config", new File("src/test/resources/jaas_md5.conf").getAbsolutePath());
+    }
+
+    public MD5DigestBookKeeperTest() {
+        super(0); // start them later when auth providers are configured
+    }
+
+    // we pass in ledgerId because the method may throw exceptions
+    private void connectAndWriteToBookie(ClientConfiguration conf, AtomicLong ledgerWritten)
+        throws Exception {
+        LOG.info("Connecting to bookie");
+        BookKeeper bkc = new BookKeeper(conf, zkc);
+        LedgerHandle l = bkc.createLedger(1, 1, DigestType.CRC32,
+            PASSWD);
+        ledgerWritten.set(l.getId());
+        l.addEntry(ENTRY);
+        l.close();
+        bkc.close();
+    }
+
+    /**
+     * check if the entry exists. Restart the bookie to allow access
+     */
+    private int entryCount(long ledgerId, ServerConfiguration bookieConf,
+        ClientConfiguration clientConf) throws Exception {
+        LOG.info("Counting entries in {}", ledgerId);
+        for (ServerConfiguration conf : bsConfs) {
+            bookieConf.setBookieAuthProviderFactoryClass(
+                SASLBookieAuthProviderFactory.class.getName());
+            bookieConf.setProperty(JAAS_CLIENT_ALLOWED_IDS, ".*hd.*");
+        }
+        clientConf.setClientAuthProviderFactoryClass(
+            SASLClientProviderFactory.class.getName());
+
+        restartBookies();
+
+        try (BookKeeper bkc = new BookKeeper(clientConf, zkc);
+            LedgerHandle lh = bkc.openLedger(ledgerId, DigestType.CRC32,
+                PASSWD);) {
+
+            if (lh.getLastAddConfirmed() < 0) {
+                return 0;
+            }
+            Enumeration<LedgerEntry> e = lh.readEntries(0, lh.getLastAddConfirmed());
+            int count = 0;
+            while (e.hasMoreElements()) {
+                count++;
+                assertTrue("Should match what we wrote",
+                    Arrays.equals(e.nextElement().getEntry(), ENTRY));
+            }
+            return count;
+        }
+    }
+
+    /**
+     * Test an connection will authorize with a single message to the server and a single response.
+     */
+    @Test(timeout = 30000)
+    public void testSingleMessageAuth() throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(
+            SASLBookieAuthProviderFactory.class.getName());
+        bookieConf.setProperty(JAAS_CLIENT_ALLOWED_IDS, ".*hd.*");
+
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(
+            SASLClientProviderFactory.class.getName());
+
+        startAndStoreBookie(bookieConf);
+
+        AtomicLong ledgerId = new AtomicLong(-1);
+        connectAndWriteToBookie(clientConf, ledgerId); // should succeed
+
+        assertFalse(ledgerId.get() == -1);
+        assertEquals("Should have entry", 1, entryCount(ledgerId.get(), bookieConf, clientConf));
+    }
+
+    BookieServer startAndStoreBookie(ServerConfiguration conf) throws Exception {
+        bsConfs.add(conf);
+        BookieServer s = startBookie(conf);
+        bs.add(s);
+        return s;
+    }
+
+    @AfterClass
+    public static void resetJAAS() {
+        System.clearProperty("java.security.auth.login.config");
+        Configuration.getConfiguration().refresh();
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/667390d1/bookkeeper-server/src/test/resources/jaas_md5.conf
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/resources/jaas_md5.conf b/bookkeeper-server/src/test/resources/jaas_md5.conf
new file mode 100644
index 0000000..060c825
--- /dev/null
+++ b/bookkeeper-server/src/test/resources/jaas_md5.conf
@@ -0,0 +1,30 @@
+/*
+* Copyright 2016 The Apache Software Foundation
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+Bookie {
+       org.apache.zookeeper.server.auth.DigestLoginModule required
+       user_hd="testpwd";
+};
+
+BookKeeper {
+       org.apache.zookeeper.server.auth.DigestLoginModule required
+       username="hd"
+       password="testpwd";
+};