You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/02/09 02:38:33 UTC

[3/6] storm git commit: STORM-2898: Support for WorkerToken authentication

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/digest/ClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/ClientCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/ClientCallbackHandler.java
deleted file mode 100644
index 312e4ab..0000000
--- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/ClientCallbackHandler.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.security.auth.digest;
-
-import org.apache.storm.security.auth.AbstractSaslClientCallbackHandler;
-import org.apache.storm.security.auth.AuthUtils;
-
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.login.Configuration;
-import java.io.IOException;
-
-/**
- *  client side callback handler.
- */
-public class ClientCallbackHandler extends AbstractSaslClientCallbackHandler {
-
-    /**
-     * Constructor based on a JAAS configuration
-     * 
-     * For digest, you should have a pair of user name and password defined.
-     * @throws IOException
-     */
-    public ClientCallbackHandler(Configuration configuration) throws IOException {
-        if (configuration == null) return;
-        AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_CLIENT);
-        if (configurationEntries == null) {
-            String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_CLIENT
-                    + "' entry in this configuration: Client cannot start.";
-            throw new IOException(errorMessage);
-        }
-
-        _password = "";
-        for(AppConfigurationEntry entry: configurationEntries) {
-            if (entry.getOptions().get(USERNAME) != null) {
-                _username = (String)entry.getOptions().get(USERNAME);
-            }
-            if (entry.getOptions().get(PASSWORD) != null) {
-                _password = (String)entry.getOptions().get(PASSWORD);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
index 4d123aa..1272712 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
@@ -15,12 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.security.auth.digest;
 
 import java.io.IOException;
-
+import java.util.Map;
 import javax.security.auth.callback.CallbackHandler;
-
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.storm.generated.WorkerToken;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
+import org.apache.storm.security.auth.sasl.SimpleSaslClientCallbackHandler;
+import org.apache.storm.security.auth.sasl.SimpleSaslServerCallbackHandler;
+import org.apache.storm.security.auth.workertoken.WorkerTokenAuthorizer;
+import org.apache.storm.security.auth.workertoken.WorkerTokenClientCallbackHandler;
 import org.apache.thrift.transport.TSaslClientTransport;
 import org.apache.thrift.transport.TSaslServerTransport;
 import org.apache.thrift.transport.TTransport;
@@ -29,20 +37,19 @@ import org.apache.thrift.transport.TTransportFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.storm.security.auth.AuthUtils;
-import org.apache.storm.security.auth.SaslTransportPlugin;
-
 public class DigestSaslTransportPlugin extends SaslTransportPlugin {
     public static final String DIGEST = "DIGEST-MD5";
     private static final Logger LOG = LoggerFactory.getLogger(DigestSaslTransportPlugin.class);
 
-    protected TTransportFactory getServerTransportFactory() throws IOException {        
+    protected TTransportFactory getServerTransportFactory() throws IOException {
         //create an authentication callback handler
-        CallbackHandler serer_callback_handler = new ServerCallbackHandler(login_conf);
+        CallbackHandler serverCallbackHandler = new SimpleSaslServerCallbackHandler(
+            new WorkerTokenAuthorizer(conf, type),
+            new JassPasswordProvider(loginConf));
 
         //create a transport factory that will invoke our auth callback for digest
         TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
-        factory.addServerDefinition(DIGEST, AuthUtils.SERVICE, "localhost", null, serer_callback_handler);
+        factory.addServerDefinition(DIGEST, AuthUtils.SERVICE, "localhost", null, serverCallbackHandler);
 
         LOG.info("SASL DIGEST-MD5 transport factory will be used");
         return factory;
@@ -50,19 +57,46 @@ public class DigestSaslTransportPlugin extends SaslTransportPlugin {
 
     @Override
     public TTransport connect(TTransport transport, String serverHost, String asUser) throws TTransportException, IOException {
-        ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf);
-        TSaslClientTransport wrapper_transport = new TSaslClientTransport(DIGEST,
+        CallbackHandler clientCallbackHandler;
+        WorkerToken token = WorkerTokenClientCallbackHandler.findWorkerTokenInSubject(type);
+        if (token != null) {
+            clientCallbackHandler = new WorkerTokenClientCallbackHandler(token);
+        } else if (loginConf != null) {
+            AppConfigurationEntry [] configurationEntries = loginConf.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_CLIENT);
+            if (configurationEntries == null) {
+                String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_CLIENT
+                    + "' entry in this configuration: Client cannot start.";
+                throw new IOException(errorMessage);
+            }
+
+            String username = "";
+            String password = "";
+            for (AppConfigurationEntry entry : configurationEntries) {
+                Map options = entry.getOptions();
+                username = (String)options.getOrDefault("username", username);
+                password = (String)options.getOrDefault("password", password);
+            }
+            clientCallbackHandler = new SimpleSaslClientCallbackHandler(username, password);
+        } else {
+            throw new IOException("Could not find any way to authenticate with the server.");
+        }
+
+        TSaslClientTransport wrapperTransport = new TSaslClientTransport(DIGEST,
                 null,
                 AuthUtils.SERVICE, 
                 serverHost,
                 null,
-                client_callback_handler, 
+                clientCallbackHandler,
                 transport);
 
-        wrapper_transport.open();
+        wrapperTransport.open();
         LOG.debug("SASL DIGEST-MD5 client transport has been established");
 
-        return wrapper_transport;
+        return wrapperTransport;
     }
 
+    @Override
+    public boolean areWorkerTokensSupported() {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java
new file mode 100644
index 0000000..bb3f1bf
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth.digest;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.storm.security.auth.sasl.PasswordProvider;
+
+/**
+ * Provides passwords out of a JAAS conf for typical DIGEST-MD5 authentication support.
+ */
+public class JassPasswordProvider implements PasswordProvider {
+    private static final String USER_PREFIX = "user_";
+    /**
+     * The system property that sets a super user password.  This can be used in addition to the
+     * jaas conf, and takes precedence over a "super" user in the jaas conf if this is set.
+     */
+    public static final String SYSPROP_SUPER_PASSWORD = "storm.SASLAuthenticationProvider.superPassword";
+
+    private Map<String, char[]> credentials = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor.
+     * @param configuration the jaas configuration to get the credentials out of.
+     * @throws IOException if we could not read the Server section in the jaas conf.
+     */
+    public JassPasswordProvider(Configuration configuration) throws IOException {
+        if (configuration == null) {
+            return;
+        }
+
+        AppConfigurationEntry[] configurationEntries = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER);
+        if (configurationEntries == null) {
+            String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_SERVER
+                + "' entry in this configuration: Server cannot start.";
+            throw new IOException(errorMessage);
+        }
+        credentials.clear();
+        for (AppConfigurationEntry entry : configurationEntries) {
+            Map<String, ?> options = entry.getOptions();
+            // Populate user -> password map with JAAS configuration entries from the "Server" section.
+            // Usernames are distinguished from other options by prefixing the username with a "user_" prefix.
+            for (Map.Entry<String, ?> pair : options.entrySet()) {
+                String key = pair.getKey();
+                if (key.startsWith(USER_PREFIX)) {
+                    String userName = key.substring(USER_PREFIX.length());
+                    credentials.put(userName, ((String) pair.getValue()).toCharArray());
+                }
+            }
+        }
+
+        String superPassword = System.getProperty(SYSPROP_SUPER_PASSWORD);
+        if (superPassword != null) {
+            credentials.put("super", superPassword.toCharArray());
+        }
+    }
+
+    @Override
+    public Optional<char[]> getPasswordFor(String user) {
+        return Optional.ofNullable(credentials.get(user));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/digest/ServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/ServerCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/ServerCallbackHandler.java
deleted file mode 100644
index 7c4414f..0000000
--- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/ServerCallbackHandler.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.security.auth.digest;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.storm.security.auth.AbstractSaslServerCallbackHandler;
-import org.apache.storm.security.auth.ReqContext;
-import org.apache.storm.security.auth.SaslTransportPlugin;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.login.Configuration;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.RealmCallback;
-
-import org.apache.storm.security.auth.AuthUtils;
-
-/**
- * SASL server side callback handler
- */
-public class ServerCallbackHandler extends AbstractSaslServerCallbackHandler {
-    private static final Logger LOG = LoggerFactory.getLogger(ServerCallbackHandler.class);
-    private static final String USER_PREFIX = "user_";
-    public static final String SYSPROP_SUPER_PASSWORD = "storm.SASLAuthenticationProvider.superPassword";
-
-    public ServerCallbackHandler(Configuration configuration) throws IOException {
-        if (configuration==null) return;
-
-        AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER);
-        if (configurationEntries == null) {
-            String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_SERVER+"' entry in this configuration: Server cannot start.";
-            throw new IOException(errorMessage);
-        }
-        credentials.clear();
-        for(AppConfigurationEntry entry: configurationEntries) {
-            Map<String,?> options = entry.getOptions();
-            // Populate DIGEST-MD5 user -> password map with JAAS configuration entries from the "Server" section.
-            // Usernames are distinguished from other options by prefixing the username with a "user_" prefix.
-            for(Map.Entry<String, ?> pair : options.entrySet()) {
-                String key = pair.getKey();
-                if (key.startsWith(USER_PREFIX)) {
-                    String userName = key.substring(USER_PREFIX.length());
-                    credentials.put(userName,(String)pair.getValue());
-                }
-            }
-        }
-    }
-
-    @Override
-    protected void handlePasswordCallback(PasswordCallback pc) {
-        LOG.debug("handlePasswordCallback");
-        if ("super".equals(this.userName) && System.getProperty(SYSPROP_SUPER_PASSWORD) != null) {
-            // superuser: use Java system property for password, if available.
-            pc.setPassword(System.getProperty(SYSPROP_SUPER_PASSWORD).toCharArray());
-        } else {
-            super.handlePasswordCallback(pc);
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
index b6571ba..157ae54 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
@@ -35,8 +35,12 @@ import javax.security.auth.login.Configuration;
 import javax.security.auth.login.LoginException;
 import javax.security.sasl.Sasl;
 
+import org.apache.storm.generated.WorkerToken;
 import org.apache.storm.messaging.netty.Login;
 import org.apache.commons.lang.StringUtils;
+import org.apache.storm.security.auth.sasl.SimpleSaslServerCallbackHandler;
+import org.apache.storm.security.auth.workertoken.WorkerTokenAuthorizer;
+import org.apache.storm.security.auth.workertoken.WorkerTokenClientCallbackHandler;
 import org.apache.thrift.transport.TSaslClientTransport;
 import org.apache.thrift.transport.TSaslServerTransport;
 import org.apache.thrift.transport.TTransport;
@@ -47,10 +51,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.storm.security.auth.AuthUtils;
-import org.apache.storm.security.auth.SaslTransportPlugin;
+import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
 
 public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
-    public static final String KERBEROS = "GSSAPI"; 
+    public static final String KERBEROS = "GSSAPI";
+    private static final String DIGEST = "DIGEST-MD5";
     private static final Logger LOG = LoggerFactory.getLogger(KerberosSaslTransportPlugin.class);
     private static Map <LoginCacheKey, Login> loginCache = new ConcurrentHashMap<>();
     private static final String DISABLE_LOGIN_CACHE = "disableLoginCache";
@@ -93,15 +98,16 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
         }
     }
 
+    @Override
     public TTransportFactory getServerTransportFactory() throws IOException {
         //create an authentication callback handler
-        CallbackHandler server_callback_handler = new ServerCallbackHandler(login_conf, topoConf);
+        CallbackHandler server_callback_handler = new ServerCallbackHandler(loginConf);
         
         //login our principal
         Subject subject = null;
         try {
             //specify a configuration object to be used
-            Configuration.setConfiguration(login_conf);
+            Configuration.setConfiguration(loginConf);
             //now login
             Login login = new Login(AuthUtils.LOGIN_CONTEXT_SERVER, server_callback_handler);
             subject = login.getSubject();
@@ -114,15 +120,15 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
         //check the credential of our principal
         if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { 
             throw new RuntimeException("Fail to verify user principal with section \""
-                    +AuthUtils.LOGIN_CONTEXT_SERVER+"\" in login configuration file "+ login_conf);
+                    + AuthUtils.LOGIN_CONTEXT_SERVER + "\" in login configuration file " + loginConf);
         }
 
-        String principal = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_SERVER, "principal"); 
+        String principal = AuthUtils.get(loginConf, AuthUtils.LOGIN_CONTEXT_SERVER, "principal");
         LOG.debug("principal:"+principal);  
         KerberosName serviceKerberosName = new KerberosName(principal);
         String serviceName = serviceKerberosName.getServiceName();
         String hostName = serviceKerberosName.getHostName();
-        Map<String, String> props = new TreeMap<String,String>();
+        Map<String, String> props = new TreeMap<>();
         props.put(Sasl.QOP, "auth");
         props.put(Sasl.SERVER_AUTH, "false");
 
@@ -130,6 +136,10 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
         TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
         factory.addServerDefinition(KERBEROS, serviceName, hostName, props, server_callback_handler);
 
+        //Also add in support for worker tokens
+        factory.addServerDefinition(DIGEST, AuthUtils.SERVICE, "localhost", null,
+            new SimpleSaslServerCallbackHandler(new WorkerTokenAuthorizer(conf, type)));
+
         //create a wrap transport factory so that we could apply user credential during connections
         TUGIAssumingTransportFactory wrapFactory = new TUGIAssumingTransportFactory(factory, subject); 
 
@@ -140,9 +150,9 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
     private Login mkLogin() throws IOException {
         try {
             //create an authentication callback handler
-            ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf);
+            ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(loginConf);
             //specify a configuration object to be used
-            Configuration.setConfiguration(login_conf);
+            Configuration.setConfiguration(loginConf);
             //now login
             Login login = new Login(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler);
             login.startThreadIfNeeded();
@@ -154,9 +164,28 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
     }
 
     @Override
-    public TTransport connect(TTransport transport, String serverHost, String asUser) throws TTransportException, IOException {
+    public TTransport connect(TTransport transport, String serverHost, String asUser) throws IOException, TTransportException {
+        WorkerToken token = WorkerTokenClientCallbackHandler.findWorkerTokenInSubject(type);
+        if (token != null && asUser != null) {
+            CallbackHandler clientCallbackHandler = new WorkerTokenClientCallbackHandler(token);
+            TSaslClientTransport wrapperTransport = new TSaslClientTransport(DIGEST,
+                null,
+                AuthUtils.SERVICE,
+                serverHost,
+                null,
+                clientCallbackHandler,
+                transport);
+            wrapperTransport.open();
+            LOG.debug("SASL DIGEST-MD5 WorkerToken client transport has been established");
+
+            return wrapperTransport;
+        }
+        return kerberosConnect(transport, serverHost, asUser);
+    }
+
+    private TTransport kerberosConnect(TTransport transport, String serverHost, String asUser) throws IOException {
         //login our user
-        SortedMap<String, ?> authConf = AuthUtils.pullConfig(login_conf, AuthUtils.LOGIN_CONTEXT_CLIENT);
+        SortedMap<String, ?> authConf = AuthUtils.pullConfig(loginConf, AuthUtils.LOGIN_CONTEXT_CLIENT);
         if (authConf == null) {
             throw new RuntimeException("Error in parsing the kerberos login Configuration, returned null");
         }
@@ -194,15 +223,15 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
         final Subject subject = login.getSubject();
         if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //error
             throw new RuntimeException("Fail to verify user principal with section \""
-                        +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ login_conf);
+                        +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ loginConf);
         }
 
         final String principal = StringUtils.isBlank(asUser) ? getPrincipal(subject) : asUser;
-        String serviceName = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName");
+        String serviceName = AuthUtils.get(loginConf, AuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName");
         if (serviceName == null) {
             serviceName = AuthUtils.SERVICE; 
         }
-        Map<String, String> props = new TreeMap<String,String>();
+        Map<String, String> props = new TreeMap<>();
         props.put(Sasl.QOP, "auth");
         props.put(Sasl.SERVER_AUTH, "false");
 
@@ -246,7 +275,8 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
         return ((Principal)(principals.toArray()[0])).getName();
     }
 
-    /** A TransportFactory that wraps another one, but assumes a specified UGI
+    /**
+     * A TransportFactory that wraps another one, but assumes a specified UGI
      * before calling through.                                                                                                                                                      
      *                                                                                                                                                                              
      * This is used on the server side to assume the server's Principal when accepting                                                                                              
@@ -269,22 +299,25 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
         public TTransport getTransport(final TTransport trans) {
             try {
                 return Subject.doAs(subject,
-                        new PrivilegedExceptionAction<TTransport>() {
-                    public TTransport run() {
+                    (PrivilegedExceptionAction<TTransport>) () -> {
                         try {
                             return wrapped.getTransport(trans);
                         }
                         catch (Exception e) {
                             LOG.debug("Storm server failed to open transport " +
-                                    "to interact with a client during session initiation: " + e, e);
+                                "to interact with a client during session initiation: " + e, e);
                             return new NoOpTTrasport(null);
                         }
-                    }
-                });
+                    });
             } catch (PrivilegedActionException e) {
                 LOG.error("Storm server experienced a PrivilegedActionException exception while creating a transport using a JAAS principal context:" + e, e);
                 return null;
             }
         }
     }
+
+    @Override
+    public boolean areWorkerTokensSupported() {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
index 59eb80d..d3157c8 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
@@ -18,79 +18,87 @@
 
 package org.apache.storm.security.auth.kerberos;
 
+import javax.security.sasl.RealmCallback;
 import org.apache.storm.security.auth.AuthUtils;
 import org.apache.storm.security.auth.ReqContext;
-import org.apache.storm.security.auth.SaslTransportPlugin;
+import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.security.auth.Subject;
 import javax.security.auth.callback.*;
 import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.login.Configuration;
 import javax.security.sasl.AuthorizeCallback;
 import java.io.IOException;
-import java.util.Map;
 
 /**
- * SASL server side callback handler
+ * SASL server side callback handler for kerberos auth.
  */
 public class ServerCallbackHandler implements CallbackHandler {
     private static final Logger LOG = LoggerFactory.getLogger(ServerCallbackHandler.class);
 
-    private String userName;
-
-    public ServerCallbackHandler(Configuration configuration, Map<String, Object> topoConf) throws IOException {
-        if (configuration==null) return;
+    public ServerCallbackHandler(Configuration configuration) throws IOException {
+        if (configuration == null) {
+            return;
+        }
 
         AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER);
         if (configurationEntries == null) {
-            String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_SERVER+"' entry in this configuration: Server cannot start.";
+            String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_SERVER
+                + "' entry in this configuration: Server cannot start.";
             LOG.error(errorMessage);
             throw new IOException(errorMessage);
         }
-
     }
 
     public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+        NameCallback nc = null;
+        PasswordCallback pc = null;
+        AuthorizeCallback ac = null;
         for (Callback callback : callbacks) {
-            if (callback instanceof NameCallback) {
-                handleNameCallback((NameCallback) callback);
+            if (callback instanceof AuthorizeCallback) {
+                ac = (AuthorizeCallback) callback;
+            } else if (callback instanceof NameCallback) {
+                nc = (NameCallback) callback;
             } else if (callback instanceof PasswordCallback) {
-                handlePasswordCallback((PasswordCallback) callback);
-            } else if (callback instanceof AuthorizeCallback) {
-                handleAuthorizeCallback((AuthorizeCallback) callback);
+                pc = (PasswordCallback) callback;
+            } else if (callback instanceof RealmCallback) {
+                //Ignored...
+            } else {
+                throw new UnsupportedCallbackException(callback,
+                    "Unrecognized SASL Callback");
             }
         }
-    }
 
-    private void handleNameCallback(NameCallback nc) {
-        LOG.debug("handleNameCallback");
-        userName = nc.getDefaultName();
-        nc.setName(nc.getDefaultName());
-    }
+        String userName = "UNKNOWN";
+        if (nc != null) {
+            LOG.debug("handleNameCallback");
+             userName = nc.getDefaultName();
+            nc.setName(nc.getDefaultName());
+        }
 
-    private void handlePasswordCallback(PasswordCallback pc) {
-        LOG.warn("No password found for user: " + userName);
-    }
+        if (pc != null) {
+            LOG.warn("No password found for user: {}", userName);
+        }
 
-    private void handleAuthorizeCallback(AuthorizeCallback ac) {
-        String authenticationID = ac.getAuthenticationID();
-        LOG.info("Successfully authenticated client: authenticationID=" + authenticationID + " authorizationID= " + ac.getAuthorizationID());
+        if (ac != null) {
+            String authenticationID = ac.getAuthenticationID();
+            LOG.info("Successfully authenticated client: authenticationID=" + authenticationID + " authorizationID= " + ac.getAuthorizationID());
 
-        //if authorizationId is not set, set it to authenticationId.
-        if(ac.getAuthorizationID() == null) {
-            ac.setAuthorizedID(authenticationID);
-        }
+            //if authorizationId is not set, set it to authenticationId.
+            if (ac.getAuthorizationID() == null) {
+                ac.setAuthorizedID(authenticationID);
+            }
 
-        //When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We
-        //add the authNid as the real user in reqContext's subject which will be used during authorization.
-        if(!ac.getAuthenticationID().equals(ac.getAuthorizationID())) {
-            ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(ac.getAuthenticationID()));
-        } else {
-            ReqContext.context().setRealPrincipal(null);
-        }
+            //When authNid and authZid are not equal, authNid is attempting to impersonate authZid. We
+            //add the authNid as the real user in reqContext's subject, which will be used during authorization.
+            if (!ac.getAuthenticationID().equals(ac.getAuthorizationID())) {
+                ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(ac.getAuthenticationID()));
+            } else {
+                ReqContext.context().setRealPrincipal(null);
+            }
 
-        ac.setAuthorized(true);
+            ac.setAuthorized(true);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainClientCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainClientCallbackHandler.java
index 13340df..b01cdc4 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainClientCallbackHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainClientCallbackHandler.java
@@ -15,17 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.security.auth.plain;
 
-import org.apache.storm.security.auth.AbstractSaslClientCallbackHandler;
+import org.apache.storm.security.auth.sasl.SimpleSaslClientCallbackHandler;
 
-public class PlainClientCallbackHandler extends AbstractSaslClientCallbackHandler {
+/**
+ * This should only ever be used for testing.  It provides no security at all.
+ * DO NOT USE THIS.  The user name is the current user and the password is
+ * "password".
+ */
+@Deprecated
+public class PlainClientCallbackHandler extends SimpleSaslClientCallbackHandler {
 
-    /*
+    /**
      * For plain, using constants for a pair of user name and password.
      */
     public PlainClientCallbackHandler() {
-        _username = System.getProperty("user.name");
-        _password = PASSWORD;
+        super(System.getProperty("user.name"), "password");
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java
index eaef91a..2df61c1 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java
@@ -15,10 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.security.auth.plain;
 
+import java.util.Optional;
 import org.apache.storm.security.auth.AuthUtils;
-import org.apache.storm.security.auth.SaslTransportPlugin;
+import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
+import org.apache.storm.security.auth.sasl.SimpleSaslServerCallbackHandler;
 import org.apache.thrift.transport.TSaslClientTransport;
 import org.apache.thrift.transport.TSaslServerTransport;
 import org.apache.thrift.transport.TTransport;
@@ -31,6 +34,11 @@ import javax.security.auth.callback.CallbackHandler;
 import java.io.IOException;
 import java.security.Security;
 
+/**
+ * This should never be used except for testing.  It provides no security at all.
+ * The password is hard coded, and even if it were not it is sent in plain text.
+ */
+@Deprecated
 public class PlainSaslTransportPlugin extends SaslTransportPlugin {
     public static final String PLAIN = "PLAIN";
     private static final Logger LOG = LoggerFactory.getLogger(PlainSaslTransportPlugin.class);
@@ -38,7 +46,7 @@ public class PlainSaslTransportPlugin extends SaslTransportPlugin {
     @Override
     protected TTransportFactory getServerTransportFactory() throws IOException {
         //create an authentication callback handler
-        CallbackHandler serverCallbackHandler = new PlainServerCallbackHandler();
+        CallbackHandler serverCallbackHandler = new SimpleSaslServerCallbackHandler((userName) -> Optional.of("password".toCharArray()));
         if (Security.getProvider(SaslPlainServer.SecurityProvider.SASL_PLAIN_SERVER) == null) {
             Security.addProvider(new SaslPlainServer.SecurityProvider());
         }
@@ -46,7 +54,7 @@ public class PlainSaslTransportPlugin extends SaslTransportPlugin {
         TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
         factory.addServerDefinition(PLAIN, AuthUtils.SERVICE, "localhost", null, serverCallbackHandler);
 
-        LOG.info("SASL PLAIN transport factory will be used");
+        LOG.error("SASL PLAIN transport factory will be used.  This is totally insecure.  Please do not use this.");
         return factory;
     }
 
@@ -62,10 +70,8 @@ public class PlainSaslTransportPlugin extends SaslTransportPlugin {
             transport);
 
         wrapperTransport.open();
-        LOG.debug("SASL PLAIN client transport has been established");
+        LOG.error("SASL PLAIN client transport has been established.  This is totally insecure.  Please do not use this.");
 
         return wrapperTransport;
-
     }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainServerCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainServerCallbackHandler.java
deleted file mode 100644
index c646fc9..0000000
--- a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainServerCallbackHandler.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.security.auth.plain;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.storm.security.auth.AbstractSaslServerCallbackHandler;
-import org.apache.storm.security.auth.ReqContext;
-import org.apache.storm.security.auth.SaslTransportPlugin;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.RealmCallback;
-
-/**
- * SASL server side callback handler
- */
-public class PlainServerCallbackHandler extends AbstractSaslServerCallbackHandler {
-    private static final Logger LOG = LoggerFactory.getLogger(PlainServerCallbackHandler.class);
-    public static final String PASSWORD = "password";
-
-    public PlainServerCallbackHandler() throws IOException {
-        userName=null;
-    }
-
-    protected void handlePasswordCallback(PasswordCallback pc) {
-        LOG.debug("handlePasswordCallback");
-        pc.setPassword(PASSWORD.toCharArray());
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/plain/SaslPlainServer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/plain/SaslPlainServer.java b/storm-client/src/jvm/org/apache/storm/security/auth/plain/SaslPlainServer.java
index c84ce77..62ee872 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/plain/SaslPlainServer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/plain/SaslPlainServer.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.security.auth.plain;
 
 import javax.security.auth.callback.Callback;
@@ -29,6 +30,7 @@ import javax.security.sasl.SaslServerFactory;
 import java.security.Provider;
 import java.util.Map;
 
+@Deprecated
 public class SaslPlainServer implements SaslServer {
   @SuppressWarnings("serial")
   public static class SecurityProvider extends Provider {

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/sasl/PasswordProvider.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/PasswordProvider.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/PasswordProvider.java
new file mode 100644
index 0000000..d1e0c0f
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/PasswordProvider.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth.sasl;
+
+import java.util.Optional;
+
+/**
+ * A very basic API that will provide a password for a given user name.
+ * This is intended to be used with the SimpleSaslServerCallbackHandler
+ * to verify a user that is attempting to log in.
+ */
+public interface PasswordProvider {
+    /**
+     * Get an optional password for a user.  If no password for the user is found
+     * the returned Optional will be empty and another PasswordProvider would be tried.
+     * @param user the user this is for.
+     * @return the password if it is found.
+     */
+    Optional<char[]> getPasswordFor(String user);
+
+    /**
+     * Convert the supplied user name to the actual user name that should be used
+     * in the system.  This may be called on any name.  If it cannot be translated
+     * then a null may be returned or an exception thrown.  If getPasswordFor returns successfully
+     * this should not return null, nor throw an exception for the same user.
+     * @param user the SASL negotiated user name.
+     * @return the user name that storm should use.
+     */
+    default String userName(String user) {
+        return user;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
new file mode 100644
index 0000000..71cdbf5
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth.sasl;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.security.Principal;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.security.auth.Subject;
+import javax.security.auth.login.Configuration;
+import javax.security.sasl.SaslServer;
+import org.apache.storm.security.auth.ITransportPlugin;
+import org.apache.storm.security.auth.ReqContext;
+import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.storm.security.auth.kerberos.NoOpTTrasport;
+import org.apache.storm.utils.ExtendedThreadPoolExecutor;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+/**
+ * Base class for SASL authentication plugin.
+ */
+public abstract class SaslTransportPlugin implements ITransportPlugin {
+    protected ThriftConnectionType type;
+    protected Map<String, Object> conf;
+    protected Configuration loginConf;
+    private int port;
+
+    @Override
+    public void prepare(ThriftConnectionType type, Map<String, Object> conf, Configuration loginConf) {
+        this.type = type;
+        this.conf = conf;
+        this.loginConf = loginConf;
+    }
+
+    @Override
+    public TServer getServer(TProcessor processor) throws IOException, TTransportException {
+        int configuredPort = type.getPort(conf);
+        Integer socketTimeout = type.getSocketTimeOut(conf);
+        TTransportFactory serverTransportFactory = getServerTransportFactory();
+        TServerSocket serverTransport = null;
+        if (socketTimeout != null) {
+            serverTransport = new TServerSocket(configuredPort, socketTimeout);
+        } else {
+            serverTransport = new TServerSocket(configuredPort);
+        }
+        this.port = serverTransport.getServerSocket().getLocalPort();
+        int numWorkerThreads = type.getNumThreads(conf);
+        Integer queueSize = type.getQueueSize(conf);
+
+        TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
+            .processor(new TUGIWrapProcessor(processor))
+            .minWorkerThreads(numWorkerThreads)
+            .maxWorkerThreads(numWorkerThreads)
+            .protocolFactory(new TBinaryProtocol.Factory(false, true));
+
+        if (serverTransportFactory != null) {
+            serverArgs.transportFactory(serverTransportFactory);
+        }
+        BlockingQueue workQueue = new SynchronousQueue();
+        if (queueSize != null) {
+            workQueue = new ArrayBlockingQueue(queueSize);
+        }
+        ThreadPoolExecutor executorService = new ExtendedThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
+            60, TimeUnit.SECONDS, workQueue);
+        serverArgs.executorService(executorService);
+        return new TThreadPoolServer(serverArgs);
+    }
+
+    /**
+     * Create the transport factory needed for serving.  All subclasses must implement this method.
+     * @return server transport factory
+     * @throws IOException on any error.
+     */
+    protected abstract TTransportFactory getServerTransportFactory() throws IOException;
+    
+    @Override
+    public int getPort() {
+        return this.port;
+    }
+
+
+    /**
+     * Processor that pulls the SaslServer object out of the transport, and
+     * assumes the remote user's UGI before calling through to the original
+     * processor. This is used on the server side to set the UGI for each specific call.
+     */
+    private static class TUGIWrapProcessor implements TProcessor {
+        final TProcessor wrapped;
+
+        TUGIWrapProcessor(TProcessor wrapped) {
+            this.wrapped = wrapped;
+        }
+
+        public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+            //populating request context
+            ReqContext reqContext = ReqContext.context();
+
+            TTransport trans = inProt.getTransport();
+            //Sasl transport
+            TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
+
+            if (trans instanceof NoOpTTrasport) {
+                return false;
+            }
+
+            //remote address
+            TSocket tsocket = (TSocket)saslTrans.getUnderlyingTransport();
+            Socket socket = tsocket.getSocket();
+            reqContext.setRemoteAddress(socket.getInetAddress());
+
+            //remote subject
+            SaslServer saslServer = saslTrans.getSaslServer();
+            String authId = saslServer.getAuthorizationID();
+            Subject remoteUser = new Subject();
+            remoteUser.getPrincipals().add(new User(authId));
+            reqContext.setSubject(remoteUser);
+
+            //invoke service handler
+            return wrapped.process(inProt, outProt);
+        }
+    }
+
+    public static class User implements Principal {
+        private final String name;
+
+        public User(String name) {
+            this.name =  name;
+        }
+
+        /**
+         * Get the full name of the user.
+         */
+        public String getName() {
+            return name;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            return !(o == null || getClass() != o.getClass()) && (name.equals(((User) o).name));
+        }
+
+        @Override
+        public int hashCode() {
+            return name.hashCode();
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslClientCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslClientCallbackHandler.java
new file mode 100644
index 0000000..2242f0c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslClientCallbackHandler.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth.sasl;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+
+/**
+ * A client callback handler that supports a single username and password.
+ */
+public class SimpleSaslClientCallbackHandler implements CallbackHandler {
+    private final String username;
+    private final String password;
+
+    /**
+     * Constructor.
+     * @param username the username to use.
+     * @param password the password to use.
+     */
+    public SimpleSaslClientCallbackHandler(String username, String password) {
+        this.username = username;
+        this.password = password;
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+        for (Callback c : callbacks) {
+            if (c instanceof NameCallback) {
+                NameCallback nc = (NameCallback) c;
+                nc.setName(username);
+            } else if (c instanceof PasswordCallback) {
+                PasswordCallback pc = (PasswordCallback)c;
+                if (password != null) {
+                    pc.setPassword(password.toCharArray());
+                }
+            } else if (c instanceof AuthorizeCallback) {
+                AuthorizeCallback ac = (AuthorizeCallback) c;
+                String authid = ac.getAuthenticationID();
+                String authzid = ac.getAuthorizationID();
+                if (authid.equals(authzid)) {
+                    ac.setAuthorized(true);
+                } else {
+                    ac.setAuthorized(false);
+                }
+                if (ac.isAuthorized()) {
+                    ac.setAuthorizedID(authzid);
+                }
+            } else if (c instanceof RealmCallback) {
+                RealmCallback rc = (RealmCallback) c;
+                ((RealmCallback) c).setText(rc.getDefaultText());
+            } else {
+                throw new UnsupportedCallbackException(c);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java
new file mode 100644
index 0000000..d64fa1b
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth.sasl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import org.apache.storm.security.auth.ReqContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimpleSaslServerCallbackHandler implements CallbackHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleSaslServerCallbackHandler.class);
+    private final List<PasswordProvider> providers;
+
+    /**
+     * Constructor with different password providers.
+     * @param providers what will provide a password.  They will be checked in order, and the first one to
+     *     return a password wins.
+     */
+    public SimpleSaslServerCallbackHandler(PasswordProvider ... providers) {
+        this(Arrays.asList(providers));
+    }
+
+    /**
+     * Constructor with different password providers.
+     * @param providers what will provide a password.  They will be checked in order, and the first one to
+     *     return a password wins.
+     */
+    public SimpleSaslServerCallbackHandler(List<PasswordProvider> providers) {
+        this.providers = new ArrayList<>(providers);
+    }
+
+    private static void log(String type, AuthorizeCallback ac, NameCallback nc, PasswordCallback pc, RealmCallback rc) {
+        if (LOG.isDebugEnabled()) {
+            String acs = "null";
+            if (ac != null) {
+                acs = "athz: " + ac.getAuthorizationID() + " athn: " + ac.getAuthenticationID() + " authorized: " + ac.getAuthorizedID();
+            }
+
+            String ncs = "null";
+            if (nc != null) {
+                ncs = "default: " + nc.getDefaultName() + " name: " + nc.getName();
+            }
+
+            String pcs = "null";
+            if (pc != null) {
+                char[] pwd = pc.getPassword();
+                pcs = "password: " + (pwd == null ? "null" : "not null " + pwd.length);
+            }
+
+            String rcs = "null";
+            if (rc != null) {
+                rcs = "default: " + rc.getDefaultText() + " text: " + rc.getText();
+            }
+            LOG.debug("{}\nAC: {}\nNC: {}\nPC: {}\nRC: {}", type, acs, ncs, pcs, rcs);
+        }
+    }
+
+    private String translateName(String orig) {
+        for (PasswordProvider provider: providers) {
+            try {
+                String ret = provider.userName(orig);
+                if (ret != null) {
+                    return ret;
+                }
+            } catch (Exception e) {
+                //Translating the name (this call) happens in a different callback from validating
+                // the user name and password. This has to be stateless though, so we cannot save
+                // the password provider away to be sure we got the same one that validated the password.
+                // If the password providers are written correctly this should never happen,
+                // because if they cannot read the name they would return a null.
+                // But on the off chance that something goes wrong with the translation because of a mismatch
+                // we try to skip the bad one.
+                LOG.debug("{} could not read name from {}", provider, orig, e);
+            }
+        }
+        // In the worst case we will return a serialized name after a password provider said that the password
+        // was okay.  In that case the ACLs are likely to prevent the request from going through anyways.
+        // But that is only if there is a bug in one of the password providers.
+        return orig;
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws UnsupportedCallbackException, IOException {
+        NameCallback nc = null;
+        PasswordCallback pc = null;
+        AuthorizeCallback ac = null;
+        RealmCallback rc = null;
+        for (Callback callback : callbacks) {
+            if (callback instanceof AuthorizeCallback) {
+                ac = (AuthorizeCallback) callback;
+            } else if (callback instanceof NameCallback) {
+                nc = (NameCallback) callback;
+            } else if (callback instanceof PasswordCallback) {
+                pc = (PasswordCallback) callback;
+            } else if (callback instanceof RealmCallback) {
+                rc = (RealmCallback) callback;
+            } else {
+                throw new UnsupportedCallbackException(callback,
+                    "Unrecognized SASL Callback");
+            }
+        }
+
+        log("GOT", ac, nc, pc, rc);
+
+        if (nc != null) {
+            String userName = nc.getDefaultName();
+            boolean passwordFound = false;
+            for (PasswordProvider provider : providers) {
+                Optional<char[]> password = provider.getPasswordFor(userName);
+                if (password.isPresent()) {
+                    pc.setPassword(password.get());
+                    nc.setName(provider.userName(userName));
+                    passwordFound = true;
+                    break;
+                }
+            }
+            if (!passwordFound) {
+                LOG.warn("No password found for user: {}", userName);
+                throw new IOException("NOT ALLOWED.");
+            }
+        }
+
+        if (rc != null) {
+            rc.setText(rc.getDefaultText());
+        }
+
+        if (ac != null) {
+            String nid = ac.getAuthenticationID();
+            if (nid != null) {
+                nid = translateName(nid);
+            }
+
+            String zid = ac.getAuthorizationID();
+            if (zid != null) {
+                zid = translateName(zid);
+            }
+            LOG.info("Successfully authenticated client: authenticationID = {} authorizationID = {}",
+                nid, zid);
+
+            //if authorizationId is not set, set it to authenticationId.
+            if (zid == null) {
+                ac.setAuthorizedID(nid);
+                zid = nid;
+            } else {
+                ac.setAuthorizedID(zid);
+            }
+
+            //When nid and zid are not equal, nid is attempting to impersonate zid. We
+            //add the nid as the real user in reqContext's subject which will be used during authorization.
+            if (!nid.equals(zid)) {
+                LOG.info("Impersonation attempt  authenticationID = {} authorizationID = {}",
+                    nid,  zid);
+                ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(nid));
+            } else {
+                ReqContext.context().setRealPrincipal(null);
+            }
+
+            ac.setAuthorized(true);
+        }
+        log("FINISHED", ac, nc, pc, rc);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
new file mode 100644
index 0000000..b196ade
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth.workertoken;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import java.util.Base64;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.generated.PrivateWorkerKey;
+import org.apache.storm.generated.WorkerTokenInfo;
+import org.apache.storm.generated.WorkerTokenServiceType;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.storm.security.auth.sasl.PasswordProvider;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Allow for SASL authentication using worker tokens.
+ *
+ * <p>The user name handed to this class is treated as a base64 encoded, serialized {@link WorkerTokenInfo}.
+ * The password returned for it is the base64 encoded signature of those same bytes, computed with the
+ * {@link PrivateWorkerKey} looked up for the token.
+ */
+public class WorkerTokenAuthorizer implements PasswordProvider {
+    private static final Logger LOG = LoggerFactory.getLogger(WorkerTokenAuthorizer.class);
+
+    /**
+     * Create the cluster state used to look up private keys, but only if worker tokens are enabled on the server
+     * for this type of connection.
+     * @param conf the daemon config for the server.
+     * @param connectionType the type of connection we are authorizing.
+     * @return the cluster state, or null if worker tokens are not enabled for connectionType.
+     */
+    private static IStormClusterState buildStateIfNeeded(Map<String, Object> conf, ThriftConnectionType connectionType) {
+        IStormClusterState state = null;
+
+        if (AuthUtils.areWorkerTokensEnabledServer(connectionType, conf)) {
+            try {
+                state = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.UNKNOWN, conf));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return state;
+    }
+
+    /**
+     * Cache of the private keys that tokens are verified against. This is null when no cluster state was
+     * available, in which case this provider never returns a password.
+     */
+    private final LoadingCache<WorkerTokenInfo, PrivateWorkerKey> keyCache;
+
+    /**
+     * Constructor.
+     * @param conf the daemon config for the server.
+     * @param connectionType the type of connection we are authorizing.
+     */
+    public WorkerTokenAuthorizer(Map<String, Object> conf, ThriftConnectionType connectionType) {
+        this(connectionType.getWtType(), buildStateIfNeeded(conf, connectionType));
+    }
+
+    /**
+     * Constructor that lets tests supply the cluster state directly.
+     * @param serviceType the type of service the private keys are looked up for.
+     * @param state where to read private keys from, or null to disable worker token authentication.
+     */
+    @VisibleForTesting
+    WorkerTokenAuthorizer(final WorkerTokenServiceType serviceType, final IStormClusterState state) {
+        LoadingCache<WorkerTokenInfo, PrivateWorkerKey> tmpKeyCache = null;
+        if (state != null) {
+            tmpKeyCache =
+                CacheBuilder.newBuilder()
+                    .maximumSize(2_000)
+                    .expireAfterWrite(2, TimeUnit.HOURS)
+                    .build(new CacheLoader<WorkerTokenInfo, PrivateWorkerKey>() {
+
+                        @Override
+                        public PrivateWorkerKey load(WorkerTokenInfo wtInfo) {
+                            return state.getPrivateWorkerKey(serviceType,
+                                wtInfo.get_topologyId(),
+                                wtInfo.get_secretVersion());
+                        }
+                    });
+        }
+        keyCache = tmpKeyCache;
+    }
+
+    /**
+     * Compute the password that goes with a token.
+     * @param user the serialized bytes of the token info, these are the bytes that get signed.
+     * @param deser the deserialized form of user.
+     * @return the bytes of the signature for user.
+     * @throws IllegalArgumentException if the token has expired, or if its private key is missing or has expired.
+     */
+    @VisibleForTesting
+    byte[] getSignedPasswordFor(byte[] user, WorkerTokenInfo deser) {
+        assert keyCache != null;
+
+        if (deser.is_set_expirationTimeMillis() && deser.get_expirationTimeMillis() <= Time.currentTimeMillis()) {
+            throw new IllegalArgumentException("Token is not valid, token has expired.");
+        }
+
+        PrivateWorkerKey key;
+        try {
+            key = keyCache.getUnchecked(deser);
+        } catch (CacheLoader.InvalidCacheLoadException e) {
+            //A LoadingCache never returns null. When the loader returns null, because no key was found,
+            // the cache throws this exception instead, so a null check on the result would never trigger.
+            throw new IllegalArgumentException("Token is not valid, private key not found.", e);
+        }
+
+        if (key.is_set_expirationTimeMillis() && key.get_expirationTimeMillis() <= Time.currentTimeMillis()) {
+            throw new IllegalArgumentException("Token is not valid, key has expired.");
+        }
+
+        return WorkerTokenSigner.createPassword(user, new SecretKeySpec(key.get_key(), WorkerTokenSigner.DEFAULT_HMAC_ALGORITHM));
+    }
+
+    /**
+     * Get the password that a client presenting userName must use.
+     * @param userName the base64 encoded serialized WorkerTokenInfo.
+     * @return the base64 encoded signature as the password, or empty if worker tokens are disabled or
+     *     userName could not be turned into a valid, signed token.
+     */
+    @Override
+    public Optional<char[]> getPasswordFor(String userName) {
+        if (keyCache == null) {
+            return Optional.empty();
+        }
+        try {
+            byte[] user = Base64.getDecoder().decode(userName);
+            WorkerTokenInfo deser = Utils.deserialize(user, WorkerTokenInfo.class);
+            byte[] password = getSignedPasswordFor(user, deser);
+            return Optional.of(Base64.getEncoder().encodeToString(password).toCharArray());
+        } catch (Exception e) {
+            //Any failure is reported as "no password" rather than an error, the name may not be a token at all.
+            LOG.debug("Could not decode {}, might just be a plain digest request...", userName, e);
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Extract the real user name stored inside of an encoded token.
+     * @param userName the base64 encoded serialized WorkerTokenInfo.
+     * @return the user name stored in the token info.
+     */
+    @Override
+    public String userName(String userName) {
+        byte[] user = Base64.getDecoder().decode(userName);
+        WorkerTokenInfo deser = Utils.deserialize(user, WorkerTokenInfo.class);
+        return deser.get_userName();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenClientCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenClientCallbackHandler.java
new file mode 100644
index 0000000..8bdc1be
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenClientCallbackHandler.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth.workertoken;
+
+import java.security.AccessController;
+import java.util.Base64;
+import javax.security.auth.Subject;
+import org.apache.storm.generated.WorkerToken;
+import org.apache.storm.generated.WorkerTokenServiceType;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.storm.security.auth.sasl.SimpleSaslClientCallbackHandler;
+
+/**
+ * Client side callback handler that authenticates with a WorkerToken.
+ *
+ * <p>A client that wants to support worker tokens should first call findWorkerTokenInSubject
+ * to see if a WorkerToken is available for the specific connection type.  When a token comes
+ * back, create one of these and use it with a DIGEST-MD5 SaslClient.  When no token comes back,
+ * fall back to whatever other client auth you want to do.
+ */
+public class WorkerTokenClientCallbackHandler extends SimpleSaslClientCallbackHandler {
+
+    /**
+     * Search the current subject for a WorkerToken.  Tokens are only expected to be present
+     * when running inside of a worker, because they will not be placed in anything else.
+     * @param type the type of connection we need a token for.
+     * @return the found token or null.
+     */
+    public static WorkerToken findWorkerTokenInSubject(ThriftConnectionType type) {
+        final WorkerTokenServiceType wtType = type.getWtType();
+        if (wtType == null) {
+            return null;
+        }
+        final Subject current = Subject.getSubject(AccessController.getContext());
+        if (current == null) {
+            return null;
+        }
+        return AuthUtils.findWorkerToken(current, wtType);
+    }
+
+    /**
+     * Constructor.  The base64 encoded token info is used as the user name and the base64
+     * encoded token signature is used as the password.
+     * @param token the token to use to authenticate.  This was probably retrieved by calling findWorkerTokenInSubject.
+     */
+    public WorkerTokenClientCallbackHandler(WorkerToken token) {
+        super(base64(token.get_info()), base64(token.get_signature()));
+    }
+
+    /**
+     * Base64 encode some bytes into a String.
+     * @param raw the bytes to encode.
+     * @return the encoded form of raw.
+     */
+    private static String base64(byte[] raw) {
+        return Base64.getEncoder().encodeToString(raw);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenSigner.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenSigner.java b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenSigner.java
new file mode 100644
index 0000000..46d3c07
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenSigner.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth.workertoken;
+
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import javax.crypto.Mac;
+import javax.crypto.SecretKey;
+
+/**
+ * Provides everything needed to sign a worker token with a secret key.
+ */
+class WorkerTokenSigner {
+    /**
+     * The name of the hashing algorithm.
+     */
+    static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA256";
+
+    /**
+     * A thread local store for the Macs.  Each thread creates its own Mac the first time it signs something.
+     */
+    private static final ThreadLocal<Mac> threadLocalMac =
+        ThreadLocal.withInitial(() -> {
+            try {
+                return Mac.getInstance(DEFAULT_HMAC_ALGORITHM);
+            } catch (NoSuchAlgorithmException nsa) {
+                //Keep the original exception as the cause so the underlying failure is not lost.
+                throw new IllegalArgumentException("Can't find " + DEFAULT_HMAC_ALGORITHM + " algorithm.", nsa);
+            }
+        });
+
+    /**
+     * Compute HMAC of the identifier using the secret key and return the
+     * output as password.
+     * @param identifier the bytes of the identifier
+     * @param key the secret key
+     * @return the bytes of the generated password
+     * @throws IllegalArgumentException if the key cannot be used for the HMAC computation,
+     *     or the HMAC algorithm is not available.
+     */
+    static byte[] createPassword(byte[] identifier, SecretKey key) {
+        Mac mac = threadLocalMac.get();
+        try {
+            mac.init(key);
+        } catch (InvalidKeyException ike) {
+            throw new IllegalArgumentException("Invalid key to HMAC computation", ike);
+        }
+        return mac.doFinal(identifier);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java b/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java
index 07fece1..f50947a 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java
@@ -76,6 +76,18 @@ public class ObjectReader {
         throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
     }
 
+    /**
+     * Convert an object into a Long.
+     * @param o the object to convert, a Number is converted with longValue() and a String is parsed.
+     * @param defaultValue what to return when o is null.
+     * @return the converted value, or defaultValue if o is null.
+     * @throws IllegalArgumentException if o is neither a Number nor a String.
+     */
+    public static Long getLong(Object o, Long defaultValue) {
+        if (o == null) {
+            return defaultValue;
+        }
+        if (o instanceof String) {
+            return Long.valueOf((String) o);
+        }
+        if (o instanceof Number) {
+            return ((Number) o).longValue();
+        }
+        throw new IllegalArgumentException("Don't know how to convert " + o + " to a long");
+    }
+
     public static Double getDouble(Object o) {
         Double result = getDouble(o, null);
         if (null == result) {

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 73cfc81..e20e2ee 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -42,6 +42,7 @@ import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Enumeration;
@@ -81,6 +82,7 @@ import org.apache.storm.generated.Nimbus;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.generated.TopologyInfo;
 import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.generated.WorkerToken;
 import org.apache.storm.security.auth.ReqContext;
 import org.apache.storm.serialization.DefaultSerializationDelegate;
 import org.apache.storm.serialization.SerializationDelegate;
@@ -524,6 +526,14 @@ public class Utils {
         return ret.toString();
     }
 
+    /**
+     * Parse a string of the form scheme:acl into a ZooKeeper Id.
+     * @param id the string to parse.  Only the first ':' separates the scheme from the rest.
+     * @param configName the name of the config id came from, it is only used in the error message.
+     * @return the Id made from the two halves of id.
+     * @throws IllegalArgumentException if id has no ':' in it.
+     */
+    public static Id parseZkId(String id, String configName) {
+        final int separator = id.indexOf(':');
+        if (separator < 0) {
+            throw new IllegalArgumentException(configName + " does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
+        }
+        return new Id(id.substring(0, separator), id.substring(separator + 1));
+    }
+
     public static List<ACL> getWorkerACL(Map<String, Object> conf) {
         //This is a work around to an issue with ZK where a sasl super user is not super unless there is an open SASL ACL so we are trying to give the correct perms
         if (!isZkAuthenticationConfiguredTopology(conf)) {
@@ -533,12 +543,8 @@ public class Utils {
         if (stormZKUser == null) {
             throw new IllegalArgumentException("Authentication is enabled but " + Config.STORM_ZOOKEEPER_SUPERACL + " is not set");
         }
-        String[] split = stormZKUser.split(":", 2);
-        if (split.length != 2) {
-            throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL + " does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
-        }
-        ArrayList<ACL> ret = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL);
-        ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1])));
+        ArrayList<ACL> ret = new ArrayList<>(ZooDefs.Ids.CREATOR_ALL_ACL);
+        ret.add(new ACL(ZooDefs.Perms.ALL, parseZkId(stormZKUser, Config.STORM_ZOOKEEPER_SUPERACL)));
         return ret;
     }
 
@@ -693,6 +699,27 @@ public class Utils {
         return serializationDelegate.deserialize(serialized, clazz);
     }
 
+    /**
+     * Turn an object into a string by serializing it with the configured serialization
+     * and base64 encoding the resulting bytes.
+     * @param obj the object to encode
+     * @return the base64 encoded form of the serialized object.
+     */
+    public static String serializeToString(Object obj) {
+        final byte[] serialized = serializationDelegate.serialize(obj);
+        return Base64.getEncoder().encodeToString(serialized);
+    }
+
+    /**
+     * Turn a string back into an object. The string is base64 decoded and the resulting
+     * bytes are then deserialized.
+     * @param str the base64 encoded string.
+     * @param clazz the class we are expecting to get back.
+     * @param <T> The type of clazz
+     * @return the decoded object
+     */
+    public static <T> T deserializeFromString(String str, Class<T> clazz) {
+        final byte[] serialized = Base64.getDecoder().decode(str);
+        return deserialize(serialized, clazz);
+    }
+
     public static byte[] toByteArray(ByteBuffer buffer) {
         byte[] ret = new byte[buffer.remaining()];
         buffer.get(ret, 0, ret.length);