You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/30 18:16:38 UTC

[2/9] git commit: STORM-348: Netty SASL Authentication

STORM-348: Netty SASL Authentication


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/133c398e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/133c398e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/133c398e

Branch: refs/heads/security
Commit: 133c398ee9f5799fb3d702e27c8f83b969fc0341
Parents: 4198644
Author: Raghavendra Nandagopal <sp...@gmail.com>
Authored: Thu Jul 24 15:47:51 2014 -0700
Committer: Raghavendra Nandagopal <sp...@gmail.com>
Committed: Thu Jul 24 15:47:51 2014 -0700

----------------------------------------------------------------------
 .../storm/messaging/netty/SaslNettyClient.java  |  9 +++--
 .../storm/messaging/netty/SaslNettyServer.java  | 37 +++++++++-----------
 .../messaging/netty/SaslStormClientHandler.java | 24 ++++++-------
 .../messaging/netty/SaslStormServerHandler.java | 26 ++++++--------
 .../storm/messaging/netty/SaslUtils.java        | 14 ++++++++
 5 files changed, 56 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/133c398e/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
index a4f1b5e..fedcfff 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
@@ -50,7 +50,7 @@ public class SaslNettyClient {
 	/**
 	 * Create a SaslNettyClient for authentication with servers.
 	 */
-	public SaslNettyClient(String topologyUser) {
+	public SaslNettyClient(String topologyName, byte[] token) {
 		try {
 			LOG.debug("SaslNettyClient: Creating SASL "
 					+ SaslUtils.AUTH_DIGEST_MD5
@@ -59,7 +59,7 @@ public class SaslNettyClient {
 			saslClient = Sasl.createSaslClient(
 					new String[] { SaslUtils.AUTH_DIGEST_MD5 }, null, null,
 					SaslUtils.DEFAULT_REALM, SaslUtils.getSaslProps(),
-					new SaslClientCallbackHandler(topologyUser));
+					new SaslClientCallbackHandler(topologyName, token));
 
 		} catch (IOException e) {
 			LOG.error("SaslNettyClient: Could not obtain topology token for Netty "
@@ -107,11 +107,10 @@ public class SaslNettyClient {
 		 * 
 		 * @param topologyToken
 		 */
-		public SaslClientCallbackHandler(String topologyToken) {
+		public SaslClientCallbackHandler(String topologyToken, byte[] token) {
 			this.userName = SaslUtils
 					.encodeIdentifier(topologyToken.getBytes());
-			this.userPassword = SaslUtils.encodePassword(topologyToken
-					.getBytes());
+			this.userPassword = SaslUtils.encodePassword(token);
 		}
 
 		/**

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/133c398e/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
index 1178bd6..2cb47d9 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
@@ -44,14 +44,14 @@ class SaslNettyServer {
 
 	private SaslServer saslServer;
 
-	SaslNettyServer(String topologyToken) throws IOException {
-		LOG.debug("SaslNettyServer: Topology token is: " + topologyToken
+	SaslNettyServer(String topologyName, byte[] token) throws IOException {
+		LOG.debug("SaslNettyServer: Topology token is: " + topologyName
 				+ " with authmethod " + SaslUtils.AUTH_DIGEST_MD5);
 
 		try {
 
 			SaslDigestCallbackHandler ch = new SaslNettyServer.SaslDigestCallbackHandler(
-					topologyToken);
+					topologyName, token);
 
 			saslServer = Sasl.createSaslServer(SaslUtils.AUTH_DIGEST_MD5, null,
 					SaslUtils.DEFAULT_REALM, SaslUtils.getSaslProps(), ch);
@@ -70,18 +70,18 @@ class SaslNettyServer {
 		return saslServer.getAuthorizationID();
 	}
 
-	
-
 	/** CallbackHandler for SASL DIGEST-MD5 mechanism */
 	public static class SaslDigestCallbackHandler implements CallbackHandler {
 
 		/** Used to authenticate the clients */
-		private String topologyToken;
+		private byte[] userPassword;
+		private String userName;
 
-		public SaslDigestCallbackHandler(String topologyToken) {
+		public SaslDigestCallbackHandler(String topologyName, byte[] token) {
 			LOG.debug("SaslDigestCallback: Creating SaslDigestCallback handler "
-					+ "with topology token: " + topologyToken);
-			this.topologyToken = topologyToken;
+					+ "with topology token: " + topologyName);
+			this.userName = topologyName;
+			this.userPassword = token;
 		}
 
 		@Override
@@ -105,19 +105,19 @@ class SaslNettyServer {
 							"handle: Unrecognized SASL DIGEST-MD5 Callback");
 				}
 			}
-			
-			if(nc!=null) {
+
+			if (nc != null) {
 				LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
-						+ "username for client: " + topologyToken);
+						+ "username for client: " + userName);
 
-				nc.setName(topologyToken);
+				nc.setName(userName);
 			}
 
 			if (pc != null) {
-				char[] password = SaslUtils.encodePassword(topologyToken.getBytes());
+				char[] password = SaslUtils.encodePassword(userPassword);
 
 				LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
-						+ "password for client: " + topologyToken);
+						+ "password for client: " + userPassword);
 
 				pc.setPassword(password);
 			}
@@ -133,11 +133,8 @@ class SaslNettyServer {
 				}
 
 				if (ac.isAuthorized()) {
-					if (LOG.isDebugEnabled()) {
-						String username = topologyToken;
-						LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
-								+ "canonicalized client ID: " + username);
-					}
+					LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
+							+ "canonicalized client ID: " + userName);
 					ac.setAuthorizedID(authzid);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/133c398e/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
index ca38c96..59c4abd 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
@@ -38,12 +38,12 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
 	long start_time;
 	/** Used for client or server's token to send or receive from each other. */
 	private byte[] token;
-	private String topologyUser;
+	private String topologyName;
 
 	public SaslStormClientHandler(Client client) throws IOException {
 		this.client = client;
 		start_time = System.currentTimeMillis();
-		loadTopologyToken();
+		getSASLCredentials();
 	}
 
 	@Override
@@ -62,7 +62,7 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
 			if (saslNettyClient == null) {
 				LOG.debug("Creating saslNettyClient now " + "for channel: "
 						+ channel);
-				saslNettyClient = new SaslNettyClient(topologyUser);
+				saslNettyClient = new SaslNettyClient(topologyName, token);
 				SaslNettyClientState.getSaslNettyClient.set(channel,
 						saslNettyClient);
 			}
@@ -143,17 +143,13 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
 		channel.write(saslResponse);
 	}
 
-	/**
-	 * Load Storm Topology Token.
-	 * 
-	 * @param conf
-	 *            Configuration
-	 * @throws IOException
-	 */
-	private void loadTopologyToken() throws IOException {
-		topologyUser = (String) this.client.storm_conf
+	private void getSASLCredentials() throws IOException {
+		topologyName = (String) this.client.storm_conf
 				.get(Config.TOPOLOGY_NAME);
-		LOG.debug("SASL credentials is the storm user name: " + topologyUser);
-		token = topologyUser.getBytes();
+		String secretKey = SaslUtils.getSecretKey(this.client.storm_conf);
+		if(secretKey!=null) {
+			token = secretKey.getBytes();	
+		}
+		LOG.debug("SASL credentials for storm topology "+topologyName+ " is "+secretKey);
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/133c398e/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
index 2e8bcac..d06e960 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
@@ -35,14 +35,14 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
 	Server server;
 	/** Used for client or server's token to send or receive from each other. */
 	private byte[] token;
-	private String topologyUser;
+	private String topologyName;
 
 	private static final Logger LOG = LoggerFactory
 			.getLogger(SaslStormServerHandler.class);
 
 	public SaslStormServerHandler(Server server) throws IOException {
 		this.server = server;
-		loadTopologyToken();
+		getSASLCredentials();
 	}
 
 	@Override
@@ -53,7 +53,6 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
 			return;
 
 		Channel channel = ctx.getChannel();
-		LOG.debug("messageReceived: Got " + msg.getClass());
 
 		if (msg instanceof ControlMessage
 				&& ((ControlMessage) e.getMessage()) == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
@@ -66,7 +65,7 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
 				LOG.debug("No saslNettyServer for " + channel
 						+ " yet; creating now, with topology token: ");
 				try {
-					saslNettyServer = new SaslNettyServer(topologyUser);
+					saslNettyServer = new SaslNettyServer(topologyName, token);
 				} catch (IOException ioe) {
 					LOG.error("Error occurred while creating saslNettyServer on server "
 							+ channel.getLocalAddress()
@@ -143,17 +142,14 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
 		server.closeChannel(e.getChannel());
 	}
 
-	/**
-	 * Load Storm Topology Token.
-	 * 
-	 * @param conf
-	 *            Configuration
-	 * @throws IOException
-	 */
-	private void loadTopologyToken() throws IOException {
-		topologyUser = (String) this.server.storm_conf
+	private void getSASLCredentials() throws IOException {
+		topologyName = (String) this.server.storm_conf
 				.get(Config.TOPOLOGY_NAME);
-		LOG.debug("SASL credentials for the storm topology: " + topologyUser);
-		token = topologyUser.getBytes();
+		String secretKey = SaslUtils.getSecretKey(this.server.storm_conf);
+		if (secretKey != null) {
+			token = secretKey.getBytes();
+		}
+		LOG.debug("SASL credentials for storm topology " + topologyName
+				+ " is " + secretKey);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/133c398e/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
index 0077cf3..0f96233 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
@@ -25,6 +25,8 @@ import javax.security.sasl.Sasl;
 
 import org.apache.commons.codec.binary.Base64;
 
+import backtype.storm.Config;
+
 class SaslUtils {
 	public static final String AUTH_DIGEST_MD5 = "DIGEST-MD5";
 	public static final String DEFAULT_REALM = "default";
@@ -58,4 +60,16 @@ class SaslUtils {
 		return new String(Base64.encodeBase64(identifier),
 				Charset.defaultCharset());
 	}
+
+	static String getSecretKey(Map conf) {
+		if (conf == null || conf.isEmpty())
+			return null;
+
+		String secretPayLoad = (String) conf
+				.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
+
+		return secretPayLoad;
+	}
+	
+	
 }