You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:32:51 UTC

[02/52] [partial] storm git commit: STORM-2441 Break down 'storm-core' to extract client (worker) artifacts

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java
new file mode 100644
index 0000000..d7b20d6
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import java.io.IOException;
+import java.util.Map;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosSaslClientHandler extends SimpleChannelUpstreamHandler {
+
+    private static final Logger LOG = LoggerFactory
+        .getLogger(KerberosSaslClientHandler.class);
+    private ISaslClient client;
+    long start_time;
+    /** Used for client or server's token to send or receive from each other. */
+    private Map storm_conf;
+    private String jaas_section;
+    private String host;
+
+    public KerberosSaslClientHandler(ISaslClient client, Map storm_conf, String jaas_section, String host) throws IOException {
+        this.client = client;
+        this.storm_conf = storm_conf;
+        this.jaas_section = jaas_section;
+        this.host = host;
+        start_time = System.currentTimeMillis();
+    }
+
+    @Override
+    public void channelConnected(ChannelHandlerContext ctx,
+                                 ChannelStateEvent event) {
+        // register the newly established channel
+        Channel channel = ctx.getChannel();
+        client.channelConnected(channel);
+
+        LOG.info("Connection established from {} to {}",
+                 channel.getLocalAddress(), channel.getRemoteAddress());
+
+        try {
+            KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient
+                .get(channel);
+
+            if (saslNettyClient == null) {
+                LOG.debug("Creating saslNettyClient now for channel: {}",
+                          channel);
+                saslNettyClient = new KerberosSaslNettyClient(storm_conf, jaas_section, host);
+                KerberosSaslNettyClientState.getKerberosSaslNettyClient.set(channel,
+                                                                            saslNettyClient);
+            }
+            LOG.debug("Going to initiate Kerberos negotiations.");
+            byte[] initialChallenge = saslNettyClient.saslResponse(new SaslMessageToken(new byte[0]));
+            LOG.debug("Sending initial challenge: {}", initialChallenge);
+            channel.write(new SaslMessageToken(initialChallenge));
+        } catch (Exception e) {
+            LOG.error("Failed to authenticate with server due to error: ",
+                      e);
+        }
+        return;
+
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event)
+        throws Exception {
+        LOG.debug("send/recv time (ms): {}",
+                  (System.currentTimeMillis() - start_time));
+
+        Channel channel = ctx.getChannel();
+
+        // Generate SASL response to server using Channel-local SASL client.
+        KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient
+            .get(channel);
+        if (saslNettyClient == null) {
+            throw new Exception("saslNettyClient was unexpectedly null for channel:" + channel);
+        }
+
+        // examine the response message from server
+        if (event.getMessage() instanceof ControlMessage) {
+            ControlMessage msg = (ControlMessage) event.getMessage();
+            if (msg == ControlMessage.SASL_COMPLETE_REQUEST) {
+                LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
+
+                if (!saslNettyClient.isComplete()) {
+                    String message = "Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
+                    LOG.error(message);
+                    throw new Exception(message);
+                }
+                ctx.getPipeline().remove(this);
+                this.client.channelReady();
+
+                // We call fireMessageReceived since the client is allowed to
+                // perform this request. The client's request will now proceed
+                // to the next pipeline component namely StormClientHandler.
+                Channels.fireMessageReceived(ctx, msg);
+            } else {
+                LOG.warn("Unexpected control message: {}", msg);
+            }
+            return;
+        }
+        else if (event.getMessage() instanceof SaslMessageToken) {
+            SaslMessageToken saslTokenMessage = (SaslMessageToken) event
+                .getMessage();
+            LOG.debug("Responding to server's token of length: {}",
+                      saslTokenMessage.getSaslToken().length);
+
+            // Generate SASL response (but we only actually send the response if
+            // it's non-null.
+            byte[] responseToServer = saslNettyClient
+                .saslResponse(saslTokenMessage);
+            if (responseToServer == null) {
+                // If we generate a null response, then authentication has completed
+                // (if not, warn), and return without sending a response back to the
+                // server.
+                LOG.debug("Response to server is null: authentication should now be complete.");
+                if (!saslNettyClient.isComplete()) {
+                    LOG.warn("Generated a null response, but authentication is not complete.");
+                    throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
+                }
+                this.client.channelReady();
+                return;
+            } else {
+                LOG.debug("Response to server token has length: {}",
+                          responseToServer.length);
+            }
+            // Construct a message containing the SASL response and send it to the
+            // server.
+            SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
+            channel.write(saslResponse);
+        } else {
+            LOG.error("Unexpected message from server: {}", event.getMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java
new file mode 100644
index 0000000..2efcb19
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.Config;
+import org.apache.storm.security.auth.AuthUtils;
+import java.io.IOException;
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import org.apache.zookeeper.server.auth.KerberosName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements SASL logic for storm worker client processes.
+ */
+public class KerberosSaslNettyClient {
+
+    private static final Logger LOG = LoggerFactory
+        .getLogger(KerberosSaslNettyClient.class);
+
+    /**
+     * Used to respond to server's counterpart, SaslServer with SASL tokens
+     * represented as byte arrays.
+     */
+    private SaslClient saslClient;
+    private Subject subject;
+    private String jaas_section;
+
+    /**
+     * Create a KerberosSaslNettyClient for authentication with servers.
+     */
+    public KerberosSaslNettyClient(Map storm_conf, String jaas_section, String host) {
+        LOG.debug("KerberosSaslNettyClient: Creating SASL {} client to authenticate to server ",
+                  SaslUtils.KERBEROS);
+
+        LOG.info("Creating Kerberos Client.");
+
+        Configuration login_conf;
+        try {
+            login_conf = AuthUtils.GetConfiguration(storm_conf);
+        }
+        catch (Throwable t) {
+            LOG.error("Failed to get login_conf: ", t);
+            throw t;
+        }
+        LOG.debug("KerberosSaslNettyClient: authmethod {}", SaslUtils.KERBEROS);
+
+        SaslClientCallbackHandler ch = new SaslClientCallbackHandler();
+
+        subject = null;
+        try {
+            LOG.debug("Setting Configuration to login_config: {}", login_conf);
+            //specify a configuration object to be used
+            Configuration.setConfiguration(login_conf);
+            //now login
+            LOG.debug("Trying to login.");
+            Login login = new Login(jaas_section, ch);
+            subject = login.getSubject();
+            LOG.debug("Got Subject: {}", subject.toString());
+        } catch (LoginException ex) {
+            LOG.error("Client failed to login in principal:" + ex, ex);
+            throw new RuntimeException(ex);
+        }
+
+        //check the credential of our principal
+        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
+            LOG.error("Failed to verify user principal.");
+            throw new RuntimeException("Fail to verify user principal with section \"" +
+                                       jaas_section +
+                                       "\" in login configuration file " +
+                                       login_conf);
+        }
+
+        String serviceName = null;
+        try {
+            serviceName = AuthUtils.get(login_conf, jaas_section, "serviceName");
+        }
+        catch (IOException e) {
+            LOG.error("Failed to get service name.", e);
+            throw new RuntimeException(e);
+        }
+
+        try {
+            Principal principal = (Principal)subject.getPrincipals().toArray()[0];
+            final String fPrincipalName = principal.getName();
+            final String fHost = host;
+            final String fServiceName = serviceName;
+            final CallbackHandler fch = ch;
+            LOG.debug("Kerberos Client with principal: {}, host: {}", fPrincipalName, fHost);
+            saslClient = Subject.doAs(subject, new PrivilegedExceptionAction<SaslClient>() {
+                    public SaslClient run() {
+                        try {
+                            Map<String, String> props = new TreeMap<String,String>();
+                            props.put(Sasl.QOP, "auth");
+                            props.put(Sasl.SERVER_AUTH, "false");
+                            return Sasl.createSaslClient(
+                                new String[] { SaslUtils.KERBEROS },
+                                fPrincipalName,
+                                fServiceName,
+                                fHost,
+                                props, fch);
+                        }
+                        catch (Exception e) {
+                            LOG.error("Subject failed to create sasl client.", e);
+                            return null;
+                        }
+                    }
+                });
+            LOG.info("Got Client: {}", saslClient);
+
+        } catch (PrivilegedActionException e) {
+            LOG.error("KerberosSaslNettyClient: Could not create Sasl Netty Client.");
+            throw new RuntimeException(e);
+        }
+    }
+
+    public boolean isComplete() {
+        return saslClient.isComplete();
+    }
+
+    /**
+     * Respond to server's SASL token.
+     *
+     * @param saslTokenMessage
+     *            contains server's SASL token
+     * @return client's response SASL token
+     */
+    public byte[] saslResponse(SaslMessageToken saslTokenMessage) {
+        try {
+            final SaslMessageToken fSaslTokenMessage = saslTokenMessage;
+            byte [] retval = Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
+                    public byte[] run() {
+                        try {
+                            byte[] retval = saslClient.evaluateChallenge(fSaslTokenMessage
+                                                                         .getSaslToken());
+                            return retval;
+                        } catch (SaslException e) {
+                            LOG.error("saslResponse: Failed to respond to SASL server's token:",
+                                      e);
+                            throw new RuntimeException(e);
+                        }
+                    }
+                });
+            return retval;
+        }
+        catch (PrivilegedActionException e) {
+            LOG.error("Failed to generate response for token: ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Implementation of javax.security.auth.callback.CallbackHandler that works
+     * with Storm topology tokens.
+     */
+    private static class SaslClientCallbackHandler implements CallbackHandler {
+
+        /**
+         * Implementation used to respond to SASL tokens from server.
+         *
+         * @param callbacks
+         *            objects that indicate what credential information the
+         *            server's SaslServer requires from the client.
+         * @throws UnsupportedCallbackException
+         */
+        public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+            for (Callback callback : callbacks) {
+                LOG.info("Kerberos Client Callback Handler got callback: {}", callback.getClass());
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java
new file mode 100644
index 0000000..dc76b0d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelLocal;
+
+final class KerberosSaslNettyClientState {
+
+    public static final ChannelLocal<KerberosSaslNettyClient> getKerberosSaslNettyClient = new ChannelLocal<KerberosSaslNettyClient>() {
+        protected KerberosSaslNettyClient initialValue(Channel channel) {
+            return null;
+        }
+    };
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java
new file mode 100644
index 0000000..72486ef
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.KerberosPrincipalToLocal;
+import java.io.IOException;
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import org.apache.zookeeper.server.auth.KerberosName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+class KerberosSaslNettyServer {
+
+    private static final Logger LOG = LoggerFactory
+        .getLogger(KerberosSaslNettyServer.class);
+
+    private SaslServer saslServer;
+    private Subject subject;
+    private List<String> authorizedUsers;
+
+    KerberosSaslNettyServer(Map storm_conf, String jaas_section, List<String> authorizedUsers) {
+        this.authorizedUsers = authorizedUsers;
+        LOG.debug("Getting Configuration.");
+        Configuration login_conf;
+        try {
+            login_conf = AuthUtils.GetConfiguration(storm_conf);
+        }
+        catch (Throwable t) {
+            LOG.error("Failed to get login_conf: ", t);
+            throw t;
+        }
+
+        LOG.debug("KerberosSaslNettyServer: authmethod {}", SaslUtils.KERBEROS);
+
+        KerberosSaslCallbackHandler ch = new KerberosSaslNettyServer.KerberosSaslCallbackHandler(authorizedUsers);
+
+        //login our principal
+        subject = null;
+        try {
+            LOG.debug("Setting Configuration to login_config: {}", login_conf);
+            //specify a configuration object to be used
+            Configuration.setConfiguration(login_conf);
+            //now login
+            LOG.debug("Trying to login.");
+            Login login = new Login(jaas_section, ch);
+            subject = login.getSubject();
+            LOG.debug("Got Subject: {}", subject.toString());
+        } catch (LoginException ex) {
+            LOG.error("Server failed to login in principal:", ex);
+            throw new RuntimeException(ex);
+        }
+
+        //check the credential of our principal
+        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
+            LOG.error("Failed to verifyuser principal.");
+            throw new RuntimeException("Fail to verify user principal with section \""
+                                       + jaas_section
+                                       + "\" in login configuration file "
+                                       + login_conf);
+        }
+
+        try {
+            LOG.info("Creating Kerberos Server.");
+            final CallbackHandler fch = ch;
+            Principal p = (Principal)subject.getPrincipals().toArray()[0];
+            KerberosName kName = new KerberosName(p.getName());
+            final String fHost = kName.getHostName();
+            final String fServiceName = kName.getServiceName();
+            LOG.debug("Server with host: {}", fHost);
+            saslServer =
+                Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
+                        public SaslServer run() {
+                            try {
+                                Map<String, String> props = new TreeMap<String,String>();
+                                props.put(Sasl.QOP, "auth");
+                                props.put(Sasl.SERVER_AUTH, "false");
+                                return Sasl.createSaslServer(SaslUtils.KERBEROS,
+                                                             fServiceName,
+                                                             fHost, props, fch);
+                            }
+                            catch (Exception e) {
+                                LOG.error("Subject failed to create sasl server.", e);
+                                return null;
+                            }
+                        }
+                    });
+            LOG.info("Got Server: {}", saslServer);
+
+        } catch (PrivilegedActionException e) {
+            LOG.error("KerberosSaslNettyServer: Could not create SaslServer: ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public boolean isComplete() {
+        return saslServer.isComplete();
+    }
+
+    public String getUserName() {
+        return saslServer.getAuthorizationID();
+    }
+
+    /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+    public static class KerberosSaslCallbackHandler implements CallbackHandler {
+
+        /** Used to authenticate the clients */
+        private List<String> authorizedUsers;
+
+        public KerberosSaslCallbackHandler(List<String> authorizedUsers) {
+            LOG.debug("KerberosSaslCallback: Creating KerberosSaslCallback handler.");
+            this.authorizedUsers = authorizedUsers;
+        }
+
+        @Override
+        public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+            for (Callback callback : callbacks) {
+                LOG.info("Kerberos Callback Handler got callback: {}", callback.getClass());
+                if(callback instanceof AuthorizeCallback) {
+                    AuthorizeCallback ac = (AuthorizeCallback)callback;
+                    if(!ac.getAuthenticationID().equals(ac.getAuthorizationID())) {
+                        LOG.debug("{} != {}", ac.getAuthenticationID(), ac.getAuthorizationID());
+                        continue;
+                    }
+
+                    LOG.debug("Authorized Users: {}", authorizedUsers);
+                    LOG.debug("Checking authorization for: {}", ac.getAuthorizationID());
+                    for(String user : authorizedUsers) {
+                        String requester = ac.getAuthorizationID();
+
+                        KerberosPrincipal principal = new KerberosPrincipal(requester);
+                        requester = new KerberosPrincipalToLocal().toLocal(principal);
+
+                        if(requester.equals(user) ) {
+                            ac.setAuthorized(true);
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Used by SaslTokenMessage::processToken() to respond to server SASL
+     * tokens.
+     *
+     * @param token
+     *            Server's SASL token
+     * @return token to send back to the server.
+     */
+    public byte[] response(final byte[] token) {
+        try {
+            byte [] retval = Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
+                    public byte[] run(){
+                        try {
+                            LOG.debug("response: Responding to input token of length: {}",
+                                      token.length);
+                            byte[] retval = saslServer.evaluateResponse(token);
+                            return retval;
+                        } catch (SaslException e) {
+                            LOG.error("response: Failed to evaluate client token of length: {} : {}",
+                                      token.length, e);
+                            throw new RuntimeException(e);
+                        }
+                    }
+                });
+            return retval;
+        }
+        catch (PrivilegedActionException e) {
+            LOG.error("Failed to generate response for token: ", e);
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java
new file mode 100644
index 0000000..2ee2bf4
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelLocal;
+
+final class KerberosSaslNettyServerState {
+
+    public static final ChannelLocal<KerberosSaslNettyServer> getKerberosSaslNettyServer = new ChannelLocal<KerberosSaslNettyServer>() {
+            protected KerberosSaslNettyServer initialValue(Channel channel) {
+                return null;
+            }
+        };
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java
new file mode 100644
index 0000000..14ac172
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
+
+    ISaslServer server;
+    /** Used for client or server's token to send or receive from each other. */
+    private Map storm_conf;
+    private String jaas_section;
+    private List<String> authorizedUsers;
+
+    private static final Logger LOG = LoggerFactory
+        .getLogger(KerberosSaslServerHandler.class);
+
+    public KerberosSaslServerHandler(ISaslServer server, Map storm_conf, String jaas_section, List<String> authorizedUsers) throws IOException {
+        this.server = server;
+        this.storm_conf = storm_conf;
+        this.jaas_section = jaas_section;
+        this.authorizedUsers = authorizedUsers;
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+        Object msg = e.getMessage();
+        if (msg == null) {
+            return;
+        }
+
+        Channel channel = ctx.getChannel();
+
+
+        if (msg instanceof SaslMessageToken) {
+            // initialize server-side SASL functionality, if we haven't yet
+            // (in which case we are looking at the first SASL message from the
+            // client).
+            try {
+                LOG.debug("Got SaslMessageToken!");
+
+                KerberosSaslNettyServer saslNettyServer = KerberosSaslNettyServerState.getKerberosSaslNettyServer
+                    .get(channel);
+                if (saslNettyServer == null) {
+                    LOG.debug("No saslNettyServer for {}  yet; creating now, with topology token: ", channel);
+                    try {
+                        saslNettyServer = new KerberosSaslNettyServer(storm_conf, jaas_section, authorizedUsers);
+                        KerberosSaslNettyServerState.getKerberosSaslNettyServer.set(channel,
+                                                                                    saslNettyServer);
+                    } catch (RuntimeException ioe) {
+                        LOG.error("Error occurred while creating saslNettyServer on server {} for client {}",
+                                  channel.getLocalAddress(), channel.getRemoteAddress());
+                        throw ioe;
+                    }
+                } else {
+                    LOG.debug("Found existing saslNettyServer on server: {} for client {}",
+                              channel.getLocalAddress(), channel.getRemoteAddress());
+                }
+
+                byte[] responseBytes = saslNettyServer.response(((SaslMessageToken) msg)
+                                                                .getSaslToken());
+
+                SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(responseBytes);
+
+                if(saslTokenMessageRequest.getSaslToken() == null) {
+                    channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
+                } else {
+                    // Send response to client.
+                    channel.write(saslTokenMessageRequest);
+                }
+
+                if (saslNettyServer.isComplete()) {
+                    // If authentication of client is complete, we will also send a
+                    // SASL-Complete message to the client.
+                    LOG.info("SASL authentication is complete for client with username: {}",
+                             saslNettyServer.getUserName());
+                    channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
+                    LOG.debug("Removing SaslServerHandler from pipeline since SASL authentication is complete.");
+                    ctx.getPipeline().remove(this);
+                    server.authenticated(channel);
+                }
+                return;
+            }
+            catch (Exception ex) {
+                LOG.error("Failed to handle SaslMessageToken: ", ex);
+                throw ex;
+            }
+        } else {
+            // Client should not be sending other-than-SASL messages before
+            // SaslServerHandler has removed itself from the pipeline. Such
+            // non-SASL requests will be denied by the Authorize channel handler
+            // (the next handler upstream in the server pipeline) if SASL
+            // authentication has not completed.
+            LOG.warn("Sending upstream an unexpected non-SASL message : {}",
+                     msg);
+            Channels.fireMessageReceived(ctx, msg);
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+        if(server != null) {
+            server.closeChannel(e.getChannel());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
new file mode 100644
index 0000000..718c8f3
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
@@ -0,0 +1,411 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.messaging.netty;
+
+/**
+ * This class is responsible for refreshing Kerberos credentials for
+ * logins for both Zookeeper client and server.
+ * See ZooKeeperSaslServer for server-side usage.
+ * See ZooKeeperSaslClient for client-side usage.
+ * This class is a copied from https://github.com/apache/zookeeper/blob/branch-3.4/src/java/main/org/apache/zookeeper/Login.java
+ * with the difference that refresh thread does not die.
+ */
+
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import javax.security.auth.callback.CallbackHandler;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Shell;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.Subject;
+import java.util.Date;
+import java.util.Random;
+import java.util.Set;
+
+public class Login {
+    Logger LOG = Logger.getLogger(Login.class);
+    public CallbackHandler callbackHandler;
+
+    // Login will sleep until 80% of time from last refresh to
+    // ticket's expiry has been reached, at which time it will wake
+    // and try to renew the ticket.
+    private static final float TICKET_RENEW_WINDOW = 0.80f;
+
+    /**
+     * Percentage of random jitter added to the renewal time
+     */
+    private static final float TICKET_RENEW_JITTER = 0.05f;
+
+    // Regardless of TICKET_RENEW_WINDOW setting above and the ticket expiry time,
+    // thread will not sleep between refresh attempts any less than 1 minute (60*1000 milliseconds = 1 minute).
+    // Change the '1' to e.g. 5, to change this to 5 minutes.
+    private static final long MIN_TIME_BEFORE_RELOGIN = 1 * 60 * 1000L;
+
+    private Subject subject = null;
+    private Thread t = null;
+    private boolean isKrbTicket = false;
+    private boolean isUsingTicketCache = false;
+    private boolean isUsingKeytab = false;
+
+    /** Random number generator */
+    private static Random rng = new Random();
+
+    private LoginContext login = null;
+    private String loginContextName = null;
+    private String keytabFile = null;
+    private String principal = null;
+
+    private long lastLogin = 0;
+
+    /**
+     * Login constructor. The constructor starts the thread used
+     * to periodically re-login to the Kerberos Ticket Granting Server.
+     * @param loginContextName
+     *               name of section in JAAS file that will be use to login.
+     *               Passed as first param to javax.security.auth.login.LoginContext().
+     *
+     * @param callbackHandler
+     *               Passed as second param to javax.security.auth.login.LoginContext().
+     * @throws javax.security.auth.login.LoginException
+     *               Thrown if authentication fails.
+     */
+    public Login(final String loginContextName, CallbackHandler callbackHandler)
+        throws LoginException {
+        this.callbackHandler = callbackHandler;
+        login = login(loginContextName);
+        this.loginContextName = loginContextName;
+        subject = login.getSubject();
+        isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
+        AppConfigurationEntry entries[] = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
+        for (AppConfigurationEntry entry: entries) {
+            // there will only be a single entry, so this for() loop will only be iterated through once.
+            if (entry.getOptions().get("useTicketCache") != null) {
+                String val = (String)entry.getOptions().get("useTicketCache");
+                if (val.equals("true")) {
+                    isUsingTicketCache = true;
+                }
+            }
+            if (entry.getOptions().get("keyTab") != null) {
+                keytabFile = (String)entry.getOptions().get("keyTab");
+                isUsingKeytab = true;
+            }
+            if (entry.getOptions().get("principal") != null) {
+                principal = (String)entry.getOptions().get("principal");
+            }
+            break;
+        }
+
+        if (!isKrbTicket) {
+            // if no TGT, do not bother with ticket management.
+            return;
+        }
+
+        // Refresh the Ticket Granting Ticket (TGT) periodically. How often to refresh is determined by the
+        // TGT's existing expiry date and the configured MIN_TIME_BEFORE_RELOGIN. For testing and development,
+        // you can decrease the interval of expiration of tickets (for example, to 3 minutes) by running :
+        //  "modprinc -maxlife 3mins <principal>" in kadmin.
+        t = new Thread(new Runnable() {
+            public void run() {
+                LOG.info("TGT refresh thread started.");
+                while (true) {  // renewal thread's main loop. if it exits from here, thread will exit.
+                    KerberosTicket tgt = getTGT();
+                    long now = System.currentTimeMillis();
+                    long nextRefresh;
+                    Date nextRefreshDate;
+                    if (tgt == null) {
+                        nextRefresh = now + MIN_TIME_BEFORE_RELOGIN;
+                        nextRefreshDate = new Date(nextRefresh);
+                        LOG.warn("No TGT found: will try again at " + nextRefreshDate);
+                    } else {
+                        nextRefresh = getRefreshTime(tgt);
+                        long expiry = tgt.getEndTime().getTime();
+                        Date expiryDate = new Date(expiry);
+                        if ((isUsingTicketCache) && (tgt.getEndTime().equals(tgt.getRenewTill()))) {
+                            LOG.error("The TGT cannot be renewed beyond the next expiry date: " + expiryDate + "." +
+                                "This process will not be able to authenticate new SASL connections after that " +
+                                "time (for example, it will not be authenticate a new connection with a Zookeeper " +
+                                "Quorum member).  Ask your system administrator to either increase the " +
+                                "'renew until' time by doing : 'modprinc -maxrenewlife " + principal + "' within " +
+                                "kadmin, or instead, to generate a keytab for " + principal + ". Because the TGT's " +
+                                "expiry cannot be further extended by refreshing, exiting refresh thread now.");
+                            return;
+                        }
+                        // determine how long to sleep from looking at ticket's expiry.
+                        // We should not allow the ticket to expire, but we should take into consideration
+                        // MIN_TIME_BEFORE_RELOGIN. Will not sleep less than MIN_TIME_BEFORE_RELOGIN, unless doing so
+                        // would cause ticket expiration.
+                        if ((nextRefresh > expiry) ||
+                            ((now + MIN_TIME_BEFORE_RELOGIN) > expiry)) {
+                            // expiry is before next scheduled refresh).
+                            nextRefresh = now;
+                        } else {
+                            if (nextRefresh < (now + MIN_TIME_BEFORE_RELOGIN)) {
+                                // next scheduled refresh is sooner than (now + MIN_TIME_BEFORE_LOGIN).
+                                Date until = new Date(nextRefresh);
+                                Date newuntil = new Date(now + MIN_TIME_BEFORE_RELOGIN);
+                                LOG.warn("TGT refresh thread time adjusted from : " + until + " to : " + newuntil + " since "
+                                    + "the former is sooner than the minimum refresh interval ("
+                                    + MIN_TIME_BEFORE_RELOGIN / 1000 + " seconds) from now.");
+                            }
+                            nextRefresh = Math.max(nextRefresh, now + MIN_TIME_BEFORE_RELOGIN);
+                        }
+                    }
+                    if (tgt != null && now > tgt.getEndTime().getTime()) {
+                        if ((now - tgt.getEndTime().getTime()) < ( 10 * MIN_TIME_BEFORE_RELOGIN)) {
+                            Date until = new Date(now + MIN_TIME_BEFORE_RELOGIN);
+                            LOG.info("TGT already expired but giving additional 10 minutes past TGT expiry, refresh sleeping until: " + until.toString());
+                            try {
+                                Thread.sleep(MIN_TIME_BEFORE_RELOGIN);
+                            } catch (InterruptedException ie) {
+                                LOG.warn("TGT renewal thread has been interrupted and will exit.");
+                                return;
+                            }
+                        } else {
+                            LOG.error("nextRefresh:" + new Date(nextRefresh) + " is in the past: exiting refresh thread. Check"
+                                + " clock sync between this host and KDC - (KDC's clock is likely ahead of this host)."
+                                + " Manual intervention will be required for this client to successfully authenticate."
+                                + " Exiting worker!.");
+                            Runtime.getRuntime().exit(-3);
+                        }
+                    } else if (now < nextRefresh) {
+                        Date until = new Date(nextRefresh);
+                        LOG.info("TGT refresh sleeping until: " + until.toString());
+                        try {
+                            Thread.sleep(nextRefresh - now);
+                        } catch (InterruptedException ie) {
+                            LOG.warn("TGT renewal thread has been interrupted and will exit.");
+                            return;
+                        }
+                    }
+
+                    if (isUsingTicketCache) {
+                        String cmd = "/usr/bin/kinit";
+                        if (System.getProperty("zookeeper.kinit") != null) {
+                            cmd = System.getProperty("zookeeper.kinit");
+                        }
+                        String kinitArgs = "-R";
+                        int retry = 1;
+                        while (retry >= 0) {
+                            try {
+                                LOG.debug("running ticket cache refresh command: " + cmd + " " + kinitArgs);
+                                Shell.execCommand(cmd, kinitArgs);
+                                break;
+                            } catch (Exception e) {
+                                if (retry > 0) {
+                                    --retry;
+                                    // sleep for 10 seconds
+                                    try {
+                                        Thread.sleep(10 * 1000);
+                                    } catch (InterruptedException ie) {
+                                        LOG.error("Interrupted while renewing TGT, exiting Login thread");
+                                        return;
+                                    }
+                                } else {
+                                    LOG.warn("Could not renew TGT due to problem running shell command: '" + cmd
+                                        + " " + kinitArgs + "'" + "; exception was:" + e + ". Exiting refresh thread.",e);
+                                    return;
+                                }
+                            }
+                        }
+                    }
+                    try {
+                        int retry = 1;
+                        while (retry >= 0) {
+                            try {
+                                reLogin();
+                                break;
+                            } catch (LoginException le) {
+                                if (retry > 0) {
+                                    --retry;
+                                    // sleep for 10 seconds.
+                                    try {
+                                        Thread.sleep(10 * 1000);
+                                    } catch (InterruptedException e) {
+                                        LOG.error("Interrupted during login retry after LoginException:", le);
+                                        throw le;
+                                    }
+                                } else {
+                                    LOG.error("Could not refresh TGT for principal: " + principal + ".", le);
+                                }
+                            }
+                        }
+                    } catch (LoginException le) {
+                        LOG.error("Failed to refresh TGT: refresh thread exiting now.",le);
+                        break;
+                    }
+                }
+            }
+        });
+        t.setName("Refresh-TGT");
+        t.setDaemon(true);
+    }
+
+    public void startThreadIfNeeded() {
+        // thread object 't' will be null if a refresh thread is not needed.
+        if (t != null) {
+            t.start();
+        }
+    }
+
+    public void shutdown() {
+        if ((t != null) && (t.isAlive())) {
+            t.interrupt();
+            try {
+                t.join();
+            } catch (InterruptedException e) {
+                LOG.warn("error while waiting for Login thread to shutdown: " + e);
+            }
+        }
+    }
+
+    public Subject getSubject() {
+        return subject;
+    }
+
+    public String getLoginContextName() {
+        return loginContextName;
+    }
+
+    private synchronized LoginContext login(final String loginContextName) throws LoginException {
+        if (loginContextName == null) {
+            throw new LoginException("loginContext name (JAAS file section header) was null. " +
+                "Please check your java.security.login.auth.config (=" +
+                System.getProperty("java.security.login.auth.config") +
+                ") and your " + ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY + "(=" +
+                System.getProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client") + ")");
+        }
+        LoginContext loginContext = new LoginContext(loginContextName,callbackHandler);
+        loginContext.login();
+        LOG.info("successfully logged in.");
+        return loginContext;
+    }
+
+    // c.f. org.apache.hadoop.security.UserGroupInformation.
+    private long getRefreshTime(KerberosTicket tgt) {
+        long start = tgt.getStartTime().getTime();
+        long expires = tgt.getEndTime().getTime();
+        LOG.info("TGT valid starting at:        " + tgt.getStartTime().toString());
+        LOG.info("TGT expires:                  " + tgt.getEndTime().toString());
+        long proposedRefresh = start + (long) ((expires - start) *
+            (TICKET_RENEW_WINDOW + (TICKET_RENEW_JITTER * rng.nextDouble())));
+        if (proposedRefresh > expires) {
+            // proposedRefresh is too far in the future: it's after ticket expires: simply return now.
+            return System.currentTimeMillis();
+        }
+        else {
+            return proposedRefresh;
+        }
+    }
+
+    private synchronized KerberosTicket getTGT() {
+        Set<KerberosTicket> tickets = subject.getPrivateCredentials(KerberosTicket.class);
+        for(KerberosTicket ticket: tickets) {
+            KerberosPrincipal server = ticket.getServer();
+            if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + server.getRealm())) {
+                LOG.debug("Found tgt " + ticket + ".");
+                return ticket;
+            }
+        }
+        return null;
+    }
+
+    private void sleepUntilSufficientTimeElapsed() {
+        long now = System.currentTimeMillis();
+        if (now - getLastLogin() < MIN_TIME_BEFORE_RELOGIN ) {
+            LOG.warn("Not attempting to re-login since the last re-login was " +
+                "attempted less than " + (MIN_TIME_BEFORE_RELOGIN/1000) + " seconds"+
+                " before.");
+            try {
+                Thread.sleep(MIN_TIME_BEFORE_RELOGIN - (now - getLastLogin()));
+            } catch (InterruptedException e) {
+                LOG.warn("TGT renewal thread has been interrupted and will exit.");
+                Runtime.getRuntime().exit(-2);
+            }
+        }
+        // register most recent relogin attempt
+        setLastLogin(System.currentTimeMillis());
+    }
+
+    /**
+     * Returns login object
+     * @return login
+     */
+    private LoginContext getLogin() {
+        return login;
+    }
+
+    /**
+     * Set the login object
+     * @param login
+     */
+    private void setLogin(LoginContext login) {
+        this.login = login;
+    }
+
+    /**
+     * Set the last login time.
+     * @param time the number of milliseconds since the beginning of time
+     */
+    private void setLastLogin(long time) {
+        lastLogin = time;
+    }
+
+    /**
+     * Get the time of the last login.
+     * @return the number of milliseconds since the beginning of time.
+     */
+    private long getLastLogin() {
+        return lastLogin;
+    }
+
+    /**
+     * Re-login a principal. This method assumes that {@link #login(String)} has happened already.
+     * @throws javax.security.auth.login.LoginException on a failure
+     */
+    // c.f. HADOOP-6559
+    private synchronized void reLogin()
+        throws LoginException {
+        if (!isKrbTicket) {
+            return;
+        }
+        LoginContext login = getLogin();
+        if (login  == null) {
+            throw new LoginException("login must be done first");
+        }
+        sleepUntilSufficientTimeElapsed();
+        LOG.info("Initiating logout for " + principal);
+        synchronized (Login.class) {
+            //clear up the kerberos state. But the tokens are not cleared! As per
+            //the Java kerberos login module code, only the kerberos credentials
+            //are cleared
+            login.logout();
+            //login and also update the subject field of this instance to
+            //have the new credentials (pass it to the LoginContext constructor)
+            login = new LoginContext(loginContextName, getSubject());
+            LOG.info("Initiating re-login for " + principal);
+            login.login();
+            setLogin(login);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java
new file mode 100644
index 0000000..1c5dd9d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.messaging.TaskMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.util.ArrayList;
+
+class MessageBatch {
+    private int buffer_size;
+    private ArrayList<TaskMessage> msgs;
+    private int encoded_length;
+
+    MessageBatch(int buffer_size) {
+        this.buffer_size = buffer_size;
+        msgs = new ArrayList<>();
+        encoded_length = ControlMessage.EOB_MESSAGE.encodeLength();
+    }
+
+    void add(TaskMessage msg) {
+        if (msg == null)
+            throw new RuntimeException("null object forbidden in message batch");
+
+        msgs.add(msg);
+        encoded_length += msgEncodeLength(msg);
+    }
+
+
+    private int msgEncodeLength(TaskMessage taskMsg) {
+        if (taskMsg == null) return 0;
+
+        int size = 6; //INT + SHORT
+        if (taskMsg.message() != null) 
+            size += taskMsg.message().length;
+        return size;
+    }
+
+    /**
+     * @return true if this batch used up allowed buffer size
+     */
+    boolean isFull() {
+        return encoded_length >= buffer_size;
+    }
+
+    /**
+     * @return true if this batch doesn't have any messages
+     */
+    boolean isEmpty() {
+        return msgs.isEmpty();
+    }
+
+    /**
+     * @return number of msgs in this batch
+     */
+    int size() {
+        return msgs.size();
+    }
+
+    /**
+     * create a buffer containing the encoding of this batch
+     */
+    ChannelBuffer buffer() throws Exception {
+        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
+        
+        for (TaskMessage msg : msgs) {
+            writeTaskMessage(bout, msg);
+        }
+
+        //add a END_OF_BATCH indicator
+        ControlMessage.EOB_MESSAGE.write(bout);
+
+        bout.close();
+
+        return bout.buffer();
+    }
+
+    /**
+     * write a TaskMessage into a stream
+     *
+     * Each TaskMessage is encoded as:
+     *  task ... short(2)
+     *  len ... int(4)
+     *  payload ... byte[]     *  
+     */
+    private void writeTaskMessage(ChannelBufferOutputStream bout, TaskMessage message) throws Exception {
+        int payload_len = 0;
+        if (message.message() != null)
+            payload_len =  message.message().length;
+
+        int task_id = message.task();
+        if (task_id > Short.MAX_VALUE)
+            throw new RuntimeException("Task ID should not exceed "+Short.MAX_VALUE);
+        
+        bout.writeShort((short)task_id);
+        bout.writeInt(payload_len);
+        if (payload_len >0)
+            bout.write(message.message());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBuffer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBuffer.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBuffer.java
new file mode 100644
index 0000000..4262f41
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBuffer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.messaging.TaskMessage;
+
+/**
+ * Encapsulates the state used for batching up messages.
+ */
+public class MessageBuffer {
+    private final int mesageBatchSize;
+    private MessageBatch currentBatch;
+
+    public MessageBuffer(int mesageBatchSize){
+        this.mesageBatchSize = mesageBatchSize;
+        this.currentBatch = new MessageBatch(mesageBatchSize);
+    }
+
+    public synchronized MessageBatch add(TaskMessage msg){
+        currentBatch.add(msg);
+        if(currentBatch.isFull()){
+            MessageBatch ret = currentBatch;
+            currentBatch = new MessageBatch(mesageBatchSize);
+            return ret;
+        } else {
+            return null;
+        }
+    }
+
+    public synchronized boolean isEmpty() {
+        return currentBatch.isEmpty();
+    }
+
+    public synchronized MessageBatch drain() {
+        if(!currentBatch.isEmpty()) {
+            MessageBatch ret = currentBatch;
+            currentBatch = new MessageBatch(mesageBatchSize);
+            return ret;
+        } else {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
new file mode 100644
index 0000000..9030424
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.storm.messaging.TaskMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+public class MessageDecoder extends FrameDecoder {    
+    /*
+     * Each ControlMessage is encoded as:
+     *  code (<0) ... short(2)
+     * Each TaskMessage is encoded as:
+     *  task (>=0) ... short(2)
+     *  len ... int(4)
+     *  payload ... byte[]     *  
+     */
+    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
+        // Make sure that we have received at least a short 
+        long available = buf.readableBytes();
+        if (available < 2) {
+            //need more data
+            return null;
+        }
+
+        List<Object> ret = new ArrayList<>();
+
+        // Use while loop, try to decode as more messages as possible in single call
+        while (available >= 2) {
+
+            // Mark the current buffer position before reading task/len field
+            // because the whole frame might not be in the buffer yet.
+            // We will reset the buffer position to the marked position if
+            // there's not enough bytes in the buffer.
+            buf.markReaderIndex();
+
+            // read the short field
+            short code = buf.readShort();
+            available -= 2;
+
+            // case 1: Control message
+            ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
+            if (ctrl_msg != null) {
+
+                if (ctrl_msg == ControlMessage.EOB_MESSAGE) {
+                    continue;
+                } else {
+                    return ctrl_msg;
+                }
+            }
+            
+            //case 2: SaslTokenMessageRequest
+            if(code == SaslMessageToken.IDENTIFIER) {
+            	// Make sure that we have received at least an integer (length) 
+                if (buf.readableBytes() < 4) {
+                    //need more data
+                    buf.resetReaderIndex();
+                    return null;
+                }
+                
+                // Read the length field.
+                int length = buf.readInt();
+                if (length<=0) {
+                    return new SaslMessageToken(null);
+                }
+                
+                // Make sure if there's enough bytes in the buffer.
+                if (buf.readableBytes() < length) {
+                    // The whole bytes were not received yet - return null.
+                    buf.resetReaderIndex();
+                    return null;
+                }
+                
+                // There's enough bytes in the buffer. Read it.  
+                ChannelBuffer payload = buf.readBytes(length);
+                
+                // Successfully decoded a frame.
+                // Return a SaslTokenMessageRequest object
+                return new SaslMessageToken(payload.array());
+            }
+
+            // case 3: task Message
+
+            // Make sure that we have received at least an integer (length)
+            if (available < 4) {
+                // need more data
+                buf.resetReaderIndex();
+                break;
+            }
+
+            // Read the length field.
+            int length = buf.readInt();
+
+            available -= 4;
+
+            if (length <= 0) {
+                ret.add(new TaskMessage(code, null));
+                break;
+            }
+
+            // Make sure if there's enough bytes in the buffer.
+            if (available < length) {
+                // The whole bytes were not received yet - return null.
+                buf.resetReaderIndex();
+                break;
+            }
+            available -= length;
+
+            // There's enough bytes in the buffer. Read it.
+            ChannelBuffer payload = buf.readBytes(length);
+
+
+            // Successfully decoded a frame.
+            // Return a TaskMessage object
+            ret.add(new TaskMessage(code, payload.array()));
+        }
+
+        if (ret.size() == 0) {
+            return null;
+        } else {
+            return ret;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
new file mode 100644
index 0000000..0e9fc98
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+public class MessageEncoder extends OneToOneEncoder {    
+    @Override
+    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
+        if (obj instanceof ControlMessage) {
+            return ((ControlMessage)obj).buffer();
+        }
+
+        if (obj instanceof MessageBatch) {
+            return ((MessageBatch)obj).buffer();
+        } 
+        
+        if (obj instanceof SaslMessageToken) {
+        	return ((SaslMessageToken)obj).buffer();
+        }
+        
+        throw new RuntimeException("Unsupported encoding of object of class "+obj.getClass().getName());
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java
new file mode 100644
index 0000000..e60c711
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.netty.util.ThreadNameDeterminer;
+import org.jboss.netty.util.ThreadRenamingRunnable;
+
+public class NettyRenameThreadFactory  implements ThreadFactory {
+    
+    static {
+      //Rename Netty threads
+      ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
+    }
+  
+    final ThreadGroup group;
+    final AtomicInteger index = new AtomicInteger(1);
+    final String name;
+    static final NettyUncaughtExceptionHandler uncaughtExceptionHandler = new NettyUncaughtExceptionHandler();
+
+    public NettyRenameThreadFactory(String name) {
+        SecurityManager s = System.getSecurityManager();
+        group = (s != null)? s.getThreadGroup() :
+                             Thread.currentThread().getThreadGroup();
+        this.name = name;
+    }
+
+    public Thread newThread(Runnable r) {
+        Thread t = new Thread(group, r, name + "-" + index.getAndIncrement(), 0);
+        if (t.isDaemon()) {
+            t.setDaemon(false);
+        }
+        if (t.getPriority() != Thread.NORM_PRIORITY) {
+            t.setPriority(Thread.NORM_PRIORITY);
+        }
+        t.setUncaughtExceptionHandler(uncaughtExceptionHandler);
+        return t;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyUncaughtExceptionHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyUncaughtExceptionHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyUncaughtExceptionHandler.java
new file mode 100644
index 0000000..fd48bdc
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyUncaughtExceptionHandler.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NettyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(NettyUncaughtExceptionHandler.class);
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+        try {
+            Utils.handleUncaughtException(e);
+        } catch (Error error) {
+            LOG.info("Received error in netty thread.. terminating server...");
+            Runtime.getRuntime().exit(1);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java
new file mode 100644
index 0000000..ccac70f
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import java.io.IOException;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Send and receive SASL tokens.
+ */
+public class SaslMessageToken implements INettySerializable {
+    public static final short IDENTIFIER = -500;
+
+    /** Class logger */
+    private static final Logger LOG = LoggerFactory
+            .getLogger(SaslMessageToken.class);
+
+    /** Used for client or server's token to send or receive from each other. */
+    private byte[] token;
+
+    /**
+     * Constructor used for reflection only.
+     */
+    public SaslMessageToken() {
+    }
+
+    /**
+     * Constructor used to send request.
+     * 
+     * @param token
+     *            the SASL token, generated by a SaslClient or SaslServer.
+     */
+    public SaslMessageToken(byte[] token) {
+        this.token = token;
+    }
+
+    /**
+     * Read accessor for SASL token
+     * 
+     * @return saslToken SASL token
+     */
+    public byte[] getSaslToken() {
+        return token;
+    }
+
+    /**
+     * Write accessor for SASL token
+     * 
+     * @param token
+     *            SASL token
+     */
+    public void setSaslToken(byte[] token) {
+        this.token = token;
+    }
+
+    public int encodeLength() {
+        return 2 + 4 + token.length;
+    }
+
+    /**
+     * encode the current SaslToken Message into a channel buffer
+     * SaslTokenMessageRequest is encoded as: identifier .... short(2)
+     * payload length .... int payload .... byte[]
+     * 
+     * @throws IOException
+     */
+    public ChannelBuffer buffer() throws IOException {
+        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(
+                ChannelBuffers.directBuffer(encodeLength()));
+        int payload_len = 0;
+        if (token != null)
+            payload_len = token.length;
+
+        bout.writeShort(IDENTIFIER);
+        bout.writeInt(payload_len);
+
+        if (payload_len > 0) {
+            bout.write(token);
+        }
+        bout.close();
+        return bout.buffer();
+    }
+    
+    public static SaslMessageToken read(byte[] serial) {
+        ChannelBuffer sm_buffer = ChannelBuffers.copiedBuffer(serial);
+        short identifier = sm_buffer.readShort();
+        int payload_len = sm_buffer.readInt();
+        if(identifier != IDENTIFIER) {
+            return null;
+        }
+        byte token[] = new byte[payload_len];
+        sm_buffer.readBytes(token, 0, payload_len);
+        return new SaslMessageToken(token);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClient.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClient.java
new file mode 100644
index 0000000..b24e7d6
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClient.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import java.io.IOException;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements SASL logic for storm worker client processes.
+ */
+public class SaslNettyClient {
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(SaslNettyClient.class);
+
+    /**
+     * Used to respond to server's counterpart, SaslServer with SASL tokens
+     * represented as byte arrays.
+     */
+    private SaslClient saslClient;
+
+    /**
+     * Create a SaslNettyClient for authentication with servers.
+     */
+    public SaslNettyClient(String topologyName, byte[] token) {
+        try {
+            LOG.debug("SaslNettyClient: Creating SASL {} client to authenticate to server ",
+                      SaslUtils.AUTH_DIGEST_MD5);
+
+            saslClient = Sasl.createSaslClient(
+                    new String[] { SaslUtils.AUTH_DIGEST_MD5 }, null, null,
+                    SaslUtils.DEFAULT_REALM, SaslUtils.getSaslProps(),
+                    new SaslClientCallbackHandler(topologyName, token));
+
+        } catch (IOException e) {
+            LOG.error("SaslNettyClient: Could not obtain topology token for Netty "
+                    + "Client to use to authenticate with a Netty Server.");
+            saslClient = null;
+        }
+    }
+
+    public boolean isComplete() {
+        return saslClient.isComplete();
+    }
+
+    /**
+     * Respond to server's SASL token.
+     * 
+     * @param saslTokenMessage
+     *            contains server's SASL token
+     * @return client's response SASL token
+     */
+    public byte[] saslResponse(SaslMessageToken saslTokenMessage) {
+        try {
+            return saslClient.evaluateChallenge(saslTokenMessage.getSaslToken());
+        } catch (SaslException e) {
+            LOG.error(
+                    "saslResponse: Failed to respond to SASL server's token:",
+                    e);
+            return null;
+        }
+    }
+
+    /**
+     * Implementation of javax.security.auth.callback.CallbackHandler that works
+     * with Storm topology tokens.
+     */
+    private static class SaslClientCallbackHandler implements CallbackHandler {
+        /** Generated username contained in TopologyToken */
+        private final String userName;
+        /** Generated password contained in TopologyToken */
+        private final char[] userPassword;
+
+        /**
+         * Set private members using topology token.
+         */
+        public SaslClientCallbackHandler(String topologyToken, byte[] token) {
+            this.userName = SaslUtils
+                    .encodeIdentifier(topologyToken.getBytes());
+            this.userPassword = SaslUtils.encodePassword(token);
+        }
+
+        /**
+         * Implementation used to respond to SASL tokens from server.
+         * 
+         * @param callbacks
+         *            objects that indicate what credential information the
+         *            server's SaslServer requires from the client.
+         * @throws UnsupportedCallbackException
+         */
+        public void handle(Callback[] callbacks)
+                throws UnsupportedCallbackException {
+            NameCallback nc = null;
+            PasswordCallback pc = null;
+            RealmCallback rc = null;
+            for (Callback callback : callbacks) {
+                if (callback instanceof RealmChoiceCallback) {
+                    continue;
+                } else if (callback instanceof NameCallback) {
+                    nc = (NameCallback) callback;
+                } else if (callback instanceof PasswordCallback) {
+                    pc = (PasswordCallback) callback;
+                } else if (callback instanceof RealmCallback) {
+                    rc = (RealmCallback) callback;
+                } else {
+                    throw new UnsupportedCallbackException(callback,
+                            "handle: Unrecognized SASL client callback");
+                }
+            }
+            if (nc != null) {
+                LOG.debug("handle: SASL client callback: setting username: {}",
+                          userName);
+                nc.setName(userName);
+            }
+            if (pc != null) {
+                LOG.debug("handle: SASL client callback: setting userPassword");
+                pc.setPassword(userPassword);
+            }
+            if (rc != null) {
+                LOG.debug("handle: SASL client callback: setting realm: {}",
+                        rc.getDefaultText());
+                rc.setText(rc.getDefaultText());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java
new file mode 100644
index 0000000..3a7d6a2
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelLocal;
+
+final class SaslNettyClientState {
+
+	public static final ChannelLocal<SaslNettyClient> getSaslNettyClient = new ChannelLocal<SaslNettyClient>() {
+		protected SaslNettyClient initialValue(Channel channel) {
+			return null;
+		}
+	};
+
+}