You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/30 18:16:37 UTC

[1/9] git commit: STORM-348: Netty SASL Authentication

Repository: incubator-storm
Updated Branches:
  refs/heads/security 559c883d5 -> cf5fc0c3f


STORM-348: Netty SASL Authentication


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/41986445
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/41986445
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/41986445

Branch: refs/heads/security
Commit: 41986445fb89ff77c101fcdd6daccb945160e8a9
Parents: 642ed74
Author: Raghavendra Nandagopal <sp...@gmail.com>
Authored: Tue Jul 22 13:20:54 2014 -0700
Committer: Raghavendra Nandagopal <sp...@gmail.com>
Committed: Tue Jul 22 13:20:54 2014 -0700

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/Config.java   |   5 +
 .../backtype/storm/messaging/netty/Client.java  |  29 ++--
 .../storm/messaging/netty/ControlMessage.java   |   4 +-
 .../storm/messaging/netty/MessageDecoder.java   |  32 +++-
 .../storm/messaging/netty/MessageEncoder.java   |   4 +
 .../storm/messaging/netty/SaslMessageToken.java | 100 +++++++++++
 .../storm/messaging/netty/SaslNettyClient.java  | 167 ++++++++++++++++++
 .../messaging/netty/SaslNettyClientState.java   |  31 ++++
 .../storm/messaging/netty/SaslNettyServer.java  | 168 +++++++++++++++++++
 .../messaging/netty/SaslNettyServerState.java   |  31 ++++
 .../messaging/netty/SaslStormClientHandler.java | 159 ++++++++++++++++++
 .../netty/SaslStormServerAuthorizeHandler.java  |  83 +++++++++
 .../messaging/netty/SaslStormServerHandler.java | 159 ++++++++++++++++++
 .../storm/messaging/netty/SaslUtils.java        |  61 +++++++
 .../backtype/storm/messaging/netty/Server.java  |  27 +--
 .../netty/StormClientPipelineFactory.java       |  26 ++-
 .../netty/StormServerPipelineFactory.java       |  31 +++-
 17 files changed, 1077 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 3b3f7e5..46b120c 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -105,6 +105,11 @@ public class Config extends HashMap<String, Object> {
     public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";
     public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = Number.class;
     
+    /**
+     * Netty based messaging: Whether authentication is required for Netty messaging from a client worker process to a server worker process.
+     */
+    public static final String STORM_MESSAGING_NETTY_AUTHENTICATION = "storm.messaging.netty.authentication"; 
+    public static final Object STORM_MESSAGING_NETTY_AUTHENTICATION_SCHEMA = Boolean.class;
     
     /**
      * A list of hosts of ZooKeeper servers used to manage the cluster.

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 8d2d221..64a1757 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -17,17 +17,6 @@
  */
 package backtype.storm.messaging.netty;
 
-import backtype.storm.Config;
-import backtype.storm.messaging.IConnection;
-import backtype.storm.messaging.TaskMessage;
-import backtype.storm.utils.Utils;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -39,6 +28,19 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.utils.Utils;
+
 public class Client implements IConnection {
     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
     private static final String PREFIX = "Netty-Client-";
@@ -57,8 +59,10 @@ public class Client implements IConnection {
     private int messageBatchSize;
     
     private AtomicLong pendings;
+    
+    Map storm_conf;
 
-    MessageBatch messageBatch = null;
+	MessageBatch messageBatch = null;
     private AtomicLong flushCheckTimer;
     private int flushCheckInterval;
     private ScheduledExecutorService scheduler;
@@ -66,6 +70,7 @@ public class Client implements IConnection {
     @SuppressWarnings("rawtypes")
     Client(Map storm_conf, ChannelFactory factory, 
             ScheduledExecutorService scheduler, String host, int port) {
+    	this.storm_conf = storm_conf;
         this.factory = factory;
         this.scheduler = scheduler;
         channelRef = new AtomicReference<Channel>(null);

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
index b7335b3..fb3efe6 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
@@ -27,7 +27,9 @@ enum ControlMessage {
     CLOSE_MESSAGE((short)-100),
     EOB_MESSAGE((short)-201),
     OK_RESPONSE((short)-200),
-    FAILURE_RESPONSE((short)-400);
+    FAILURE_RESPONSE((short)-400),
+    SASL_TOKEN_MESSAGE_REQUEST((short)-202),
+    SASL_COMPLETE_REQUEST((short)-203);
 
     private short code;
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
index 72c3cf7..7d8bf54 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
@@ -68,8 +68,38 @@ public class MessageDecoder extends FrameDecoder {
                     return ctrl_msg;
                 }
             }
+            
+            //case 2: SaslTokenMessageRequest
+            if(code==-500) {
+            	// Make sure that we have received at least an integer (length) 
+                if (buf.readableBytes() < 4) {
+                    //need more data
+                    buf.resetReaderIndex();
+                    return null;
+                }
+                
+                // Read the length field.
+                int length = buf.readInt();
+                if (length<=0) {
+                    return new SaslMessageToken(null);
+                }
+                
+                // Make sure there are enough bytes in the buffer.
+                if (buf.readableBytes() < length) {
+                    // The whole bytes were not received yet - return null.
+                    buf.resetReaderIndex();
+                    return null;
+                }
+                
+                // There's enough bytes in the buffer. Read it.  
+                ChannelBuffer payload = buf.readBytes(length);
+                
+                // Successfully decoded a frame.
+                // Return a SaslMessageToken object
+                return new SaslMessageToken(payload.array());
+            }
 
-            // case 2: task Message
+            // case 3: task Message
             short task = code;
 
             // Make sure that we have received at least an integer (length)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
index e6e65c3..e5dd22f 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
@@ -32,6 +32,10 @@ public class MessageEncoder extends OneToOneEncoder {
             return ((MessageBatch)obj).buffer();
         } 
         
+        if (obj instanceof SaslMessageToken) {
+        	return ((SaslMessageToken)obj).buffer();
+        }
+        
         throw new RuntimeException("Unsupported encoding of object of class "+obj.getClass().getName());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
new file mode 100644
index 0000000..8383d2c
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Send and receive SASL tokens.
+ */
+public class SaslMessageToken {
+	/** Class logger */
+	private static final Logger LOG = LoggerFactory
+			.getLogger(SaslMessageToken.class);
+
+	/** Used for client or server's token to send or receive from each other. */
+	private byte[] token;
+
+	/**
+	 * Constructor used for reflection only.
+	 */
+	public SaslMessageToken() {
+	}
+
+	/**
+	 * Constructor used to send request.
+	 * 
+	 * @param token
+	 *            the SASL token, generated by a SaslClient or SaslServer.
+	 */
+	public SaslMessageToken(byte[] token) {
+		this.token = token;
+	}
+
+	/**
+	 * Read accessor for SASL token
+	 * 
+	 * @return saslToken SASL token
+	 */
+	public byte[] getSaslToken() {
+		return token;
+	}
+
+	/**
+	 * Write accessor for SASL token
+	 * 
+	 * @param token
+	 *            SASL token
+	 */
+	public void setSaslToken(byte[] token) {
+		this.token = token;
+	}
+
+	int encodeLength() {
+		return 2+4+token.length;
+	}
+
+	/**
+	 * Encode the current SASL token message into a channel buffer.
+	 * A SaslTokenMessageRequest is encoded as:
+	 * identifier .... short (2 bytes), always -500
+	 * payload length .... int (4 bytes)
+	 * payload .... byte[] 
+	 * @throws Exception
+	 */
+	ChannelBuffer buffer() throws Exception {
+		ChannelBufferOutputStream bout = new ChannelBufferOutputStream(
+				ChannelBuffers.directBuffer(encodeLength()));
+		short identifier = -500;
+		int payload_len = 0;
+        if (token != null)
+            payload_len =  token.length;
+		
+		bout.writeShort((short)identifier);
+		bout.writeInt((int)payload_len);
+		if(payload_len>0) {
+			bout.write(token);
+		}
+		bout.close();
+		return bout.buffer();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
new file mode 100644
index 0000000..a4f1b5e
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.io.IOException;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements SASL logic for storm worker client processes.
+ */
+public class SaslNettyClient {
+
+	private static final Logger LOG = LoggerFactory
+			.getLogger(SaslNettyClient.class);
+
+	/**
+	 * Used to respond to server's counterpart, SaslServer with SASL tokens
+	 * represented as byte arrays.
+	 */
+	private SaslClient saslClient;
+
+	/**
+	 * Create a SaslNettyClient for authentication with servers.
+	 */
+	public SaslNettyClient(String topologyUser) {
+		try {
+			LOG.debug("SaslNettyClient: Creating SASL "
+					+ SaslUtils.AUTH_DIGEST_MD5
+					+ " client to authenticate to server ");
+
+			saslClient = Sasl.createSaslClient(
+					new String[] { SaslUtils.AUTH_DIGEST_MD5 }, null, null,
+					SaslUtils.DEFAULT_REALM, SaslUtils.getSaslProps(),
+					new SaslClientCallbackHandler(topologyUser));
+
+		} catch (IOException e) {
+			LOG.error("SaslNettyClient: Could not obtain topology token for Netty "
+					+ "Client to use to authenticate with a Netty Server.");
+			saslClient = null;
+		}
+	}
+
+	public boolean isComplete() {
+		return saslClient.isComplete();
+	}
+
+	/**
+	 * Respond to server's SASL token.
+	 * 
+	 * @param saslTokenMessage
+	 *            contains server's SASL token
+	 * @return client's response SASL token
+	 */
+	public byte[] saslResponse(SaslMessageToken saslTokenMessage) {
+		try {
+			byte[] retval = saslClient.evaluateChallenge(saslTokenMessage
+					.getSaslToken());
+			return retval;
+		} catch (SaslException e) {
+			LOG.error(
+					"saslResponse: Failed to respond to SASL server's token:",
+					e);
+			return null;
+		}
+	}
+
+	/**
+	 * Implementation of javax.security.auth.callback.CallbackHandler that works
+	 * with Storm topology tokens.
+	 */
+	private static class SaslClientCallbackHandler implements CallbackHandler {
+		/** Generated username contained in TopologyToken */
+		private final String userName;
+		/** Generated password contained in TopologyToken */
+		private final char[] userPassword;
+
+		/**
+		 * Set private members using topology token.
+		 * 
+		 * @param topologyToken
+		 */
+		public SaslClientCallbackHandler(String topologyToken) {
+			this.userName = SaslUtils
+					.encodeIdentifier(topologyToken.getBytes());
+			this.userPassword = SaslUtils.encodePassword(topologyToken
+					.getBytes());
+		}
+
+		/**
+		 * Implementation used to respond to SASL tokens from server.
+		 * 
+		 * @param callbacks
+		 *            objects that indicate what credential information the
+		 *            server's SaslServer requires from the client.
+		 * @throws UnsupportedCallbackException
+		 */
+		public void handle(Callback[] callbacks)
+				throws UnsupportedCallbackException {
+			NameCallback nc = null;
+			PasswordCallback pc = null;
+			RealmCallback rc = null;
+			for (Callback callback : callbacks) {
+				if (callback instanceof RealmChoiceCallback) {
+					continue;
+				} else if (callback instanceof NameCallback) {
+					nc = (NameCallback) callback;
+				} else if (callback instanceof PasswordCallback) {
+					pc = (PasswordCallback) callback;
+				} else if (callback instanceof RealmCallback) {
+					rc = (RealmCallback) callback;
+				} else {
+					throw new UnsupportedCallbackException(callback,
+							"handle: Unrecognized SASL client callback");
+				}
+			}
+			if (nc != null) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("handle: SASL client callback: setting username: "
+							+ userName);
+				}
+				nc.setName(userName);
+			}
+			if (pc != null) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("handle: SASL client callback: setting userPassword");
+				}
+				pc.setPassword(userPassword);
+			}
+			if (rc != null) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("handle: SASL client callback: setting realm: "
+							+ rc.getDefaultText());
+				}
+				rc.setText(rc.getDefaultText());
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClientState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClientState.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClientState.java
new file mode 100644
index 0000000..6df6c53
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClientState.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelLocal;
+
+final class SaslNettyClientState {
+
+	public static final ChannelLocal<SaslNettyClient> getSaslNettyClient = new ChannelLocal<SaslNettyClient>() {
+		protected SaslNettyClient initialValue(Channel channel) {
+			return null;
+		}
+	};
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
new file mode 100644
index 0000000..1178bd6
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.commons.codec.binary.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class SaslNettyServer {
+
+	private static final Logger LOG = LoggerFactory
+			.getLogger(SaslNettyServer.class);
+
+	private SaslServer saslServer;
+
+	SaslNettyServer(String topologyToken) throws IOException {
+		LOG.debug("SaslNettyServer: Topology token is: " + topologyToken
+				+ " with authmethod " + SaslUtils.AUTH_DIGEST_MD5);
+
+		try {
+
+			SaslDigestCallbackHandler ch = new SaslNettyServer.SaslDigestCallbackHandler(
+					topologyToken);
+
+			saslServer = Sasl.createSaslServer(SaslUtils.AUTH_DIGEST_MD5, null,
+					SaslUtils.DEFAULT_REALM, SaslUtils.getSaslProps(), ch);
+
+		} catch (SaslException e) {
+			LOG.error("SaslNettyServer: Could not create SaslServer: " + e);
+		}
+
+	}
+
+	public boolean isComplete() {
+		return saslServer.isComplete();
+	}
+
+	public String getUserName() {
+		return saslServer.getAuthorizationID();
+	}
+
+	
+
+	/** CallbackHandler for SASL DIGEST-MD5 mechanism */
+	public static class SaslDigestCallbackHandler implements CallbackHandler {
+
+		/** Used to authenticate the clients */
+		private String topologyToken;
+
+		public SaslDigestCallbackHandler(String topologyToken) {
+			LOG.debug("SaslDigestCallback: Creating SaslDigestCallback handler "
+					+ "with topology token: " + topologyToken);
+			this.topologyToken = topologyToken;
+		}
+
+		@Override
+		public void handle(Callback[] callbacks) throws IOException,
+				UnsupportedCallbackException {
+			NameCallback nc = null;
+			PasswordCallback pc = null;
+			AuthorizeCallback ac = null;
+
+			for (Callback callback : callbacks) {
+				if (callback instanceof AuthorizeCallback) {
+					ac = (AuthorizeCallback) callback;
+				} else if (callback instanceof NameCallback) {
+					nc = (NameCallback) callback;
+				} else if (callback instanceof PasswordCallback) {
+					pc = (PasswordCallback) callback;
+				} else if (callback instanceof RealmCallback) {
+					continue; // realm is ignored
+				} else {
+					throw new UnsupportedCallbackException(callback,
+							"handle: Unrecognized SASL DIGEST-MD5 Callback");
+				}
+			}
+			
+			if(nc!=null) {
+				LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
+						+ "username for client: " + topologyToken);
+
+				nc.setName(topologyToken);
+			}
+
+			if (pc != null) {
+				char[] password = SaslUtils.encodePassword(topologyToken.getBytes());
+
+				LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
+						+ "password for client: " + topologyToken);
+
+				pc.setPassword(password);
+			}
+			if (ac != null) {
+
+				String authid = ac.getAuthenticationID();
+				String authzid = ac.getAuthorizationID();
+
+				if (authid.equals(authzid)) {
+					ac.setAuthorized(true);
+				} else {
+					ac.setAuthorized(false);
+				}
+
+				if (ac.isAuthorized()) {
+					if (LOG.isDebugEnabled()) {
+						String username = topologyToken;
+						LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
+								+ "canonicalized client ID: " + username);
+					}
+					ac.setAuthorizedID(authzid);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Evaluate a SASL token received from the client and produce the
+	 * server's reply.
+	 * 
+	 * @param token
+	 *            Client's SASL token
+	 * @return token to send back to the client, or null if evaluation fails.
+	 */
+	public byte[] response(byte[] token) {
+		try {
+			LOG.debug("response: Responding to input token of length: "
+					+ token.length);
+			byte[] retval = saslServer.evaluateResponse(token);
+			LOG.debug("response: Response token length: " + retval.length);
+			return retval;
+		} catch (SaslException e) {
+			LOG.error("response: Failed to evaluate client token of length: "
+					+ token.length + " : " + e);
+			return null;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServerState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServerState.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServerState.java
new file mode 100644
index 0000000..9800959
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServerState.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelLocal;
+
+final class SaslNettyServerState {
+
+	public static final ChannelLocal<SaslNettyServer> getSaslNettyServer = new ChannelLocal<SaslNettyServer>() {
+		protected SaslNettyServer initialValue(Channel channel) {
+			return null;
+		}
+	};
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
new file mode 100644
index 0000000..ca38c96
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.io.IOException;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+
+public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
+
+	private static final Logger LOG = LoggerFactory
+			.getLogger(SaslStormClientHandler.class);
+	private Client client;
+	long start_time;
+	/** Used for client or server's token to send or receive from each other. */
+	private byte[] token;
+	private String topologyUser;
+
+	public SaslStormClientHandler(Client client) throws IOException {
+		this.client = client;
+		start_time = System.currentTimeMillis();
+		loadTopologyToken();
+	}
+
+	@Override
+	public void channelConnected(ChannelHandlerContext ctx,
+			ChannelStateEvent event) {
+		// register the newly established channel
+		Channel channel = ctx.getChannel();
+
+		LOG.info("Connection established from " + channel.getLocalAddress()
+				+ " to " + channel.getRemoteAddress());
+
+		try {
+			SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient
+					.get(channel);
+
+			if (saslNettyClient == null) {
+				LOG.debug("Creating saslNettyClient now " + "for channel: "
+						+ channel);
+				saslNettyClient = new SaslNettyClient(topologyUser);
+				SaslNettyClientState.getSaslNettyClient.set(channel,
+						saslNettyClient);
+			}
+			channel.write(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST);
+		} catch (Exception e) {
+			LOG.error("Failed to authenticate with server " + "due to error: "
+					+ e);
+		}
+		return;
+
+	}
+
+	@Override
+	public void messageReceived(ChannelHandlerContext ctx, MessageEvent event)
+			throws Exception {
+		LOG.debug("send/recv time (ms): {}",
+				(System.currentTimeMillis() - start_time));
+
+		Channel channel = ctx.getChannel();
+
+		// Generate SASL response to server using Channel-local SASL client.
+		SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient
+				.get(channel);
+		if (saslNettyClient == null) {
+			throw new Exception("saslNettyClient was unexpectedly "
+					+ "null for channel: " + channel);
+		}
+
+		// examine the response message from server
+		if (event.getMessage() instanceof ControlMessage) {
+			ControlMessage msg = (ControlMessage) event.getMessage();
+			if (msg == ControlMessage.SASL_COMPLETE_REQUEST) {
+				LOG.debug("Server has sent us the SaslComplete "
+						+ "message. Allowing normal work to proceed.");
+
+				if (!saslNettyClient.isComplete()) {
+					LOG.error("Server returned a Sasl-complete message, "
+							+ "but as far as we can tell, we are not authenticated yet.");
+					throw new Exception("Server returned a "
+							+ "Sasl-complete message, but as far as "
+							+ "we can tell, we are not authenticated yet.");
+				}
+				ctx.getPipeline().remove(this);
+				// We call fireMessageReceived since the client is allowed to
+				// perform this request. The client's request will now proceed
+				// to the next pipeline component namely StormClientHandler.
+				Channels.fireMessageReceived(ctx, msg);
+				return;
+			}
+		}
+		SaslMessageToken saslTokenMessage = (SaslMessageToken) event
+				.getMessage();
+		LOG.debug("Responding to server's token of length: "
+				+ saslTokenMessage.getSaslToken().length);
+
+		// Generate SASL response (but we only actually send the response if
+		// it's non-null).
+		byte[] responseToServer = saslNettyClient
+				.saslResponse(saslTokenMessage);
+		if (responseToServer == null) {
+			// If we generate a null response, then authentication has completed
+			// (if not, warn), and return without sending a response back to the
+			// server.
+			LOG.debug("Response to server is null: "
+					+ "authentication should now be complete.");
+			if (!saslNettyClient.isComplete()) {
+				LOG.warn("Generated a null response, "
+						+ "but authentication is not complete.");
+			}
+			return;
+		} else {
+			LOG.debug("Response to server token has length:"
+					+ responseToServer.length);
+		}
+		// Construct a message containing the SASL response and send it to the
+		// server.
+		SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
+		channel.write(saslResponse);
+	}
+
+	/**
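+	 * Load the Storm topology token: reads the topology name from the
+	 * client's storm_conf and stores it as the SASL user name, along with
+	 * its bytes as the token.
+	 * 
+	 * @throws IOException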
+	 */
+	private void loadTopologyToken() throws IOException {
+		topologyUser = (String) this.client.storm_conf
+				.get(Config.TOPOLOGY_NAME);
+		LOG.debug("SASL credentials is the storm user name: " + topologyUser);
+		token = topologyUser.getBytes();
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerAuthorizeHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerAuthorizeHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerAuthorizeHandler.java
new file mode 100644
index 0000000..04cd66e
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerAuthorizeHandler.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Authorize or deny client requests based on existence and completeness of
+ * client's SASL authentication.
+ */
+public class SaslStormServerAuthorizeHandler extends SimpleChannelUpstreamHandler {
+
+	private static final Logger LOG = LoggerFactory
+			.getLogger(SaslStormServerAuthorizeHandler.class);
+
+	/** Creates the handler; it keeps no per-instance state. */
+	public SaslStormServerAuthorizeHandler() {
+	}
+
+	/**
+	 * Forwards the message upstream only when the SASL exchange registered for
+	 * this channel has completed; otherwise logs a warning and drops it.
+	 */
+	@Override
+	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+		Object msg = e.getMessage();
+		if (msg == null)
+			return;
+
+		Channel channel = ctx.getChannel();
+		LOG.debug("messageReceived: Checking whether the client is authorized to send messages to the server ");
+
+		// Authorize: client is allowed to doRequest() if and only if the client
+		// has successfully authenticated with this server.
+		SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
+				.get(channel);
+
+		if (saslNettyServer == null) {
+			LOG.warn("messageReceived: This client is *NOT* authorized to perform "
+					+ "this action since there's no saslNettyServer to "
+					+ "authenticate the client: "
+					+ "refusing to perform requested action: " + msg);
+			return;
+		}
+
+		if (!saslNettyServer.isComplete()) {
+			LOG.warn("messageReceived: This client is *NOT* authorized to perform "
+					+ "this action because SASL authentication did not complete: "
+					+ "refusing to perform requested action: " + msg);
+			// Drop the message: it is *NOT* sent upstream to later handlers.
+			return;
+		}
+
+		LOG.debug("messageReceived: authenticated client: "
+				+ saslNettyServer.getUserName()
+				+ " is authorized to do request " + "on server.");
+
+		// The client is authenticated, so hand the message to the next
+		// pipeline component.
+		Channels.fireMessageReceived(ctx, msg);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
new file mode 100644
index 0000000..2e8bcac
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.io.IOException;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+
+/** Upstream handler that performs the server side of the SASL handshake. */
+public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
+
+	Server server;
+	/** Bytes of the topology name; this class only logs their length. */
+	private byte[] token;
+	private String topologyUser;
+
+	private static final Logger LOG = LoggerFactory.getLogger(SaslStormServerHandler.class);
+
+	/** Stores the server and loads the topology name from its configuration. */
+	public SaslStormServerHandler(Server server) throws IOException {
+		this.server = server;
+		loadTopologyToken();
+	}
+
+	/**
+	 * Answers SASL_TOKEN_MESSAGE_REQUEST and SaslMessageToken messages with
+	 * the next server token; any other message is passed upstream unchanged.
+	 */
+	@Override
+	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+			throws Exception {
+		Object msg = e.getMessage();
+		if (msg == null)
+			return;
+
+		Channel channel = ctx.getChannel();
+		LOG.debug("messageReceived: Got " + msg.getClass());
+
+		if (msg == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
+			// First SASL message from the client: create the per-channel
+			// SaslNettyServer unless one is already registered.
+			SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
+					.get(channel);
+			if (saslNettyServer == null) {
+				LOG.debug("No saslNettyServer for " + channel
+						+ " yet; creating now, with topology token: ");
+				try {
+					saslNettyServer = new SaslNettyServer(topologyUser);
+				} catch (IOException ioe) {
+					// Log with the cause attached, then rethrow it unwrapped.
+					LOG.error("Error occurred while creating saslNettyServer on server "
+							+ channel.getLocalAddress() + " for client "
+							+ channel.getRemoteAddress(), ioe);
+					throw ioe;
+				}
+
+				SaslNettyServerState.getSaslNettyServer.set(channel,
+						saslNettyServer);
+			} else {
+				LOG.debug("Found existing saslNettyServer on server:"
+						+ channel.getLocalAddress() + " for client "
+						+ channel.getRemoteAddress());
+			}
+
+			LOG.debug("processToken:  With nettyServer: " + saslNettyServer
+					+ " and token length: " + token.length);
+
+			SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(
+					saslNettyServer.response(new byte[0]));
+			// Send response to client.
+			channel.write(saslTokenMessageRequest);
+			// do not send upstream to other handlers: no further action needs
+			// to be done for SASL_TOKEN_MESSAGE_REQUEST requests.
+			return;
+		}
+
+		if (msg instanceof SaslMessageToken) {
+			// A client token is only accepted after SASL_TOKEN_MESSAGE_REQUEST
+			// has registered a SaslNettyServer for this channel.
+			SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
+					.get(channel);
+			if (saslNettyServer == null) {
+				throw new IllegalStateException("saslNettyServer was unexpectedly "
+						+ "null for channel: " + channel);
+			}
+			SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(
+					saslNettyServer.response(((SaslMessageToken) msg)
+							.getSaslToken()));
+
+			// Send response to client.
+			channel.write(saslTokenMessageRequest);
+
+			if (saslNettyServer.isComplete()) {
+				// If authentication of client is complete, we will also send a
+				// SASL-Complete message to the client.
+				LOG.debug("SASL authentication is complete for client with "
+						+ "username: " + saslNettyServer.getUserName());
+				channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
+				LOG.debug("Removing SaslServerHandler from pipeline since SASL "
+						+ "authentication is complete.");
+				ctx.getPipeline().remove(this);
+			}
+			return;
+		} else {
+			// Client should not be sending other-than-SASL messages before
+			// SaslServerHandler has removed itself from the pipeline. Such
+			// non-SASL requests will be denied by the Authorize channel handler
+			// (the next handler upstream in the server pipeline) if SASL
+			// authentication has not completed.
+			LOG.warn("Sending upstream an unexpected non-SASL message :  "
+					+ msg);
+			Channels.fireMessageReceived(ctx, msg);
+		}
+	}
+
+	/** Logs the failure, then asks the server to close the event's channel. */
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+		LOG.error("SASL server handler failed; closing channel", e.getCause());
+		server.closeChannel(e.getChannel());
+	}
+
+	/**
+	 * Reads Config.TOPOLOGY_NAME from the server's Storm configuration into
+	 * topologyUser and stores its bytes in token; takes no parameters.
+	 * 
+	 * @throws IOException declared but not thrown by the current body
+	 */
+	private void loadTopologyToken() throws IOException {
+		topologyUser = (String) this.server.storm_conf
+				.get(Config.TOPOLOGY_NAME);
+		LOG.debug("SASL credentials for the storm topology: " + topologyUser);
+		token = topologyUser.getBytes();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
new file mode 100644
index 0000000..0077cf3
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.sasl.Sasl;
+
+import org.apache.commons.codec.binary.Base64;
+
+/** SASL constants and Base64 encoding helpers. */
+class SaslUtils {
+	public static final String AUTH_DIGEST_MD5 = "DIGEST-MD5";
+	public static final String DEFAULT_REALM = "default";
+	/** Fixed charset so the encoded text does not vary with the platform default. */
+	private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+	/** @return a new map whose only entry sets Sasl.POLICY_NOPLAINTEXT to "true". */
+	static Map<String, String> getSaslProps() {
+		Map<String, String> props = new HashMap<String, String>();
+		props.put(Sasl.POLICY_NOPLAINTEXT, "true");
+		return props;
+	}
+
+	/**
+	 * Encode a password as a base64-encoded char[] array.
+	 * 
+	 * @param password as a byte array.
+	 * @return base64 text of the password as a char array.
+	 */
+	static char[] encodePassword(byte[] password) {
+		return new String(Base64.encodeBase64(password), UTF_8).toCharArray();
+	}
+
+	/**
+	 * Encode an identifier as a base64-encoded String.
+	 * 
+	 * @param identifier as a byte array.
+	 * @return base64 text of the identifier as a String.
+	 */
+	static String encodeIdentifier(byte[] identifier) {
+		return new String(Base64.encodeBase64(identifier), UTF_8);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index 20a147d..1b2590a 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -17,19 +17,6 @@
  */
 package backtype.storm.messaging.netty;
 
-import backtype.storm.Config;
-import backtype.storm.messaging.IConnection;
-import backtype.storm.messaging.TaskMessage;
-import backtype.storm.utils.Utils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -41,6 +28,20 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.utils.Utils;
+
 class Server implements IConnection {
     private static final Logger LOG = LoggerFactory.getLogger(Server.class);
     @SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
index e6e8b3d..4fdaee9 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
@@ -21,6 +21,8 @@ import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
 
+import backtype.storm.Config;
+
 class StormClientPipelineFactory implements ChannelPipelineFactory {
     private Client client;
 
@@ -32,12 +34,24 @@ class StormClientPipelineFactory implements ChannelPipelineFactory {
         // Create a default pipeline implementation.
         ChannelPipeline pipeline = Channels.pipeline();
 
-        // Decoder
-        pipeline.addLast("decoder", new MessageDecoder());
-        // Encoder
-        pipeline.addLast("encoder", new MessageEncoder());
-        // business logic.
-        pipeline.addLast("handler", new StormClientErrorHandler(client.name()));
+        boolean isNettyAuth = (Boolean) this.client.storm_conf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
+        if(isNettyAuth) {
+	        // Decoder
+	        pipeline.addLast("decoder", new MessageDecoder());
+	        // Encoder
+	        pipeline.addLast("encoder", new MessageEncoder());
+	        // Authenticate: Removed after authentication completes
+	        pipeline.addLast("saslClientHandler", new SaslStormClientHandler(client));
+	        // business logic.
+	        pipeline.addLast("handler", new StormClientErrorHandler(client.name()));
+        } else {
+        	// Decoder
+	        pipeline.addLast("decoder", new MessageDecoder());
+	        // Encoder
+	        pipeline.addLast("encoder", new MessageEncoder());
+	        // business logic.
+	        pipeline.addLast("handler", new StormClientErrorHandler(client.name()));
+        }
 
         return pipeline;
     }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
index df29ba8..c2b4c53 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
@@ -21,6 +21,9 @@ import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
 
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+
 
 class StormServerPipelineFactory implements  ChannelPipelineFactory {
     private Server server;
@@ -33,13 +36,27 @@ class StormServerPipelineFactory implements  ChannelPipelineFactory {
         // Create a default pipeline implementation.
         ChannelPipeline pipeline = Channels.pipeline();
 
-        // Decoder
-        pipeline.addLast("decoder", new MessageDecoder());
-        // Encoder
-        pipeline.addLast("encoder", new MessageEncoder());
-        // business logic.
-        pipeline.addLast("handler", new StormServerHandler(server));
-
+        boolean isNettyAuth = (Boolean) this.server.storm_conf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
+        if(isNettyAuth) {
+        	// Decoder
+            pipeline.addLast("decoder", new MessageDecoder());
+            // Encoder
+            pipeline.addLast("encoder", new MessageEncoder());
+            // Authenticate: Removed after authentication completes
+            pipeline.addLast("saslServerHandler", new SaslStormServerHandler(server));
+            // Authorize
+            pipeline.addLast("authorizeServerHandler", new SaslStormServerAuthorizeHandler());
+            // business logic.
+            pipeline.addLast("handler", new StormServerHandler(server));
+        } else {
+        	// Decoder
+            pipeline.addLast("decoder", new MessageDecoder());
+            // Encoder
+            pipeline.addLast("encoder", new MessageEncoder());
+            // business logic.
+            pipeline.addLast("handler", new StormServerHandler(server));
+        }
+        
         return pipeline;
     }
 }


[5/9] git commit: STORM-348: Netty SASL Authentication

Posted by bo...@apache.org.
STORM-348: Netty SASL Authentication


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/19ad1351
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/19ad1351
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/19ad1351

Branch: refs/heads/security
Commit: 19ad13510cb42963f868fd8bd785b4937dd348ab
Parents: 3bce04c
Author: Raghavendra Nandagopal <sp...@gmail.com>
Authored: Mon Jul 28 17:42:05 2014 -0700
Committer: Raghavendra Nandagopal <sp...@gmail.com>
Committed: Mon Jul 28 17:42:05 2014 -0700

----------------------------------------------------------------------
 .../src/jvm/backtype/storm/messaging/netty/SaslUtils.java    | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/19ad1351/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
index a4cc0ba..a2d0b26 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import javax.security.sasl.Sasl;
 
 import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.Charsets;
 
 import backtype.storm.Config;
 
@@ -45,8 +46,8 @@ class SaslUtils {
      * @return password as a char array.
      */
     static char[] encodePassword(byte[] password) {
-        return new String(Base64.encodeBase64(password),
-                Charset.defaultCharset()).toCharArray();
+        return new String(Base64.encodeBase64(password), Charsets.UTF_8)
+                .toCharArray();
     }
 
     /**
@@ -57,8 +58,7 @@ class SaslUtils {
      * @return identifier as a char array.
      */
     static String encodeIdentifier(byte[] identifier) {
-        return new String(Base64.encodeBase64(identifier),
-                Charset.defaultCharset());
+        return new String(Base64.encodeBase64(identifier), Charsets.UTF_8);
     }
 
     static String getSecretKey(Map conf) {


[9/9] git commit: Updated README for STORM-348

Posted by bo...@apache.org.
Updated README for STORM-348


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/cf5fc0c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/cf5fc0c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/cf5fc0c3

Branch: refs/heads/security
Commit: cf5fc0c3f0a71d8e81d344f247a477694e83a2a7
Parents: 00f7d63
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Wed Jul 30 11:15:43 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Wed Jul 30 11:15:43 2014 -0500

----------------------------------------------------------------------
 README.markdown | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/cf5fc0c3/README.markdown
----------------------------------------------------------------------
diff --git a/README.markdown b/README.markdown
index cf67f64..586f171 100644
--- a/README.markdown
+++ b/README.markdown
@@ -159,6 +159,7 @@ under the License.
 * Jo Liss ([@joliss](https://github.com/joliss))
 * averykhoo ([@averykhoo](https://github.com/averykhoo))
 * Curtis Allen ([@curtisallen](https://github.com/curtisallen))
+* Raghavendra Nandagopal ([@RaghavendraNandagopal](https://github.com/RaghavendraNandagopal)) 
 
 ## Acknowledgements
 


[7/9] git commit: STORM-348: Netty SASL Authentication

Posted by bo...@apache.org.
STORM-348: Netty SASL Authentication


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/90cf0e6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/90cf0e6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/90cf0e6c

Branch: refs/heads/security
Commit: 90cf0e6c21df978462d29fe906ad0f2a6adcdb1f
Parents: 02e7949
Author: Raghavendra Nandagopal <sp...@gmail.com>
Authored: Tue Jul 29 17:32:38 2014 -0700
Committer: Raghavendra Nandagopal <sp...@gmail.com>
Committed: Tue Jul 29 17:32:38 2014 -0700

----------------------------------------------------------------------
 .../test/clj/backtype/storm/messaging/netty_integration_test.clj | 1 +
 storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj | 4 ++++
 2 files changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/90cf0e6c/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
index 8534c82..dea4abe 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
@@ -25,6 +25,7 @@
   (with-simulated-time-local-cluster [cluster :supervisors 4 :supervisor-slot-port-min 6710
                                       :daemon-conf {STORM-LOCAL-MODE-ZMQ true 
                                                     STORM-MESSAGING-TRANSPORT  "backtype.storm.messaging.netty.Context"
+                                                    STORM_MESSAGING_NETTY_AUTHENTICATION false
                                                     STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
                                                     STORM-MESSAGING-NETTY-MAX-RETRIES 10
                                                     STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/90cf0e6c/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index ea7b8dc..04d25ec 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -26,6 +26,7 @@
 (deftest test-basic
   (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
         storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+                    STORM_MESSAGING_NETTY_AUTHENTICATION false
                     STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
                     STORM-MESSAGING-NETTY-MAX-RETRIES 10
                     STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
@@ -48,6 +49,7 @@
 (deftest test-large-msg
   (let [req_msg (apply str (repeat 2048000 'c')) 
         storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+                    STORM_MESSAGING_NETTY_AUTHENTICATION false
                     STORM-MESSAGING-NETTY-BUFFER-SIZE 102400
                     STORM-MESSAGING-NETTY-MAX-RETRIES 10
                     STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
@@ -70,6 +72,7 @@
 (deftest test-server-delayed
     (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
        storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+                    STORM_MESSAGING_NETTY_AUTHENTICATION false
                     STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
                     STORM-MESSAGING-NETTY-MAX-RETRIES 10
                     STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
@@ -99,6 +102,7 @@
 
 (deftest test-batch
   (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+                    STORM_MESSAGING_NETTY_AUTHENTICATION false
                     STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
                     STORM-MESSAGING-NETTY-MAX-RETRIES 10
                     STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 


[6/9] git commit: Merge remote-tracking branch 'upstream/security' into security

Posted by bo...@apache.org.
Merge remote-tracking branch 'upstream/security' into security

Conflicts:
	conf/defaults.yaml
	storm-core/src/jvm/backtype/storm/Config.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/02e79499
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/02e79499
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/02e79499

Branch: refs/heads/security
Commit: 02e79499fa381b56f8a5333e00cf0cc0bd7aa252
Parents: 19ad135 559c883
Author: Raghavendra Nandagopal <sp...@gmail.com>
Authored: Tue Jul 29 16:37:50 2014 -0700
Committer: Raghavendra Nandagopal <sp...@gmail.com>
Committed: Tue Jul 29 16:37:50 2014 -0700

----------------------------------------------------------------------
 BYLAWS.md                                       |  96 ++++
 CHANGELOG.md                                    |  31 +
 LICENSE                                         |  30 +-
 README.markdown                                 |  14 +
 STORM-UI-REST-API.md                            | 567 +++++++++++++++++++
 bin/storm                                       |  77 ++-
 conf/defaults.yaml                              |  11 +-
 conf/storm_env.ini                              |   2 +-
 dev-tools/github/__init__.py                    | 109 ++++
 dev-tools/jira-github-join.py                   |  80 +++
 dev-tools/jira/__init__.py                      | 232 ++++++++
 examples/storm-starter/README.markdown          |  30 +-
 .../storm-starter/multilang/resources/storm.py  |   2 +-
 .../src/jvm/storm/starter/RollingTopWords.java  |  62 +-
 .../src/jvm/storm/starter/util/StormRunner.java |   9 +
 .../storm-kafka/src/jvm/storm/kafka/Broker.java |   9 +-
 .../src/jvm/storm/kafka/KafkaConfig.java        |   2 +-
 .../src/jvm/storm/kafka/Partition.java          |   9 +-
 pom.xml                                         |   6 +-
 .../src/clj/backtype/storm/LocalCluster.clj     |   7 +-
 storm-core/src/clj/backtype/storm/cluster.clj   |  31 +-
 .../src/clj/backtype/storm/command/monitor.clj  |  37 ++
 .../src/clj/backtype/storm/daemon/common.clj    |   2 +-
 .../src/clj/backtype/storm/daemon/drpc.clj      |   6 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |   4 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |   7 +-
 .../clj/backtype/storm/daemon/supervisor.clj    | 104 +++-
 .../src/clj/backtype/storm/daemon/worker.clj    |  72 +--
 storm-core/src/clj/backtype/storm/disruptor.clj |   2 +-
 storm-core/src/clj/backtype/storm/event.clj     |   2 +-
 storm-core/src/clj/backtype/storm/testing.clj   |  49 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |   2 +-
 .../src/clj/backtype/storm/ui/helpers.clj       |   5 -
 storm-core/src/clj/backtype/storm/util.clj      |  52 +-
 storm-core/src/clj/backtype/storm/zookeeper.clj |  25 +
 .../src/dev/resources/tester_bolt_metrics.py    |  35 ++
 .../src/dev/resources/tester_spout_metrics.py   |  51 ++
 storm-core/src/jvm/backtype/storm/Config.java   | 306 +++++-----
 .../jvm/backtype/storm/ConfigValidation.java    |  70 +++
 .../backtype/storm/messaging/netty/Client.java  |  13 +-
 .../metric/api/rpc/AssignableShellMetric.java   |  30 +
 .../metric/api/rpc/CombinedShellMetric.java     |  31 +
 .../storm/metric/api/rpc/CountShellMetric.java  |  38 ++
 .../storm/metric/api/rpc/IShellMetric.java      |  31 +
 .../metric/api/rpc/ReducedShellMetric.java      |  32 ++
 .../storm/multilang/JsonSerializer.java         |  15 +
 .../jvm/backtype/storm/multilang/ShellMsg.java  |  46 ++
 .../backtype/storm/security/auth/AuthUtils.java |  49 +-
 .../auth/IGroupMappingServiceProvider.java      |  42 ++
 .../security/auth/ShellBasedGroupsMapping.java  |  94 +++
 .../auth/authorizer/SimpleACLAuthorizer.java    |  29 +-
 .../src/jvm/backtype/storm/spout/ISpout.java    |   2 +-
 .../jvm/backtype/storm/spout/ShellSpout.java    |  72 ++-
 .../src/jvm/backtype/storm/task/ShellBolt.java  |  75 ++-
 .../backtype/storm/task/TopologyContext.java    |  28 +
 .../storm/testing/PythonShellMetricsBolt.java   |  32 ++
 .../storm/testing/PythonShellMetricsSpout.java  |  35 ++
 .../src/jvm/backtype/storm/utils/Monitor.java   | 249 ++++++++
 .../jvm/backtype/storm/utils/ShellProcess.java  |  46 +-
 .../jvm/backtype/storm/utils/ShellUtils.java    | 498 ++++++++++++++++
 .../src/jvm/backtype/storm/utils/Utils.java     |  23 +-
 storm-core/src/multilang/py/storm.py            |  30 +-
 storm-core/src/multilang/rb/storm.rb            |  24 +-
 storm-core/src/ui/public/component.html         |   3 +-
 .../src/ui/public/js/jquery.tablesorter.min.js  |   9 +-
 storm-core/src/ui/public/js/moment.min.js       |   6 +
 storm-core/src/ui/public/js/script.js           |   9 +
 .../test/clj/backtype/storm/cluster_test.clj    |   3 +-
 .../test/clj/backtype/storm/config_test.clj     |  41 +-
 .../test/clj/backtype/storm/metrics_test.clj    | 206 ++++---
 .../backtype/storm/security/auth/auth_test.clj  |  11 +-
 .../test/clj/backtype/storm/supervisor_test.clj | 135 ++++-
 72 files changed, 3780 insertions(+), 454 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/02e79499/conf/defaults.yaml
----------------------------------------------------------------------
diff --cc conf/defaults.yaml
index ee66717,05948e1..3e00b3d
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@@ -153,9 -155,9 +155,12 @@@ storm.messaging.netty.transfer.batch.si
  # We check with this interval that whether the Netty channel is writable and try to write pending messages if it is.
  storm.messaging.netty.flush.check.interval.ms: 10
  
 +# By default, the Netty SASL authentication is set to false.  Users can override and set it true for a specific topology.
 +storm.messaging.netty.authentication: false
 +
+ # default number of seconds group mapping service will cache user group
+ storm.group.mapping.service.cache.duration.secs: 120
+ 
  ### topology.* configs are for specific executing storms
  topology.enable.message.timeouts: true
  topology.debug: false

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/02e79499/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/Config.java
index 46b120c,d6b45ea..94d1cb3
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@@ -103,13 -102,7 +102,13 @@@ public class Config extends HashMap<Str
       * We check with this interval that whether the Netty channel is writable and try to write pending messages
       */
      public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";
-     public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = Number.class;
-     
+     public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = ConfigValidation.IntegerValidator;
++
 +    /**
 +     * Netty based messaging: whether authentication is required for Netty messaging from the client worker process to the server worker process.
 +     */
 +    public static final String STORM_MESSAGING_NETTY_AUTHENTICATION = "storm.messaging.netty.authentication"; 
 +    public static final Object STORM_MESSAGING_NETTY_AUTHENTICATION_SCHEMA = Boolean.class;
      
      /**
       * A list of hosts of ZooKeeper servers used to manage the cluster.

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/02e79499/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------


[8/9] git commit: STORM-348: Netty SASL Authentication

Posted by bo...@apache.org.
STORM-348: Netty SASL Authentication


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/00f7d632
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/00f7d632
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/00f7d632

Branch: refs/heads/security
Commit: 00f7d632eb48bf967b5a7b72f5460352d0a8dbd9
Parents: 90cf0e6
Author: Raghavendra Nandagopal <sp...@gmail.com>
Authored: Tue Jul 29 17:50:24 2014 -0700
Committer: Raghavendra Nandagopal <sp...@gmail.com>
Committed: Tue Jul 29 17:50:24 2014 -0700

----------------------------------------------------------------------
 .../clj/backtype/storm/messaging/netty_integration_test.clj  | 2 +-
 .../test/clj/backtype/storm/messaging/netty_unit_test.clj    | 8 ++++----
 2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/00f7d632/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
index dea4abe..98144cc 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
@@ -25,7 +25,7 @@
   (with-simulated-time-local-cluster [cluster :supervisors 4 :supervisor-slot-port-min 6710
                                       :daemon-conf {STORM-LOCAL-MODE-ZMQ true 
                                                     STORM-MESSAGING-TRANSPORT  "backtype.storm.messaging.netty.Context"
-                                                    STORM_MESSAGING_NETTY_AUTHENTICATION false
+                                                    STORM-MESSAGING-NETTY-AUTHENTICATION false
                                                     STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
                                                     STORM-MESSAGING-NETTY-MAX-RETRIES 10
                                                     STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/00f7d632/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index 04d25ec..8aaa7e5 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -26,7 +26,7 @@
 (deftest test-basic
   (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
         storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
-                    STORM_MESSAGING_NETTY_AUTHENTICATION false
+                    STORM-MESSAGING-NETTY-AUTHENTICATION false
                     STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
                     STORM-MESSAGING-NETTY-MAX-RETRIES 10
                     STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
@@ -49,7 +49,7 @@
 (deftest test-large-msg
   (let [req_msg (apply str (repeat 2048000 'c')) 
         storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
-                    STORM_MESSAGING_NETTY_AUTHENTICATION false
+                    STORM-MESSAGING-NETTY-AUTHENTICATION false
                     STORM-MESSAGING-NETTY-BUFFER-SIZE 102400
                     STORM-MESSAGING-NETTY-MAX-RETRIES 10
                     STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
@@ -72,7 +72,7 @@
 (deftest test-server-delayed
     (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
        storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
-                    STORM_MESSAGING_NETTY_AUTHENTICATION false
+                    STORM-MESSAGING-NETTY-AUTHENTICATION false
                     STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
                     STORM-MESSAGING-NETTY-MAX-RETRIES 10
                     STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
@@ -102,7 +102,7 @@
 
 (deftest test-batch
   (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
-                    STORM_MESSAGING_NETTY_AUTHENTICATION false
+                    STORM-MESSAGING-NETTY-AUTHENTICATION false
                     STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
                     STORM-MESSAGING-NETTY-MAX-RETRIES 10
                     STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 


[4/9] git commit: STORM-348: Netty SASL Authentication

Posted by bo...@apache.org.
STORM-348: Netty SASL Authentication


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/3bce04ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/3bce04ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/3bce04ce

Branch: refs/heads/security
Commit: 3bce04ceab9abfd6e6da211cb5a792d9df6c96c6
Parents: 63e9082
Author: Raghavendra Nandagopal <sp...@gmail.com>
Authored: Mon Jul 28 16:28:45 2014 -0700
Committer: Raghavendra Nandagopal <sp...@gmail.com>
Committed: Mon Jul 28 16:28:45 2014 -0700

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  |   2 +-
 .../storm/messaging/netty/SaslMessageToken.java | 123 +++++-----
 .../storm/messaging/netty/SaslNettyClient.java  | 230 +++++++++---------
 .../messaging/netty/SaslStormClientHandler.java | 243 ++++++++++---------
 .../messaging/netty/SaslStormServerHandler.java | 240 +++++++++---------
 .../storm/messaging/netty/SaslUtils.java        |  75 +++---
 .../netty/StormClientPipelineFactory.java       |  32 ++-
 .../netty/StormServerPipelineFactory.java       |  43 ++--
 8 files changed, 490 insertions(+), 498 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/3bce04ce/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 64a1757..0cf1809 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -62,7 +62,7 @@ public class Client implements IConnection {
     
     Map storm_conf;
 
-	MessageBatch messageBatch = null;
+    private MessageBatch messageBatch = null;
     private AtomicLong flushCheckTimer;
     private int flushCheckInterval;
     private ScheduledExecutorService scheduler;

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/3bce04ce/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
index 8383d2c..d0d3ca1 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
@@ -27,74 +27,73 @@ import org.slf4j.LoggerFactory;
  * Send and receive SASL tokens.
  */
 public class SaslMessageToken {
-	/** Class logger */
-	private static final Logger LOG = LoggerFactory
-			.getLogger(SaslMessageToken.class);
+    /** Class logger */
+    private static final Logger LOG = LoggerFactory
+            .getLogger(SaslMessageToken.class);
 
-	/** Used for client or server's token to send or receive from each other. */
-	private byte[] token;
+    /** Used for client or server's token to send or receive from each other. */
+    private byte[] token;
 
-	/**
-	 * Constructor used for reflection only.
-	 */
-	public SaslMessageToken() {
-	}
+    /**
+     * Constructor used for reflection only.
+     */
+    public SaslMessageToken() {
+    }
 
-	/**
-	 * Constructor used to send request.
-	 * 
-	 * @param token
-	 *            the SASL token, generated by a SaslClient or SaslServer.
-	 */
-	public SaslMessageToken(byte[] token) {
-		this.token = token;
-	}
+    /**
+     * Constructor used to send request.
+     * 
+     * @param token
+     *            the SASL token, generated by a SaslClient or SaslServer.
+     */
+    public SaslMessageToken(byte[] token) {
+        this.token = token;
+    }
 
-	/**
-	 * Read accessor for SASL token
-	 * 
-	 * @return saslToken SASL token
-	 */
-	public byte[] getSaslToken() {
-		return token;
-	}
+    /**
+     * Read accessor for SASL token
+     * 
+     * @return saslToken SASL token
+     */
+    public byte[] getSaslToken() {
+        return token;
+    }
 
-	/**
-	 * Write accessor for SASL token
-	 * 
-	 * @param token
-	 *            SASL token
-	 */
-	public void setSaslToken(byte[] token) {
-		this.token = token;
-	}
+    /**
+     * Write accessor for SASL token
+     * 
+     * @param token
+     *            SASL token
+     */
+    public void setSaslToken(byte[] token) {
+        this.token = token;
+    }
 
-	int encodeLength() {
-		return 2+4+token.length;
-	}
+    int encodeLength() {
+        return 2 + 4 + token.length;
+    }
 
-	/**
-	 * encode the current SaslToken Message into a channel buffer
-	 * SaslTokenMessageRequest is encoded as:
-	 * identifier .... short(2) always it is -500
-	 * payload length .... int
-	 * payload .... byte[] 
-	 * @throws Exception
-	 */
-	ChannelBuffer buffer() throws Exception {
-		ChannelBufferOutputStream bout = new ChannelBufferOutputStream(
-				ChannelBuffers.directBuffer(encodeLength()));
-		short identifier = -500;
-		int payload_len = 0;
+    /**
+     * Encode the current SaslToken message into a channel buffer. A
+     * SaslTokenMessageRequest is encoded as: identifier (short, 2 bytes,
+     * always -500), payload length (int), payload (byte[]).
+     * 
+     * @throws Exception
+     */
+    ChannelBuffer buffer() throws Exception {
+        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(
+                ChannelBuffers.directBuffer(encodeLength()));
+        short identifier = -500;
+        int payload_len = 0;
         if (token != null)
-            payload_len =  token.length;
-		
-		bout.writeShort((short)identifier);
-		bout.writeInt((int)payload_len);
-		if(payload_len>0) {
-			bout.write(token);
-		}
-		bout.close();
-		return bout.buffer();
-	}
+            payload_len = token.length;
+
+        bout.writeShort((short) identifier);
+        bout.writeInt((int) payload_len);
+        if (payload_len > 0) {
+            bout.write(token);
+        }
+        bout.close();
+        return bout.buffer();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/3bce04ce/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
index fedcfff..023e950 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
@@ -38,129 +38,129 @@ import org.slf4j.LoggerFactory;
  */
 public class SaslNettyClient {
 
-	private static final Logger LOG = LoggerFactory
-			.getLogger(SaslNettyClient.class);
+    private static final Logger LOG = LoggerFactory
+            .getLogger(SaslNettyClient.class);
 
-	/**
-	 * Used to respond to server's counterpart, SaslServer with SASL tokens
-	 * represented as byte arrays.
-	 */
-	private SaslClient saslClient;
+    /**
+     * Used to respond to server's counterpart, SaslServer with SASL tokens
+     * represented as byte arrays.
+     */
+    private SaslClient saslClient;
 
-	/**
-	 * Create a SaslNettyClient for authentication with servers.
-	 */
-	public SaslNettyClient(String topologyName, byte[] token) {
-		try {
-			LOG.debug("SaslNettyClient: Creating SASL "
-					+ SaslUtils.AUTH_DIGEST_MD5
-					+ " client to authenticate to server ");
+    /**
+     * Create a SaslNettyClient for authentication with servers.
+     */
+    public SaslNettyClient(String topologyName, byte[] token) {
+        try {
+            LOG.debug("SaslNettyClient: Creating SASL "
+                    + SaslUtils.AUTH_DIGEST_MD5
+                    + " client to authenticate to server ");
 
-			saslClient = Sasl.createSaslClient(
-					new String[] { SaslUtils.AUTH_DIGEST_MD5 }, null, null,
-					SaslUtils.DEFAULT_REALM, SaslUtils.getSaslProps(),
-					new SaslClientCallbackHandler(topologyName, token));
+            saslClient = Sasl.createSaslClient(
+                    new String[] { SaslUtils.AUTH_DIGEST_MD5 }, null, null,
+                    SaslUtils.DEFAULT_REALM, SaslUtils.getSaslProps(),
+                    new SaslClientCallbackHandler(topologyName, token));
 
-		} catch (IOException e) {
-			LOG.error("SaslNettyClient: Could not obtain topology token for Netty "
-					+ "Client to use to authenticate with a Netty Server.");
-			saslClient = null;
-		}
-	}
+        } catch (IOException e) {
+            LOG.error("SaslNettyClient: Could not obtain topology token for Netty "
+                    + "Client to use to authenticate with a Netty Server.");
+            saslClient = null;
+        }
+    }
 
-	public boolean isComplete() {
-		return saslClient.isComplete();
-	}
+    public boolean isComplete() {
+        return saslClient.isComplete();
+    }
 
-	/**
-	 * Respond to server's SASL token.
-	 * 
-	 * @param saslTokenMessage
-	 *            contains server's SASL token
-	 * @return client's response SASL token
-	 */
-	public byte[] saslResponse(SaslMessageToken saslTokenMessage) {
-		try {
-			byte[] retval = saslClient.evaluateChallenge(saslTokenMessage
-					.getSaslToken());
-			return retval;
-		} catch (SaslException e) {
-			LOG.error(
-					"saslResponse: Failed to respond to SASL server's token:",
-					e);
-			return null;
-		}
-	}
+    /**
+     * Respond to server's SASL token.
+     * 
+     * @param saslTokenMessage
+     *            contains server's SASL token
+     * @return client's response SASL token
+     */
+    public byte[] saslResponse(SaslMessageToken saslTokenMessage) {
+        try {
+            byte[] retval = saslClient.evaluateChallenge(saslTokenMessage
+                    .getSaslToken());
+            return retval;
+        } catch (SaslException e) {
+            LOG.error(
+                    "saslResponse: Failed to respond to SASL server's token:",
+                    e);
+            return null;
+        }
+    }
 
-	/**
-	 * Implementation of javax.security.auth.callback.CallbackHandler that works
-	 * with Storm topology tokens.
-	 */
-	private static class SaslClientCallbackHandler implements CallbackHandler {
-		/** Generated username contained in TopologyToken */
-		private final String userName;
-		/** Generated password contained in TopologyToken */
-		private final char[] userPassword;
+    /**
+     * Implementation of javax.security.auth.callback.CallbackHandler that works
+     * with Storm topology tokens.
+     */
+    private static class SaslClientCallbackHandler implements CallbackHandler {
+        /** Generated username contained in TopologyToken */
+        private final String userName;
+        /** Generated password contained in TopologyToken */
+        private final char[] userPassword;
 
-		/**
-		 * Set private members using topology token.
-		 * 
-		 * @param topologyToken
-		 */
-		public SaslClientCallbackHandler(String topologyToken, byte[] token) {
-			this.userName = SaslUtils
-					.encodeIdentifier(topologyToken.getBytes());
-			this.userPassword = SaslUtils.encodePassword(token);
-		}
+        /**
+         * Set private members using topology token.
+         * 
+         * @param topologyToken
+         */
+        public SaslClientCallbackHandler(String topologyToken, byte[] token) {
+            this.userName = SaslUtils
+                    .encodeIdentifier(topologyToken.getBytes());
+            this.userPassword = SaslUtils.encodePassword(token);
+        }
 
-		/**
-		 * Implementation used to respond to SASL tokens from server.
-		 * 
-		 * @param callbacks
-		 *            objects that indicate what credential information the
-		 *            server's SaslServer requires from the client.
-		 * @throws UnsupportedCallbackException
-		 */
-		public void handle(Callback[] callbacks)
-				throws UnsupportedCallbackException {
-			NameCallback nc = null;
-			PasswordCallback pc = null;
-			RealmCallback rc = null;
-			for (Callback callback : callbacks) {
-				if (callback instanceof RealmChoiceCallback) {
-					continue;
-				} else if (callback instanceof NameCallback) {
-					nc = (NameCallback) callback;
-				} else if (callback instanceof PasswordCallback) {
-					pc = (PasswordCallback) callback;
-				} else if (callback instanceof RealmCallback) {
-					rc = (RealmCallback) callback;
-				} else {
-					throw new UnsupportedCallbackException(callback,
-							"handle: Unrecognized SASL client callback");
-				}
-			}
-			if (nc != null) {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("handle: SASL client callback: setting username: "
-							+ userName);
-				}
-				nc.setName(userName);
-			}
-			if (pc != null) {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("handle: SASL client callback: setting userPassword");
-				}
-				pc.setPassword(userPassword);
-			}
-			if (rc != null) {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("handle: SASL client callback: setting realm: "
-							+ rc.getDefaultText());
-				}
-				rc.setText(rc.getDefaultText());
-			}
-		}
-	}
+        /**
+         * Implementation used to respond to SASL tokens from server.
+         * 
+         * @param callbacks
+         *            objects that indicate what credential information the
+         *            server's SaslServer requires from the client.
+         * @throws UnsupportedCallbackException
+         */
+        public void handle(Callback[] callbacks)
+                throws UnsupportedCallbackException {
+            NameCallback nc = null;
+            PasswordCallback pc = null;
+            RealmCallback rc = null;
+            for (Callback callback : callbacks) {
+                if (callback instanceof RealmChoiceCallback) {
+                    continue;
+                } else if (callback instanceof NameCallback) {
+                    nc = (NameCallback) callback;
+                } else if (callback instanceof PasswordCallback) {
+                    pc = (PasswordCallback) callback;
+                } else if (callback instanceof RealmCallback) {
+                    rc = (RealmCallback) callback;
+                } else {
+                    throw new UnsupportedCallbackException(callback,
+                            "handle: Unrecognized SASL client callback");
+                }
+            }
+            if (nc != null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("handle: SASL client callback: setting username: "
+                            + userName);
+                }
+                nc.setName(userName);
+            }
+            if (pc != null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("handle: SASL client callback: setting userPassword");
+                }
+                pc.setPassword(userPassword);
+            }
+            if (rc != null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("handle: SASL client callback: setting realm: "
+                            + rc.getDefaultText());
+                }
+                rc.setText(rc.getDefaultText());
+            }
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/3bce04ce/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
index 59c4abd..f94cbc3 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
@@ -32,124 +32,127 @@ import backtype.storm.Config;
 
 public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
 
-	private static final Logger LOG = LoggerFactory
-			.getLogger(SaslStormClientHandler.class);
-	private Client client;
-	long start_time;
-	/** Used for client or server's token to send or receive from each other. */
-	private byte[] token;
-	private String topologyName;
-
-	public SaslStormClientHandler(Client client) throws IOException {
-		this.client = client;
-		start_time = System.currentTimeMillis();
-		getSASLCredentials();
-	}
-
-	@Override
-	public void channelConnected(ChannelHandlerContext ctx,
-			ChannelStateEvent event) {
-		// register the newly established channel
-		Channel channel = ctx.getChannel();
-
-		LOG.info("Connection established from " + channel.getLocalAddress()
-				+ " to " + channel.getRemoteAddress());
-
-		try {
-			SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient
-					.get(channel);
-
-			if (saslNettyClient == null) {
-				LOG.debug("Creating saslNettyClient now " + "for channel: "
-						+ channel);
-				saslNettyClient = new SaslNettyClient(topologyName, token);
-				SaslNettyClientState.getSaslNettyClient.set(channel,
-						saslNettyClient);
-			}
-			channel.write(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST);
-		} catch (Exception e) {
-			LOG.error("Failed to authenticate with server " + "due to error: "
-					+ e);
-		}
-		return;
-
-	}
-
-	@Override
-	public void messageReceived(ChannelHandlerContext ctx, MessageEvent event)
-			throws Exception {
-		LOG.debug("send/recv time (ms): {}",
-				(System.currentTimeMillis() - start_time));
-
-		Channel channel = ctx.getChannel();
-
-		// Generate SASL response to server using Channel-local SASL client.
-		SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient
-				.get(channel);
-		if (saslNettyClient == null) {
-			throw new Exception("saslNettyClient was unexpectedly "
-					+ "null for channel: " + channel);
-		}
-
-		// examine the response message from server
-		if (event.getMessage() instanceof ControlMessage) {
-			ControlMessage msg = (ControlMessage) event.getMessage();
-			if (msg == ControlMessage.SASL_COMPLETE_REQUEST) {
-				LOG.debug("Server has sent us the SaslComplete "
-						+ "message. Allowing normal work to proceed.");
-
-				if (!saslNettyClient.isComplete()) {
-					LOG.error("Server returned a Sasl-complete message, "
-							+ "but as far as we can tell, we are not authenticated yet.");
-					throw new Exception("Server returned a "
-							+ "Sasl-complete message, but as far as "
-							+ "we can tell, we are not authenticated yet.");
-				}
-				ctx.getPipeline().remove(this);
-				// We call fireMessageReceived since the client is allowed to
-				// perform this request. The client's request will now proceed
-				// to the next pipeline component namely StormClientHandler.
-				Channels.fireMessageReceived(ctx, msg);
-				return;
-			}
-		}
-		SaslMessageToken saslTokenMessage = (SaslMessageToken) event
-				.getMessage();
-		LOG.debug("Responding to server's token of length: "
-				+ saslTokenMessage.getSaslToken().length);
-
-		// Generate SASL response (but we only actually send the response if
-		// it's non-null.
-		byte[] responseToServer = saslNettyClient
-				.saslResponse(saslTokenMessage);
-		if (responseToServer == null) {
-			// If we generate a null response, then authentication has completed
-			// (if not, warn), and return without sending a response back to the
-			// server.
-			LOG.debug("Response to server is null: "
-					+ "authentication should now be complete.");
-			if (!saslNettyClient.isComplete()) {
-				LOG.warn("Generated a null response, "
-						+ "but authentication is not complete.");
-			}
-			return;
-		} else {
-			LOG.debug("Response to server token has length:"
-					+ responseToServer.length);
-		}
-		// Construct a message containing the SASL response and send it to the
-		// server.
-		SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
-		channel.write(saslResponse);
-	}
-
-	private void getSASLCredentials() throws IOException {
-		topologyName = (String) this.client.storm_conf
-				.get(Config.TOPOLOGY_NAME);
-		String secretKey = SaslUtils.getSecretKey(this.client.storm_conf);
-		if(secretKey!=null) {
-			token = secretKey.getBytes();	
-		}
-		LOG.debug("SASL credentials for storm topology "+topologyName+ " is "+secretKey);
-	}
+    private static final Logger LOG = LoggerFactory
+            .getLogger(SaslStormClientHandler.class);
+    private Client client;
+    long start_time;
+    /** Used for client or server's token to send or receive from each other. */
+    private byte[] token;
+    private String topologyName;
+
+    public SaslStormClientHandler(Client client) throws IOException {
+        this.client = client;
+        start_time = System.currentTimeMillis();
+        getSASLCredentials();
+    }
+
+    @Override
+    public void channelConnected(ChannelHandlerContext ctx,
+            ChannelStateEvent event) {
+        // register the newly established channel
+        Channel channel = ctx.getChannel();
+
+        LOG.info("Connection established from " + channel.getLocalAddress()
+                + " to " + channel.getRemoteAddress());
+
+        try {
+            SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient
+                    .get(channel);
+
+            if (saslNettyClient == null) {
+                LOG.debug("Creating saslNettyClient now " + "for channel: "
+                        + channel);
+                saslNettyClient = new SaslNettyClient(topologyName, token);
+                SaslNettyClientState.getSaslNettyClient.set(channel,
+                        saslNettyClient);
+            }
+            channel.write(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST);
+        } catch (Exception e) {
+            LOG.error("Failed to authenticate with server " + "due to error: ",
+                    e);
+        }
+        return;
+
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event)
+            throws Exception {
+        LOG.debug("send/recv time (ms): {}",
+                (System.currentTimeMillis() - start_time));
+
+        Channel channel = ctx.getChannel();
+
+        // Generate SASL response to server using Channel-local SASL client.
+        SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient
+                .get(channel);
+        if (saslNettyClient == null) {
+            throw new Exception("saslNettyClient was unexpectedly "
+                    + "null for channel: " + channel);
+        }
+
+        // examine the response message from server
+        if (event.getMessage() instanceof ControlMessage) {
+            ControlMessage msg = (ControlMessage) event.getMessage();
+            if (msg == ControlMessage.SASL_COMPLETE_REQUEST) {
+                LOG.debug("Server has sent us the SaslComplete "
+                        + "message. Allowing normal work to proceed.");
+
+                if (!saslNettyClient.isComplete()) {
+                    LOG.error("Server returned a Sasl-complete message, "
+                            + "but as far as we can tell, we are not authenticated yet.");
+                    throw new Exception("Server returned a "
+                            + "Sasl-complete message, but as far as "
+                            + "we can tell, we are not authenticated yet.");
+                }
+                ctx.getPipeline().remove(this);
+                // We call fireMessageReceived since the client is allowed to
+                // perform this request. The client's request will now proceed
+                // to the next pipeline component namely StormClientHandler.
+                Channels.fireMessageReceived(ctx, msg);
+                return;
+            }
+        }
+        SaslMessageToken saslTokenMessage = (SaslMessageToken) event
+                .getMessage();
+        LOG.debug("Responding to server's token of length: "
+                + saslTokenMessage.getSaslToken().length);
+
+        // Generate SASL response (but we only actually send the response if
+        // it's non-null).
+        byte[] responseToServer = saslNettyClient
+                .saslResponse(saslTokenMessage);
+        if (responseToServer == null) {
+            // If we generate a null response, then authentication has completed
+            // (if not, warn), and return without sending a response back to the
+            // server.
+            LOG.debug("Response to server is null: "
+                    + "authentication should now be complete.");
+            if (!saslNettyClient.isComplete()) {
+                LOG.warn("Generated a null response, "
+                        + "but authentication is not complete.");
+                throw new Exception("Server reponse is null, but as far as "
+                        + "we can tell, we are not authenticated yet.");
+            }
+            return;
+        } else {
+            LOG.debug("Response to server token has length:"
+                    + responseToServer.length);
+        }
+        // Construct a message containing the SASL response and send it to the
+        // server.
+        SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
+        channel.write(saslResponse);
+    }
+
+    private void getSASLCredentials() throws IOException {
+        topologyName = (String) this.client.storm_conf
+                .get(Config.TOPOLOGY_NAME);
+        String secretKey = SaslUtils.getSecretKey(this.client.storm_conf);
+        if (secretKey != null) {
+            token = secretKey.getBytes();
+        }
+        LOG.debug("SASL credentials for storm topology " + topologyName
+                + " is " + secretKey);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/3bce04ce/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
index d06e960..02448e2 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
@@ -32,124 +32,124 @@ import backtype.storm.Config;
 
 public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
 
-	Server server;
-	/** Used for client or server's token to send or receive from each other. */
-	private byte[] token;
-	private String topologyName;
-
-	private static final Logger LOG = LoggerFactory
-			.getLogger(SaslStormServerHandler.class);
-
-	public SaslStormServerHandler(Server server) throws IOException {
-		this.server = server;
-		getSASLCredentials();
-	}
-
-	@Override
-	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-			throws Exception {
-		Object msg = e.getMessage();
-		if (msg == null)
-			return;
-
-		Channel channel = ctx.getChannel();
-
-		if (msg instanceof ControlMessage
-				&& ((ControlMessage) e.getMessage()) == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
-			// initialize server-side SASL functionality, if we haven't yet
-			// (in which case we are looking at the first SASL message from the
-			// client).
-			SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
-					.get(channel);
-			if (saslNettyServer == null) {
-				LOG.debug("No saslNettyServer for " + channel
-						+ " yet; creating now, with topology token: ");
-				try {
-					saslNettyServer = new SaslNettyServer(topologyName, token);
-				} catch (IOException ioe) {
-					LOG.error("Error occurred while creating saslNettyServer on server "
-							+ channel.getLocalAddress()
-							+ " for client "
-							+ channel.getRemoteAddress());
-					throw new IOException(ioe);
-				}
-
-				SaslNettyServerState.getSaslNettyServer.set(channel,
-						saslNettyServer);
-			} else {
-				LOG.debug("Found existing saslNettyServer on server:"
-						+ channel.getLocalAddress() + " for client "
-						+ channel.getRemoteAddress());
-			}
-
-			LOG.debug("processToken:  With nettyServer: " + saslNettyServer
-					+ " and token length: " + token.length);
-
-			SaslMessageToken saslTokenMessageRequest = null;
-			saslTokenMessageRequest = new SaslMessageToken(
-					saslNettyServer.response(new byte[0]));
-			// Send response to client.
-			channel.write(saslTokenMessageRequest);
-			// do not send upstream to other handlers: no further action needs
-			// to be done for SASL_TOKEN_MESSAGE_REQUEST requests.
-			return;
-		}
-
-		if (msg instanceof SaslMessageToken) {
-			// initialize server-side SASL functionality, if we haven't yet
-			// (in which case we are looking at the first SASL message from the
-			// client).
-			SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
-					.get(channel);
-			if (saslNettyServer == null) {
-				if (saslNettyServer == null) {
-					throw new Exception("saslNettyServer was unexpectedly "
-							+ "null for channel: " + channel);
-				}
-			}
-			SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(
-					saslNettyServer.response(((SaslMessageToken) msg)
-							.getSaslToken()));
-
-			// Send response to client.
-			channel.write(saslTokenMessageRequest);
-
-			if (saslNettyServer.isComplete()) {
-				// If authentication of client is complete, we will also send a
-				// SASL-Complete message to the client.
-				LOG.debug("SASL authentication is complete for client with "
-						+ "username: " + saslNettyServer.getUserName());
-				channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
-				LOG.debug("Removing SaslServerHandler from pipeline since SASL "
-						+ "authentication is complete.");
-				ctx.getPipeline().remove(this);
-			}
-			return;
-		} else {
-			// Client should not be sending other-than-SASL messages before
-			// SaslServerHandler has removed itself from the pipeline. Such
-			// non-SASL requests will be denied by the Authorize channel handler
-			// (the next handler upstream in the server pipeline) if SASL
-			// authentication has not completed.
-			LOG.warn("Sending upstream an unexpected non-SASL message :  "
-					+ msg);
-			Channels.fireMessageReceived(ctx, msg);
-		}
-	}
-
-	@Override
-	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-		server.closeChannel(e.getChannel());
-	}
-
-	private void getSASLCredentials() throws IOException {
-		topologyName = (String) this.server.storm_conf
-				.get(Config.TOPOLOGY_NAME);
-		String secretKey = SaslUtils.getSecretKey(this.server.storm_conf);
-		if (secretKey != null) {
-			token = secretKey.getBytes();
-		}
-		LOG.debug("SASL credentials for storm topology " + topologyName
-				+ " is " + secretKey);
-	}
+    Server server;
+    /** Used for client or server's token to send or receive from each other. */
+    private byte[] token;
+    private String topologyName;
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(SaslStormServerHandler.class);
+
+    public SaslStormServerHandler(Server server) throws IOException {
+        this.server = server;
+        getSASLCredentials();
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+            throws Exception {
+        Object msg = e.getMessage();
+        if (msg == null)
+            return;
+
+        Channel channel = ctx.getChannel();
+
+        if (msg instanceof ControlMessage
+                && ((ControlMessage) e.getMessage()) == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
+            // initialize server-side SASL functionality, if we haven't yet
+            // (in which case we are looking at the first SASL message from the
+            // client).
+            SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
+                    .get(channel);
+            if (saslNettyServer == null) {
+                LOG.debug("No saslNettyServer for " + channel
+                        + " yet; creating now, with topology token: ");
+                try {
+                    saslNettyServer = new SaslNettyServer(topologyName, token);
+                } catch (IOException ioe) {
+                    LOG.error("Error occurred while creating saslNettyServer on server "
+                            + channel.getLocalAddress()
+                            + " for client "
+                            + channel.getRemoteAddress());
+                    saslNettyServer = null;
+                }
+
+                SaslNettyServerState.getSaslNettyServer.set(channel,
+                        saslNettyServer);
+            } else {
+                LOG.debug("Found existing saslNettyServer on server:"
+                        + channel.getLocalAddress() + " for client "
+                        + channel.getRemoteAddress());
+            }
+
+            LOG.debug("processToken:  With nettyServer: " + saslNettyServer
+                    + " and token length: " + token.length);
+
+            SaslMessageToken saslTokenMessageRequest = null;
+            saslTokenMessageRequest = new SaslMessageToken(
+                    saslNettyServer.response(new byte[0]));
+            // Send response to client.
+            channel.write(saslTokenMessageRequest);
+            // do not send upstream to other handlers: no further action needs
+            // to be done for SASL_TOKEN_MESSAGE_REQUEST requests.
+            return;
+        }
+
+        if (msg instanceof SaslMessageToken) {
+            // The per-channel SaslNettyServer is created when the client's
+            // SASL_TOKEN_MESSAGE_REQUEST is handled above, so it must already
+            // exist by the time a SaslMessageToken arrives.
+            SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
+                    .get(channel);
+            if (saslNettyServer == null) {
+                if (saslNettyServer == null) {
+                    throw new Exception("saslNettyServer was unexpectedly "
+                            + "null for channel: " + channel);
+                }
+            }
+            SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(
+                    saslNettyServer.response(((SaslMessageToken) msg)
+                            .getSaslToken()));
+
+            // Send response to client.
+            channel.write(saslTokenMessageRequest);
+
+            if (saslNettyServer.isComplete()) {
+                // If authentication of client is complete, we will also send a
+                // SASL-Complete message to the client.
+                LOG.debug("SASL authentication is complete for client with "
+                        + "username: " + saslNettyServer.getUserName());
+                channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
+                LOG.debug("Removing SaslServerHandler from pipeline since SASL "
+                        + "authentication is complete.");
+                ctx.getPipeline().remove(this);
+            }
+            return;
+        } else {
+            // Client should not be sending other-than-SASL messages before
+            // SaslServerHandler has removed itself from the pipeline. Such
+            // non-SASL requests will be denied by the Authorize channel handler
+            // (the next handler upstream in the server pipeline) if SASL
+            // authentication has not completed.
+            LOG.warn("Sending upstream an unexpected non-SASL message :  "
+                    + msg);
+            Channels.fireMessageReceived(ctx, msg);
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+        server.closeChannel(e.getChannel());
+    }
+
+    private void getSASLCredentials() throws IOException {
+        topologyName = (String) this.server.storm_conf
+                .get(Config.TOPOLOGY_NAME);
+        String secretKey = SaslUtils.getSecretKey(this.server.storm_conf);
+        if (secretKey != null) {
+            token = secretKey.getBytes();
+        }
+        LOG.debug("SASL credentials for storm topology " + topologyName
+                + " is " + secretKey);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/3bce04ce/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
index 0f96233..a4cc0ba 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
@@ -28,48 +28,47 @@ import org.apache.commons.codec.binary.Base64;
 import backtype.storm.Config;
 
 class SaslUtils {
-	public static final String AUTH_DIGEST_MD5 = "DIGEST-MD5";
-	public static final String DEFAULT_REALM = "default";
+    public static final String AUTH_DIGEST_MD5 = "DIGEST-MD5";
+    public static final String DEFAULT_REALM = "default";
 
-	static Map<String, String> getSaslProps() {
-		Map<String, String> props = new HashMap<String, String>();
-		props.put(Sasl.POLICY_NOPLAINTEXT, "true");
-		return props;
-	}
+    static Map<String, String> getSaslProps() {
+        Map<String, String> props = new HashMap<String, String>();
+        props.put(Sasl.POLICY_NOPLAINTEXT, "true");
+        return props;
+    }
 
-	/**
-	 * Encode a password as a base64-encoded char[] array.
-	 * 
-	 * @param password
-	 *            as a byte array.
-	 * @return password as a char array.
-	 */
-	static char[] encodePassword(byte[] password) {
-		return new String(Base64.encodeBase64(password),
-				Charset.defaultCharset()).toCharArray();
-	}
+    /**
+     * Encode a password as a base64-encoded char[] array.
+     * 
+     * @param password
+     *            as a byte array.
+     * @return password as a char array.
+     */
+    static char[] encodePassword(byte[] password) {
+        return new String(Base64.encodeBase64(password),
+                Charset.defaultCharset()).toCharArray();
+    }
 
-	/**
-	 * Encode a identifier as a base64-encoded char[] array.
-	 * 
-	 * @param identifier
-	 *            as a byte array.
-	 * @return identifier as a char array.
-	 */
-	static String encodeIdentifier(byte[] identifier) {
-		return new String(Base64.encodeBase64(identifier),
-				Charset.defaultCharset());
-	}
+    /**
+     * Encode an identifier as a base64-encoded String.
+     * 
+     * @param identifier
+     *            as a byte array.
+     * @return identifier as a base64-encoded String.
+     */
+    static String encodeIdentifier(byte[] identifier) {
+        return new String(Base64.encodeBase64(identifier),
+                Charset.defaultCharset());
+    }
 
-	static String getSecretKey(Map conf) {
-		if (conf == null || conf.isEmpty())
-			return null;
+    static String getSecretKey(Map conf) {
+        if (conf == null || conf.isEmpty())
+            return null;
 
-		String secretPayLoad = (String) conf
-				.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
+        String secretPayLoad = (String) conf
+                .get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
+
+        return secretPayLoad;
+    }
 
-		return secretPayLoad;
-	}
-	
-	
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/3bce04ce/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
index 4fdaee9..1ea382b 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
@@ -27,31 +27,27 @@ class StormClientPipelineFactory implements ChannelPipelineFactory {
     private Client client;
 
     StormClientPipelineFactory(Client client) {
-        this.client = client;        
+        this.client = client;
     }
 
     public ChannelPipeline getPipeline() throws Exception {
         // Create a default pipeline implementation.
         ChannelPipeline pipeline = Channels.pipeline();
 
-        boolean isNettyAuth = (Boolean) this.client.storm_conf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
-        if(isNettyAuth) {
-	        // Decoder
-	        pipeline.addLast("decoder", new MessageDecoder());
-	        // Encoder
-	        pipeline.addLast("encoder", new MessageEncoder());
-	        // Authenticate: Removed after authentication completes
-	        pipeline.addLast("saslClientHandler", new SaslStormClientHandler(client));
-	        // business logic.
-	        pipeline.addLast("handler", new StormClientErrorHandler(client.name()));
-        } else {
-        	// Decoder
-	        pipeline.addLast("decoder", new MessageDecoder());
-	        // Encoder
-	        pipeline.addLast("encoder", new MessageEncoder());
-	        // business logic.
-	        pipeline.addLast("handler", new StormClientErrorHandler(client.name()));
+        // Decoder
+        pipeline.addLast("decoder", new MessageDecoder());
+        // Encoder
+        pipeline.addLast("encoder", new MessageEncoder());
+
+        boolean isNettyAuth = (Boolean) this.client.storm_conf
+                .get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
+        if (isNettyAuth) {
+            // Authenticate: Removed after authentication completes
+            pipeline.addLast("saslClientHandler", new SaslStormClientHandler(
+                    client));
         }
+        // business logic.
+        pipeline.addLast("handler", new StormClientErrorHandler(client.name()));
 
         return pipeline;
     }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/3bce04ce/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
index c2b4c53..f6e20c5 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
@@ -22,41 +22,36 @@ import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
 
 import backtype.storm.Config;
-import backtype.storm.utils.Utils;
 
-
-class StormServerPipelineFactory implements  ChannelPipelineFactory {
+class StormServerPipelineFactory implements ChannelPipelineFactory {
     private Server server;
-    
+
     StormServerPipelineFactory(Server server) {
-        this.server = server;        
+        this.server = server;
     }
-    
+
     public ChannelPipeline getPipeline() throws Exception {
         // Create a default pipeline implementation.
         ChannelPipeline pipeline = Channels.pipeline();
 
-        boolean isNettyAuth = (Boolean) this.server.storm_conf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
-        if(isNettyAuth) {
-        	// Decoder
-            pipeline.addLast("decoder", new MessageDecoder());
-            // Encoder
-            pipeline.addLast("encoder", new MessageEncoder());
+        // Decoder
+        pipeline.addLast("decoder", new MessageDecoder());
+        // Encoder
+        pipeline.addLast("encoder", new MessageEncoder());
+
+        boolean isNettyAuth = (Boolean) this.server.storm_conf
+                .get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
+        if (isNettyAuth) {
             // Authenticate: Removed after authentication completes
-            pipeline.addLast("saslServerHandler", new SaslStormServerHandler(server));
+            pipeline.addLast("saslServerHandler", new SaslStormServerHandler(
+                    server));
             // Authorize
-            pipeline.addLast("authorizeServerHandler", new SaslStormServerAuthorizeHandler());
-            // business logic.
-            pipeline.addLast("handler", new StormServerHandler(server));
-        } else {
-        	// Decoder
-            pipeline.addLast("decoder", new MessageDecoder());
-            // Encoder
-            pipeline.addLast("encoder", new MessageEncoder());
-            // business logic.
-            pipeline.addLast("handler", new StormServerHandler(server));
+            pipeline.addLast("authorizeServerHandler",
+                    new SaslStormServerAuthorizeHandler());
         }
-        
+        // business logic.
+        pipeline.addLast("handler", new StormServerHandler(server));
+
         return pipeline;
     }
 }


[3/9] git commit: STORM-348: Netty SASL Authentication

Posted by bo...@apache.org.
STORM-348: Netty SASL Authentication


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/63e90822
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/63e90822
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/63e90822

Branch: refs/heads/security
Commit: 63e90822056daf0a41c0778c0984b7371459a3f2
Parents: 133c398
Author: Raghavendra Nandagopal <sp...@gmail.com>
Authored: Thu Jul 24 15:57:31 2014 -0700
Committer: Raghavendra Nandagopal <sp...@gmail.com>
Committed: Thu Jul 24 15:57:31 2014 -0700

----------------------------------------------------------------------
 conf/defaults.yaml | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/63e90822/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 7f17054..ee66717 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -153,6 +153,9 @@ storm.messaging.netty.transfer.batch.size: 262144
 # We check with this interval that whether the Netty channel is writable and try to write pending messages if it is.
 storm.messaging.netty.flush.check.interval.ms: 10
 
+# Netty SASL authentication is disabled by default. Set this to true to enable it for a specific topology.
+storm.messaging.netty.authentication: false
+
 ### topology.* configs are for specific executing storms
 topology.enable.message.timeouts: true
 topology.debug: false


[2/9] git commit: STORM-348: Netty SASL Authentication

Posted by bo...@apache.org.
STORM-348: Netty SASL Authentication


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/133c398e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/133c398e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/133c398e

Branch: refs/heads/security
Commit: 133c398ee9f5799fb3d702e27c8f83b969fc0341
Parents: 4198644
Author: Raghavendra Nandagopal <sp...@gmail.com>
Authored: Thu Jul 24 15:47:51 2014 -0700
Committer: Raghavendra Nandagopal <sp...@gmail.com>
Committed: Thu Jul 24 15:47:51 2014 -0700

----------------------------------------------------------------------
 .../storm/messaging/netty/SaslNettyClient.java  |  9 +++--
 .../storm/messaging/netty/SaslNettyServer.java  | 37 +++++++++-----------
 .../messaging/netty/SaslStormClientHandler.java | 24 ++++++-------
 .../messaging/netty/SaslStormServerHandler.java | 26 ++++++--------
 .../storm/messaging/netty/SaslUtils.java        | 14 ++++++++
 5 files changed, 56 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/133c398e/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
index a4f1b5e..fedcfff 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
@@ -50,7 +50,7 @@ public class SaslNettyClient {
 	/**
 	 * Create a SaslNettyClient for authentication with servers.
 	 */
-	public SaslNettyClient(String topologyUser) {
+	public SaslNettyClient(String topologyName, byte[] token) {
 		try {
 			LOG.debug("SaslNettyClient: Creating SASL "
 					+ SaslUtils.AUTH_DIGEST_MD5
@@ -59,7 +59,7 @@ public class SaslNettyClient {
 			saslClient = Sasl.createSaslClient(
 					new String[] { SaslUtils.AUTH_DIGEST_MD5 }, null, null,
 					SaslUtils.DEFAULT_REALM, SaslUtils.getSaslProps(),
-					new SaslClientCallbackHandler(topologyUser));
+					new SaslClientCallbackHandler(topologyName, token));
 
 		} catch (IOException e) {
 			LOG.error("SaslNettyClient: Could not obtain topology token for Netty "
@@ -107,11 +107,10 @@ public class SaslNettyClient {
 		 * 
 		 * @param topologyToken
 		 */
-		public SaslClientCallbackHandler(String topologyToken) {
+		public SaslClientCallbackHandler(String topologyToken, byte[] token) {
 			this.userName = SaslUtils
 					.encodeIdentifier(topologyToken.getBytes());
-			this.userPassword = SaslUtils.encodePassword(topologyToken
-					.getBytes());
+			this.userPassword = SaslUtils.encodePassword(token);
 		}
 
 		/**

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/133c398e/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
index 1178bd6..2cb47d9 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
@@ -44,14 +44,14 @@ class SaslNettyServer {
 
 	private SaslServer saslServer;
 
-	SaslNettyServer(String topologyToken) throws IOException {
-		LOG.debug("SaslNettyServer: Topology token is: " + topologyToken
+	SaslNettyServer(String topologyName, byte[] token) throws IOException {
+		LOG.debug("SaslNettyServer: Topology token is: " + topologyName
 				+ " with authmethod " + SaslUtils.AUTH_DIGEST_MD5);
 
 		try {
 
 			SaslDigestCallbackHandler ch = new SaslNettyServer.SaslDigestCallbackHandler(
-					topologyToken);
+					topologyName, token);
 
 			saslServer = Sasl.createSaslServer(SaslUtils.AUTH_DIGEST_MD5, null,
 					SaslUtils.DEFAULT_REALM, SaslUtils.getSaslProps(), ch);
@@ -70,18 +70,18 @@ class SaslNettyServer {
 		return saslServer.getAuthorizationID();
 	}
 
-	
-
 	/** CallbackHandler for SASL DIGEST-MD5 mechanism */
 	public static class SaslDigestCallbackHandler implements CallbackHandler {
 
 		/** Used to authenticate the clients */
-		private String topologyToken;
+		private byte[] userPassword;
+		private String userName;
 
-		public SaslDigestCallbackHandler(String topologyToken) {
+		public SaslDigestCallbackHandler(String topologyName, byte[] token) {
 			LOG.debug("SaslDigestCallback: Creating SaslDigestCallback handler "
-					+ "with topology token: " + topologyToken);
-			this.topologyToken = topologyToken;
+					+ "with topology token: " + topologyName);
+			this.userName = topologyName;
+			this.userPassword = token;
 		}
 
 		@Override
@@ -105,19 +105,19 @@ class SaslNettyServer {
 							"handle: Unrecognized SASL DIGEST-MD5 Callback");
 				}
 			}
-			
-			if(nc!=null) {
+
+			if (nc != null) {
 				LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
-						+ "username for client: " + topologyToken);
+						+ "username for client: " + userName);
 
-				nc.setName(topologyToken);
+				nc.setName(userName);
 			}
 
 			if (pc != null) {
-				char[] password = SaslUtils.encodePassword(topologyToken.getBytes());
+				char[] password = SaslUtils.encodePassword(userPassword);
 
 				LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
-						+ "password for client: " + topologyToken);
+						+ "password for client: " + userPassword);
 
 				pc.setPassword(password);
 			}
@@ -133,11 +133,8 @@ class SaslNettyServer {
 				}
 
 				if (ac.isAuthorized()) {
-					if (LOG.isDebugEnabled()) {
-						String username = topologyToken;
-						LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
-								+ "canonicalized client ID: " + username);
-					}
+					LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
+							+ "canonicalized client ID: " + userName);
 					ac.setAuthorizedID(authzid);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/133c398e/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
index ca38c96..59c4abd 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
@@ -38,12 +38,12 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
 	long start_time;
 	/** Used for client or server's token to send or receive from each other. */
 	private byte[] token;
-	private String topologyUser;
+	private String topologyName;
 
 	public SaslStormClientHandler(Client client) throws IOException {
 		this.client = client;
 		start_time = System.currentTimeMillis();
-		loadTopologyToken();
+		getSASLCredentials();
 	}
 
 	@Override
@@ -62,7 +62,7 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
 			if (saslNettyClient == null) {
 				LOG.debug("Creating saslNettyClient now " + "for channel: "
 						+ channel);
-				saslNettyClient = new SaslNettyClient(topologyUser);
+				saslNettyClient = new SaslNettyClient(topologyName, token);
 				SaslNettyClientState.getSaslNettyClient.set(channel,
 						saslNettyClient);
 			}
@@ -143,17 +143,13 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
 		channel.write(saslResponse);
 	}
 
-	/**
-	 * Load Storm Topology Token.
-	 * 
-	 * @param conf
-	 *            Configuration
-	 * @throws IOException
-	 */
-	private void loadTopologyToken() throws IOException {
-		topologyUser = (String) this.client.storm_conf
+	private void getSASLCredentials() throws IOException {
+		topologyName = (String) this.client.storm_conf
 				.get(Config.TOPOLOGY_NAME);
-		LOG.debug("SASL credentials is the storm user name: " + topologyUser);
-		token = topologyUser.getBytes();
+		String secretKey = SaslUtils.getSecretKey(this.client.storm_conf);
+		if(secretKey!=null) {
+			token = secretKey.getBytes();	
+		}
+		LOG.debug("SASL credentials for storm topology "+topologyName+ " is "+secretKey);
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/133c398e/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
index 2e8bcac..d06e960 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
@@ -35,14 +35,14 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
 	Server server;
 	/** Used for client or server's token to send or receive from each other. */
 	private byte[] token;
-	private String topologyUser;
+	private String topologyName;
 
 	private static final Logger LOG = LoggerFactory
 			.getLogger(SaslStormServerHandler.class);
 
 	public SaslStormServerHandler(Server server) throws IOException {
 		this.server = server;
-		loadTopologyToken();
+		getSASLCredentials();
 	}
 
 	@Override
@@ -53,7 +53,6 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
 			return;
 
 		Channel channel = ctx.getChannel();
-		LOG.debug("messageReceived: Got " + msg.getClass());
 
 		if (msg instanceof ControlMessage
 				&& ((ControlMessage) e.getMessage()) == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
@@ -66,7 +65,7 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
 				LOG.debug("No saslNettyServer for " + channel
 						+ " yet; creating now, with topology token: ");
 				try {
-					saslNettyServer = new SaslNettyServer(topologyUser);
+					saslNettyServer = new SaslNettyServer(topologyName, token);
 				} catch (IOException ioe) {
 					LOG.error("Error occurred while creating saslNettyServer on server "
 							+ channel.getLocalAddress()
@@ -143,17 +142,14 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
 		server.closeChannel(e.getChannel());
 	}
 
-	/**
-	 * Load Storm Topology Token.
-	 * 
-	 * @param conf
-	 *            Configuration
-	 * @throws IOException
-	 */
-	private void loadTopologyToken() throws IOException {
-		topologyUser = (String) this.server.storm_conf
+	private void getSASLCredentials() throws IOException {
+		topologyName = (String) this.server.storm_conf
 				.get(Config.TOPOLOGY_NAME);
-		LOG.debug("SASL credentials for the storm topology: " + topologyUser);
-		token = topologyUser.getBytes();
+		String secretKey = SaslUtils.getSecretKey(this.server.storm_conf);
+		if (secretKey != null) {
+			token = secretKey.getBytes();
+		}
+		LOG.debug("SASL credentials for storm topology " + topologyName
+				+ " is " + secretKey);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/133c398e/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
index 0077cf3..0f96233 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
@@ -25,6 +25,8 @@ import javax.security.sasl.Sasl;
 
 import org.apache.commons.codec.binary.Base64;
 
+import backtype.storm.Config;
+
 class SaslUtils {
 	public static final String AUTH_DIGEST_MD5 = "DIGEST-MD5";
 	public static final String DEFAULT_REALM = "default";
@@ -58,4 +60,16 @@ class SaslUtils {
 		return new String(Base64.encodeBase64(identifier),
 				Charset.defaultCharset());
 	}
+
+	static String getSecretKey(Map conf) {
+		if (conf == null || conf.isEmpty())
+			return null;
+
+		String secretPayLoad = (String) conf
+				.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
+
+		return secretPayLoad;
+	}
+	
+	
 }