You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/30 18:16:37 UTC
[1/9] git commit: STORM-348: Netty SASL Authentication
Repository: incubator-storm
Updated Branches:
refs/heads/security 559c883d5 -> cf5fc0c3f
STORM-348: Netty SASL Authentication
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/41986445
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/41986445
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/41986445
Branch: refs/heads/security
Commit: 41986445fb89ff77c101fcdd6daccb945160e8a9
Parents: 642ed74
Author: Raghavendra Nandagopal <sp...@gmail.com>
Authored: Tue Jul 22 13:20:54 2014 -0700
Committer: Raghavendra Nandagopal <sp...@gmail.com>
Committed: Tue Jul 22 13:20:54 2014 -0700
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/Config.java | 5 +
.../backtype/storm/messaging/netty/Client.java | 29 ++--
.../storm/messaging/netty/ControlMessage.java | 4 +-
.../storm/messaging/netty/MessageDecoder.java | 32 +++-
.../storm/messaging/netty/MessageEncoder.java | 4 +
.../storm/messaging/netty/SaslMessageToken.java | 100 +++++++++++
.../storm/messaging/netty/SaslNettyClient.java | 167 ++++++++++++++++++
.../messaging/netty/SaslNettyClientState.java | 31 ++++
.../storm/messaging/netty/SaslNettyServer.java | 168 +++++++++++++++++++
.../messaging/netty/SaslNettyServerState.java | 31 ++++
.../messaging/netty/SaslStormClientHandler.java | 159 ++++++++++++++++++
.../netty/SaslStormServerAuthorizeHandler.java | 83 +++++++++
.../messaging/netty/SaslStormServerHandler.java | 159 ++++++++++++++++++
.../storm/messaging/netty/SaslUtils.java | 61 +++++++
.../backtype/storm/messaging/netty/Server.java | 27 +--
.../netty/StormClientPipelineFactory.java | 26 ++-
.../netty/StormServerPipelineFactory.java | 31 +++-
17 files changed, 1077 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 3b3f7e5..46b120c 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -105,6 +105,11 @@ public class Config extends HashMap<String, Object> {
public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";
public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = Number.class;
+ /**
+ * Netty based messaging: Is authentication required for Netty messaging from client worker process to server worker process.
+ */
+ public static final String STORM_MESSAGING_NETTY_AUTHENTICATION = "storm.messaging.netty.authentication";
+ public static final Object STORM_MESSAGING_NETTY_AUTHENTICATION_SCHEMA = Boolean.class;
/**
* A list of hosts of ZooKeeper servers used to manage the cluster.
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 8d2d221..64a1757 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -17,17 +17,6 @@
*/
package backtype.storm.messaging.netty;
-import backtype.storm.Config;
-import backtype.storm.messaging.IConnection;
-import backtype.storm.messaging.TaskMessage;
-import backtype.storm.utils.Utils;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
@@ -39,6 +28,19 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.utils.Utils;
+
public class Client implements IConnection {
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
private static final String PREFIX = "Netty-Client-";
@@ -57,8 +59,10 @@ public class Client implements IConnection {
private int messageBatchSize;
private AtomicLong pendings;
+
+ Map storm_conf;
- MessageBatch messageBatch = null;
+ MessageBatch messageBatch = null;
private AtomicLong flushCheckTimer;
private int flushCheckInterval;
private ScheduledExecutorService scheduler;
@@ -66,6 +70,7 @@ public class Client implements IConnection {
@SuppressWarnings("rawtypes")
Client(Map storm_conf, ChannelFactory factory,
ScheduledExecutorService scheduler, String host, int port) {
+ this.storm_conf = storm_conf;
this.factory = factory;
this.scheduler = scheduler;
channelRef = new AtomicReference<Channel>(null);
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
index b7335b3..fb3efe6 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
@@ -27,7 +27,9 @@ enum ControlMessage {
CLOSE_MESSAGE((short)-100),
EOB_MESSAGE((short)-201),
OK_RESPONSE((short)-200),
- FAILURE_RESPONSE((short)-400);
+ FAILURE_RESPONSE((short)-400),
+ SASL_TOKEN_MESSAGE_REQUEST((short)-202),
+ SASL_COMPLETE_REQUEST((short)-203);
private short code;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
index 72c3cf7..7d8bf54 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
@@ -68,8 +68,38 @@ public class MessageDecoder extends FrameDecoder {
return ctrl_msg;
}
}
+
+ //case 2: SaslTokenMessageRequest
+ if(code==-500) {
+ // Make sure that we have received at least an integer (length)
+ if (buf.readableBytes() < 4) {
+ //need more data
+ buf.resetReaderIndex();
+ return null;
+ }
+
+ // Read the length field.
+ int length = buf.readInt();
+ if (length<=0) {
+ return new SaslMessageToken(null);
+ }
+
+ // Make sure if there's enough bytes in the buffer.
+ if (buf.readableBytes() < length) {
+ // The whole bytes were not received yet - return null.
+ buf.resetReaderIndex();
+ return null;
+ }
+
+ // There's enough bytes in the buffer. Read it.
+ ChannelBuffer payload = buf.readBytes(length);
+
+ // Successfully decoded a frame.
+ // Return a SaslTokenMessageRequest object
+ return new SaslMessageToken(payload.array());
+ }
- // case 2: task Message
+ // case 3: task Message
short task = code;
// Make sure that we have received at least an integer (length)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
index e6e65c3..e5dd22f 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
@@ -32,6 +32,10 @@ public class MessageEncoder extends OneToOneEncoder {
return ((MessageBatch)obj).buffer();
}
+ if (obj instanceof SaslMessageToken) {
+ return ((SaslMessageToken)obj).buffer();
+ }
+
throw new RuntimeException("Unsupported encoding of object of class "+obj.getClass().getName());
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
new file mode 100644
index 0000000..8383d2c
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Send and receive SASL tokens.
+ */
+public class SaslMessageToken {
+    /** Class logger */
+    private static final Logger LOG = LoggerFactory
+            .getLogger(SaslMessageToken.class);
+
+    /** Wire identifier written in front of every encoded SASL token message. */
+    private static final short IDENTIFIER = -500;
+
+    /**
+     * Used for client or server's token to send or receive from each other.
+     * May be null, which is encoded as a zero-length payload.
+     */
+    private byte[] token;
+
+    /**
+     * Constructor used for reflection only.
+     */
+    public SaslMessageToken() {
+    }
+
+    /**
+     * Constructor used to send request.
+     *
+     * @param token
+     *            the SASL token, generated by a SaslClient or SaslServer;
+     *            may be null for an empty message.
+     */
+    public SaslMessageToken(byte[] token) {
+        this.token = token;
+    }
+
+    /**
+     * Read accessor for SASL token
+     *
+     * @return saslToken SASL token, possibly null
+     */
+    public byte[] getSaslToken() {
+        return token;
+    }
+
+    /**
+     * Write accessor for SASL token
+     *
+     * @param token
+     *            SASL token
+     */
+    public void setSaslToken(byte[] token) {
+        this.token = token;
+    }
+
+    /**
+     * Number of bytes {@link #buffer()} writes: a 2-byte identifier, a
+     * 4-byte length and the payload. A null token counts as an empty
+     * payload, matching how buffer() encodes it.
+     *
+     * @return encoded size in bytes
+     */
+    int encodeLength() {
+        return 2 + 4 + payloadLength();
+    }
+
+    /**
+     * encode the current SaslToken Message into a channel buffer
+     * SaslTokenMessageRequest is encoded as:
+     * identifier .... short(2) always it is -500
+     * payload length .... int
+     * payload .... byte[]
+     *
+     * @return buffer holding the encoded message
+     * @throws Exception
+     *             if writing to the underlying buffer fails
+     */
+    ChannelBuffer buffer() throws Exception {
+        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(
+                ChannelBuffers.directBuffer(encodeLength()));
+        int payload_len = payloadLength();
+
+        bout.writeShort(IDENTIFIER);
+        bout.writeInt(payload_len);
+        if (payload_len > 0) {
+            bout.write(token);
+        }
+        bout.close();
+        return bout.buffer();
+    }
+
+    /** Length of the token payload, treating a null token as empty. */
+    private int payloadLength() {
+        return token == null ? 0 : token.length;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
new file mode 100644
index 0000000..a4f1b5e
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.io.IOException;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements SASL logic for storm worker client processes.
+ */
+public class SaslNettyClient {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SaslNettyClient.class);
+
+ /**
+ * Used to respond to server's counterpart, SaslServer with SASL tokens
+ * represented as byte arrays.
+ */
+ private SaslClient saslClient;
+
+ /**
+ * Create a SaslNettyClient for authentication with servers.
+ */
+ public SaslNettyClient(String topologyUser) {
+ try {
+ LOG.debug("SaslNettyClient: Creating SASL "
+ + SaslUtils.AUTH_DIGEST_MD5
+ + " client to authenticate to server ");
+
+ saslClient = Sasl.createSaslClient(
+ new String[] { SaslUtils.AUTH_DIGEST_MD5 }, null, null,
+ SaslUtils.DEFAULT_REALM, SaslUtils.getSaslProps(),
+ new SaslClientCallbackHandler(topologyUser));
+
+ } catch (IOException e) {
+ LOG.error("SaslNettyClient: Could not obtain topology token for Netty "
+ + "Client to use to authenticate with a Netty Server.");
+ saslClient = null;
+ }
+ }
+
+ public boolean isComplete() {
+ return saslClient.isComplete();
+ }
+
+ /**
+ * Respond to server's SASL token.
+ *
+ * @param saslTokenMessage
+ * contains server's SASL token
+ * @return client's response SASL token
+ */
+ public byte[] saslResponse(SaslMessageToken saslTokenMessage) {
+ try {
+ byte[] retval = saslClient.evaluateChallenge(saslTokenMessage
+ .getSaslToken());
+ return retval;
+ } catch (SaslException e) {
+ LOG.error(
+ "saslResponse: Failed to respond to SASL server's token:",
+ e);
+ return null;
+ }
+ }
+
+ /**
+ * Implementation of javax.security.auth.callback.CallbackHandler that works
+ * with Storm topology tokens.
+ */
+ private static class SaslClientCallbackHandler implements CallbackHandler {
+ /** Generated username contained in TopologyToken */
+ private final String userName;
+ /** Generated password contained in TopologyToken */
+ private final char[] userPassword;
+
+ /**
+ * Set private members using topology token.
+ *
+ * @param topologyToken
+ */
+ public SaslClientCallbackHandler(String topologyToken) {
+ this.userName = SaslUtils
+ .encodeIdentifier(topologyToken.getBytes());
+ this.userPassword = SaslUtils.encodePassword(topologyToken
+ .getBytes());
+ }
+
+ /**
+ * Implementation used to respond to SASL tokens from server.
+ *
+ * @param callbacks
+ * objects that indicate what credential information the
+ * server's SaslServer requires from the client.
+ * @throws UnsupportedCallbackException
+ */
+ public void handle(Callback[] callbacks)
+ throws UnsupportedCallbackException {
+ NameCallback nc = null;
+ PasswordCallback pc = null;
+ RealmCallback rc = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof RealmChoiceCallback) {
+ continue;
+ } else if (callback instanceof NameCallback) {
+ nc = (NameCallback) callback;
+ } else if (callback instanceof PasswordCallback) {
+ pc = (PasswordCallback) callback;
+ } else if (callback instanceof RealmCallback) {
+ rc = (RealmCallback) callback;
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "handle: Unrecognized SASL client callback");
+ }
+ }
+ if (nc != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("handle: SASL client callback: setting username: "
+ + userName);
+ }
+ nc.setName(userName);
+ }
+ if (pc != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("handle: SASL client callback: setting userPassword");
+ }
+ pc.setPassword(userPassword);
+ }
+ if (rc != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("handle: SASL client callback: setting realm: "
+ + rc.getDefaultText());
+ }
+ rc.setText(rc.getDefaultText());
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClientState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClientState.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClientState.java
new file mode 100644
index 0000000..6df6c53
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClientState.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelLocal;
+
+final class SaslNettyClientState {
+
+    /**
+     * Channel-scoped holder for the {@link SaslNettyClient} associated with a
+     * connection. Its initial value for any channel is null.
+     */
+    public static final ChannelLocal<SaslNettyClient> getSaslNettyClient =
+            new ChannelLocal<SaslNettyClient>() {
+                @Override
+                protected SaslNettyClient initialValue(Channel channel) {
+                    return null;
+                }
+            };
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
new file mode 100644
index 0000000..1178bd6
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.commons.codec.binary.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class SaslNettyServer {
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(SaslNettyServer.class);
+
+    /** JDK SASL server that evaluates the client's tokens; never null after construction. */
+    private SaslServer saslServer;
+
+    /**
+     * Create a DIGEST-MD5 SASL server for one connection.
+     *
+     * @param topologyToken
+     *            value the expected user name and password are derived from
+     * @throws IOException
+     *             if the SASL server cannot be created
+     */
+    SaslNettyServer(String topologyToken) throws IOException {
+        // The token is the source of the password, so it is kept out of the log.
+        LOG.debug("SaslNettyServer: Creating SASL server with authmethod "
+                + SaslUtils.AUTH_DIGEST_MD5);
+
+        try {
+
+            SaslDigestCallbackHandler ch = new SaslNettyServer.SaslDigestCallbackHandler(
+                    topologyToken);
+
+            saslServer = Sasl.createSaslServer(SaslUtils.AUTH_DIGEST_MD5, null,
+                    SaslUtils.DEFAULT_REALM, SaslUtils.getSaslProps(), ch);
+
+        } catch (SaslException e) {
+            // Propagate instead of leaving a half-built object whose methods
+            // would fail with a NullPointerException on first use.
+            LOG.error("SaslNettyServer: Could not create SaslServer: ", e);
+            throw e;
+        }
+
+        // createSaslServer returns null when no factory supports the mechanism.
+        if (saslServer == null) {
+            throw new SaslException(
+                    "SaslNettyServer: No SASL server available for mechanism "
+                            + SaslUtils.AUTH_DIGEST_MD5);
+        }
+    }
+
+    /**
+     * @return true once the SASL exchange with the client has completed
+     */
+    public boolean isComplete() {
+        return saslServer.isComplete();
+    }
+
+    /**
+     * @return the authorization ID reported by the underlying SASL server
+     */
+    public String getUserName() {
+        return saslServer.getAuthorizationID();
+    }
+
+    /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+    public static class SaslDigestCallbackHandler implements CallbackHandler {
+
+        /** Used to authenticate the clients */
+        private String topologyToken;
+
+        /**
+         * @param topologyToken
+         *            value supplied as the user name and encoded into the
+         *            password when the SASL mechanism asks for them
+         */
+        public SaslDigestCallbackHandler(String topologyToken) {
+            LOG.debug("SaslDigestCallback: Creating SaslDigestCallback handler");
+            this.topologyToken = topologyToken;
+        }
+
+        /**
+         * Supplies name and password, and authorizes the client only when
+         * its authentication ID equals its authorization ID.
+         *
+         * @param callbacks
+         *            callbacks issued by the SASL server
+         * @throws UnsupportedCallbackException
+         *             if a callback of an unrecognized type is supplied
+         */
+        @Override
+        public void handle(Callback[] callbacks) throws IOException,
+                UnsupportedCallbackException {
+            NameCallback nc = null;
+            PasswordCallback pc = null;
+            AuthorizeCallback ac = null;
+
+            for (Callback callback : callbacks) {
+                if (callback instanceof AuthorizeCallback) {
+                    ac = (AuthorizeCallback) callback;
+                } else if (callback instanceof NameCallback) {
+                    nc = (NameCallback) callback;
+                } else if (callback instanceof PasswordCallback) {
+                    pc = (PasswordCallback) callback;
+                } else if (callback instanceof RealmCallback) {
+                    continue; // realm is ignored
+                } else {
+                    throw new UnsupportedCallbackException(callback,
+                            "handle: Unrecognized SASL DIGEST-MD5 Callback");
+                }
+            }
+
+            if (nc != null) {
+                LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
+                        + "username for client");
+
+                nc.setName(topologyToken);
+            }
+
+            if (pc != null) {
+                char[] password = SaslUtils.encodePassword(topologyToken.getBytes());
+
+                LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
+                        + "password for client");
+
+                pc.setPassword(password);
+            }
+            if (ac != null) {
+
+                String authid = ac.getAuthenticationID();
+                String authzid = ac.getAuthorizationID();
+
+                ac.setAuthorized(authid.equals(authzid));
+
+                if (ac.isAuthorized()) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
+                                + "canonicalized client ID: " + authzid);
+                    }
+                    ac.setAuthorizedID(authzid);
+                }
+            }
+        }
+    }
+
+    /**
+     * Evaluate a SASL token received from the client and produce the
+     * server's next token.
+     *
+     * @param token
+     *            client's SASL token; null is treated as an empty token
+     * @return token to send back to the client, or null when evaluation
+     *         failed or the mechanism has nothing further to send
+     */
+    public byte[] response(byte[] token) {
+        // A zero-length payload is decoded as a null token, but
+        // evaluateResponse requires a non-null (possibly empty) array.
+        byte[] input = (token == null) ? new byte[0] : token;
+        try {
+            LOG.debug("response: Responding to input token of length: "
+                    + input.length);
+            byte[] retval = saslServer.evaluateResponse(input);
+            // evaluateResponse may legitimately return null on completion.
+            LOG.debug("response: Response token length: "
+                    + (retval == null ? 0 : retval.length));
+            return retval;
+        } catch (SaslException e) {
+            LOG.error("response: Failed to evaluate client token of length: "
+                    + input.length + " : " + e);
+            return null;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServerState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServerState.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServerState.java
new file mode 100644
index 0000000..9800959
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServerState.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelLocal;
+
+final class SaslNettyServerState {
+
+    /**
+     * Channel-scoped holder for the {@link SaslNettyServer} associated with a
+     * connection. Its initial value for any channel is null.
+     */
+    public static final ChannelLocal<SaslNettyServer> getSaslNettyServer =
+            new ChannelLocal<SaslNettyServer>() {
+                @Override
+                protected SaslNettyServer initialValue(Channel channel) {
+                    return null;
+                }
+            };
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
new file mode 100644
index 0000000..ca38c96
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.io.IOException;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+
+public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SaslStormClientHandler.class);
+ private Client client;
+ long start_time;
+ /** Used for client or server's token to send or receive from each other. */
+ private byte[] token;
+ private String topologyUser;
+
+ public SaslStormClientHandler(Client client) throws IOException {
+ this.client = client;
+ start_time = System.currentTimeMillis();
+ loadTopologyToken();
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx,
+ ChannelStateEvent event) {
+ // register the newly established channel
+ Channel channel = ctx.getChannel();
+
+ LOG.info("Connection established from " + channel.getLocalAddress()
+ + " to " + channel.getRemoteAddress());
+
+ try {
+ SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient
+ .get(channel);
+
+ if (saslNettyClient == null) {
+ LOG.debug("Creating saslNettyClient now " + "for channel: "
+ + channel);
+ saslNettyClient = new SaslNettyClient(topologyUser);
+ SaslNettyClientState.getSaslNettyClient.set(channel,
+ saslNettyClient);
+ }
+ channel.write(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST);
+ } catch (Exception e) {
+ LOG.error("Failed to authenticate with server " + "due to error: "
+ + e);
+ }
+ return;
+
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent event)
+ throws Exception {
+ LOG.debug("send/recv time (ms): {}",
+ (System.currentTimeMillis() - start_time));
+
+ Channel channel = ctx.getChannel();
+
+ // Generate SASL response to server using Channel-local SASL client.
+ SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient
+ .get(channel);
+ if (saslNettyClient == null) {
+ throw new Exception("saslNettyClient was unexpectedly "
+ + "null for channel: " + channel);
+ }
+
+ // examine the response message from server
+ if (event.getMessage() instanceof ControlMessage) {
+ ControlMessage msg = (ControlMessage) event.getMessage();
+ if (msg == ControlMessage.SASL_COMPLETE_REQUEST) {
+ LOG.debug("Server has sent us the SaslComplete "
+ + "message. Allowing normal work to proceed.");
+
+ if (!saslNettyClient.isComplete()) {
+ LOG.error("Server returned a Sasl-complete message, "
+ + "but as far as we can tell, we are not authenticated yet.");
+ throw new Exception("Server returned a "
+ + "Sasl-complete message, but as far as "
+ + "we can tell, we are not authenticated yet.");
+ }
+ ctx.getPipeline().remove(this);
+ // We call fireMessageReceived since the client is allowed to
+ // perform this request. The client's request will now proceed
+ // to the next pipeline component namely StormClientHandler.
+ Channels.fireMessageReceived(ctx, msg);
+ return;
+ }
+ }
+ SaslMessageToken saslTokenMessage = (SaslMessageToken) event
+ .getMessage();
+ LOG.debug("Responding to server's token of length: "
+ + saslTokenMessage.getSaslToken().length);
+
+ // Generate SASL response (but we only actually send the response if
+ // it's non-null.
+ byte[] responseToServer = saslNettyClient
+ .saslResponse(saslTokenMessage);
+ if (responseToServer == null) {
+ // If we generate a null response, then authentication has completed
+ // (if not, warn), and return without sending a response back to the
+ // server.
+ LOG.debug("Response to server is null: "
+ + "authentication should now be complete.");
+ if (!saslNettyClient.isComplete()) {
+ LOG.warn("Generated a null response, "
+ + "but authentication is not complete.");
+ }
+ return;
+ } else {
+ LOG.debug("Response to server token has length:"
+ + responseToServer.length);
+ }
+ // Construct a message containing the SASL response and send it to the
+ // server.
+ SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
+ channel.write(saslResponse);
+ }
+
+ /**
+ * Load Storm Topology Token.
+ *
+ * @param conf
+ * Configuration
+ * @throws IOException
+ */
+ private void loadTopologyToken() throws IOException {
+ topologyUser = (String) this.client.storm_conf
+ .get(Config.TOPOLOGY_NAME);
+ LOG.debug("SASL credentials is the storm user name: " + topologyUser);
+ token = topologyUser.getBytes();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerAuthorizeHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerAuthorizeHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerAuthorizeHandler.java
new file mode 100644
index 0000000..04cd66e
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerAuthorizeHandler.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Authorize or deny client requests based on existence and completeness of
+ * client's SASL authentication.
+ *
+ * A message is forwarded to the next upstream handler only when a
+ * {@link SaslNettyServer} is registered for the channel and reports that
+ * authentication is complete; otherwise the message is logged and dropped.
+ */
+public class SaslStormServerAuthorizeHandler extends SimpleChannelUpstreamHandler {
+
+    // Log under this handler's own class so authorization decisions are not
+    // attributed to SaslStormServerHandler.
+    private static final Logger LOG = LoggerFactory
+            .getLogger(SaslStormServerAuthorizeHandler.class);
+
+    /**
+     * Constructor.
+     */
+    public SaslStormServerAuthorizeHandler() {
+    }
+
+    /**
+     * Forwards the received message upstream if, and only if, the sending
+     * channel has completed SASL authentication.
+     *
+     * @param ctx
+     *            handler context; supplies the channel being checked
+     * @param e
+     *            event carrying the client's message; a null message is ignored
+     */
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+        Object msg = e.getMessage();
+        if (msg == null)
+            return;
+
+        Channel channel = ctx.getChannel();
+        LOG.debug("messageReceived: Checking whether the client is authorized to send messages to the server ");
+
+        // Authorize: client is allowed to doRequest() if and only if the client
+        // has successfully authenticated with this server.
+        SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
+                .get(channel);
+
+        if (saslNettyServer == null) {
+            LOG.warn("messageReceived: This client is *NOT* authorized to perform "
+                    + "this action since there's no saslNettyServer to "
+                    + "authenticate the client: "
+                    + "refusing to perform requested action: " + msg);
+            // Return now *WITHOUT* sending upstream: no authentication was
+            // ever started on this channel.
+            return;
+        }
+
+        if (!saslNettyServer.isComplete()) {
+            LOG.warn("messageReceived: This client is *NOT* authorized to perform "
+                    + "this action because SASL authentication did not complete: "
+                    + "refusing to perform requested action: " + msg);
+            // Return now *WITHOUT* sending upstream here, since client
+            // not authorized.
+            return;
+        }
+
+        LOG.debug("messageReceived: authenticated client: "
+                + saslNettyServer.getUserName()
+                + " is authorized to do request " + "on server.");
+
+        // We call fireMessageReceived since the client is allowed to perform
+        // this request. The client's request will now proceed to the next
+        // pipeline component.
+        Channels.fireMessageReceived(ctx, msg);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
new file mode 100644
index 0000000..2e8bcac
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.io.IOException;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+
+public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
+
+ Server server;
+ /** Used for client or server's token to send or receive from each other. */
+ private byte[] token;
+ private String topologyUser;
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SaslStormServerHandler.class);
+
+ public SaslStormServerHandler(Server server) throws IOException {
+ this.server = server;
+ loadTopologyToken();
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+ Object msg = e.getMessage();
+ if (msg == null)
+ return;
+
+ Channel channel = ctx.getChannel();
+ LOG.debug("messageReceived: Got " + msg.getClass());
+
+ if (msg instanceof ControlMessage
+ && ((ControlMessage) e.getMessage()) == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
+ // initialize server-side SASL functionality, if we haven't yet
+ // (in which case we are looking at the first SASL message from the
+ // client).
+ SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
+ .get(channel);
+ if (saslNettyServer == null) {
+ LOG.debug("No saslNettyServer for " + channel
+ + " yet; creating now, with topology token: ");
+ try {
+ saslNettyServer = new SaslNettyServer(topologyUser);
+ } catch (IOException ioe) {
+ LOG.error("Error occurred while creating saslNettyServer on server "
+ + channel.getLocalAddress()
+ + " for client "
+ + channel.getRemoteAddress());
+ throw new IOException(ioe);
+ }
+
+ SaslNettyServerState.getSaslNettyServer.set(channel,
+ saslNettyServer);
+ } else {
+ LOG.debug("Found existing saslNettyServer on server:"
+ + channel.getLocalAddress() + " for client "
+ + channel.getRemoteAddress());
+ }
+
+ LOG.debug("processToken: With nettyServer: " + saslNettyServer
+ + " and token length: " + token.length);
+
+ SaslMessageToken saslTokenMessageRequest = null;
+ saslTokenMessageRequest = new SaslMessageToken(
+ saslNettyServer.response(new byte[0]));
+ // Send response to client.
+ channel.write(saslTokenMessageRequest);
+ // do not send upstream to other handlers: no further action needs
+ // to be done for SASL_TOKEN_MESSAGE_REQUEST requests.
+ return;
+ }
+
+ if (msg instanceof SaslMessageToken) {
+ // initialize server-side SASL functionality, if we haven't yet
+ // (in which case we are looking at the first SASL message from the
+ // client).
+ SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
+ .get(channel);
+ if (saslNettyServer == null) {
+ if (saslNettyServer == null) {
+ throw new Exception("saslNettyServer was unexpectedly "
+ + "null for channel: " + channel);
+ }
+ }
+ SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(
+ saslNettyServer.response(((SaslMessageToken) msg)
+ .getSaslToken()));
+
+ // Send response to client.
+ channel.write(saslTokenMessageRequest);
+
+ if (saslNettyServer.isComplete()) {
+ // If authentication of client is complete, we will also send a
+ // SASL-Complete message to the client.
+ LOG.debug("SASL authentication is complete for client with "
+ + "username: " + saslNettyServer.getUserName());
+ channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
+ LOG.debug("Removing SaslServerHandler from pipeline since SASL "
+ + "authentication is complete.");
+ ctx.getPipeline().remove(this);
+ }
+ return;
+ } else {
+ // Client should not be sending other-than-SASL messages before
+ // SaslServerHandler has removed itself from the pipeline. Such
+ // non-SASL requests will be denied by the Authorize channel handler
+ // (the next handler upstream in the server pipeline) if SASL
+ // authentication has not completed.
+ LOG.warn("Sending upstream an unexpected non-SASL message : "
+ + msg);
+ Channels.fireMessageReceived(ctx, msg);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ server.closeChannel(e.getChannel());
+ }
+
+ /**
+ * Load Storm Topology Token.
+ *
+ * @param conf
+ * Configuration
+ * @throws IOException
+ */
+ private void loadTopologyToken() throws IOException {
+ topologyUser = (String) this.server.storm_conf
+ .get(Config.TOPOLOGY_NAME);
+ LOG.debug("SASL credentials for the storm topology: " + topologyUser);
+ token = topologyUser.getBytes();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
new file mode 100644
index 0000000..0077cf3
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.sasl.Sasl;
+
+import org.apache.commons.codec.binary.Base64;
+
+/**
+ * Constants and encoding helpers shared by the Netty SASL client and server.
+ */
+class SaslUtils {
+    public static final String AUTH_DIGEST_MD5 = "DIGEST-MD5";
+    public static final String DEFAULT_REALM = "default";
+
+    /**
+     * Charset used to turn Base64 output bytes into text. It is fixed rather
+     * than taken from the platform default so that the result does not depend
+     * on the JVM's locale settings.
+     */
+    private static final Charset BASE64_CHARSET = Charset.forName("UTF-8");
+
+    /**
+     * Builds the SASL property map used when creating SASL clients and servers.
+     *
+     * @return a new mutable map with {@link Sasl#POLICY_NOPLAINTEXT} set to
+     *         "true"
+     */
+    static Map<String, String> getSaslProps() {
+        Map<String, String> props = new HashMap<String, String>();
+        props.put(Sasl.POLICY_NOPLAINTEXT, "true");
+        return props;
+    }
+
+    /**
+     * Encode a password as a base64-encoded char[] array.
+     *
+     * @param password
+     *            as a byte array.
+     * @return password as a char array.
+     */
+    static char[] encodePassword(byte[] password) {
+        return new String(Base64.encodeBase64(password), BASE64_CHARSET)
+                .toCharArray();
+    }
+
+    /**
+     * Encode an identifier as a base64-encoded String.
+     *
+     * @param identifier
+     *            as a byte array.
+     * @return identifier as a String.
+     */
+    static String encodeIdentifier(byte[] identifier) {
+        return new String(Base64.encodeBase64(identifier), BASE64_CHARSET);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index 20a147d..1b2590a 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -17,19 +17,6 @@
*/
package backtype.storm.messaging.netty;
-import backtype.storm.Config;
-import backtype.storm.messaging.IConnection;
-import backtype.storm.messaging.TaskMessage;
-import backtype.storm.utils.Utils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
@@ -41,6 +28,20 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.utils.Utils;
+
class Server implements IConnection {
private static final Logger LOG = LoggerFactory.getLogger(Server.class);
@SuppressWarnings("rawtypes")
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
index e6e8b3d..4fdaee9 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
@@ -21,6 +21,8 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
+import backtype.storm.Config;
+
class StormClientPipelineFactory implements ChannelPipelineFactory {
private Client client;
@@ -32,12 +34,24 @@ class StormClientPipelineFactory implements ChannelPipelineFactory {
// Create a default pipeline implementation.
ChannelPipeline pipeline = Channels.pipeline();
- // Decoder
- pipeline.addLast("decoder", new MessageDecoder());
- // Encoder
- pipeline.addLast("encoder", new MessageEncoder());
- // business logic.
- pipeline.addLast("handler", new StormClientErrorHandler(client.name()));
+ boolean isNettyAuth = (Boolean) this.client.storm_conf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
+ if(isNettyAuth) {
+ // Decoder
+ pipeline.addLast("decoder", new MessageDecoder());
+ // Encoder
+ pipeline.addLast("encoder", new MessageEncoder());
+ // Authenticate: Removed after authentication completes
+ pipeline.addLast("saslClientHandler", new SaslStormClientHandler(client));
+ // business logic.
+ pipeline.addLast("handler", new StormClientErrorHandler(client.name()));
+ } else {
+ // Decoder
+ pipeline.addLast("decoder", new MessageDecoder());
+ // Encoder
+ pipeline.addLast("encoder", new MessageEncoder());
+ // business logic.
+ pipeline.addLast("handler", new StormClientErrorHandler(client.name()));
+ }
return pipeline;
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41986445/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
index df29ba8..c2b4c53 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
@@ -21,6 +21,9 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+
class StormServerPipelineFactory implements ChannelPipelineFactory {
private Server server;
@@ -33,13 +36,27 @@ class StormServerPipelineFactory implements ChannelPipelineFactory {
// Create a default pipeline implementation.
ChannelPipeline pipeline = Channels.pipeline();
- // Decoder
- pipeline.addLast("decoder", new MessageDecoder());
- // Encoder
- pipeline.addLast("encoder", new MessageEncoder());
- // business logic.
- pipeline.addLast("handler", new StormServerHandler(server));
-
+ boolean isNettyAuth = (Boolean) this.server.storm_conf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
+ if(isNettyAuth) {
+ // Decoder
+ pipeline.addLast("decoder", new MessageDecoder());
+ // Encoder
+ pipeline.addLast("encoder", new MessageEncoder());
+ // Authenticate: Removed after authentication completes
+ pipeline.addLast("saslServerHandler", new SaslStormServerHandler(server));
+ // Authorize
+ pipeline.addLast("authorizeServerHandler", new SaslStormServerAuthorizeHandler());
+ // business logic.
+ pipeline.addLast("handler", new StormServerHandler(server));
+ } else {
+ // Decoder
+ pipeline.addLast("decoder", new MessageDecoder());
+ // Encoder
+ pipeline.addLast("encoder", new MessageEncoder());
+ // business logic.
+ pipeline.addLast("handler", new StormServerHandler(server));
+ }
+
return pipeline;
}
}
[5/9] git commit: STORM-348: Netty SASL Authentication
Posted by bo...@apache.org.
STORM-348: Netty SASL Authentication
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/19ad1351
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/19ad1351
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/19ad1351
Branch: refs/heads/security
Commit: 19ad13510cb42963f868fd8bd785b4937dd348ab
Parents: 3bce04c
Author: Raghavendra Nandagopal <sp...@gmail.com>
Authored: Mon Jul 28 17:42:05 2014 -0700
Committer: Raghavendra Nandagopal <sp...@gmail.com>
Committed: Mon Jul 28 17:42:05 2014 -0700
----------------------------------------------------------------------
.../src/jvm/backtype/storm/messaging/netty/SaslUtils.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/19ad1351/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
index a4cc0ba..a2d0b26 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
@@ -24,6 +24,7 @@ import java.util.Map;
import javax.security.sasl.Sasl;
import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.Charsets;
import backtype.storm.Config;
@@ -45,8 +46,8 @@ class SaslUtils {
* @return password as a char array.
*/
static char[] encodePassword(byte[] password) {
- return new String(Base64.encodeBase64(password),
- Charset.defaultCharset()).toCharArray();
+ return new String(Base64.encodeBase64(password), Charsets.UTF_8)
+ .toCharArray();
}
/**
@@ -57,8 +58,7 @@ class SaslUtils {
* @return identifier as a char array.
*/
static String encodeIdentifier(byte[] identifier) {
- return new String(Base64.encodeBase64(identifier),
- Charset.defaultCharset());
+ return new String(Base64.encodeBase64(identifier), Charsets.UTF_8);
}
static String getSecretKey(Map conf) {
[9/9] git commit: Updated README for STORM-348
Posted by bo...@apache.org.
Updated README for STORM-348
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/cf5fc0c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/cf5fc0c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/cf5fc0c3
Branch: refs/heads/security
Commit: cf5fc0c3f0a71d8e81d344f247a477694e83a2a7
Parents: 00f7d63
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Wed Jul 30 11:15:43 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Wed Jul 30 11:15:43 2014 -0500
----------------------------------------------------------------------
README.markdown | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/cf5fc0c3/README.markdown
----------------------------------------------------------------------
diff --git a/README.markdown b/README.markdown
index cf67f64..586f171 100644
--- a/README.markdown
+++ b/README.markdown
@@ -159,6 +159,7 @@ under the License.
* Jo Liss ([@joliss](https://github.com/joliss))
* averykhoo ([@averykhoo](https://github.com/averykhoo))
* Curtis Allen ([@curtisallen](https://github.com/curtisallen))
+* Raghavendra Nandagopal ([@RaghavendraNandagopal](https://github.com/RaghavendraNandagopal))
## Acknowledgements
[7/9] git commit: STORM-348: Netty SASL Authentication
Posted by bo...@apache.org.
STORM-348: Netty SASL Authentication
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/90cf0e6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/90cf0e6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/90cf0e6c
Branch: refs/heads/security
Commit: 90cf0e6c21df978462d29fe906ad0f2a6adcdb1f
Parents: 02e7949
Author: Raghavendra Nandagopal <sp...@gmail.com>
Authored: Tue Jul 29 17:32:38 2014 -0700
Committer: Raghavendra Nandagopal <sp...@gmail.com>
Committed: Tue Jul 29 17:32:38 2014 -0700
----------------------------------------------------------------------
.../test/clj/backtype/storm/messaging/netty_integration_test.clj | 1 +
storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj | 4 ++++
2 files changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/90cf0e6c/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
index 8534c82..dea4abe 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
@@ -25,6 +25,7 @@
(with-simulated-time-local-cluster [cluster :supervisors 4 :supervisor-slot-port-min 6710
:daemon-conf {STORM-LOCAL-MODE-ZMQ true
STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM_MESSAGING_NETTY_AUTHENTICATION false
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
STORM-MESSAGING-NETTY-MAX-RETRIES 10
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/90cf0e6c/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index ea7b8dc..04d25ec 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -26,6 +26,7 @@
(deftest test-basic
(let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM_MESSAGING_NETTY_AUTHENTICATION false
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
STORM-MESSAGING-NETTY-MAX-RETRIES 10
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
@@ -48,6 +49,7 @@
(deftest test-large-msg
(let [req_msg (apply str (repeat 2048000 'c'))
storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM_MESSAGING_NETTY_AUTHENTICATION false
STORM-MESSAGING-NETTY-BUFFER-SIZE 102400
STORM-MESSAGING-NETTY-MAX-RETRIES 10
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
@@ -70,6 +72,7 @@
(deftest test-server-delayed
(let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM_MESSAGING_NETTY_AUTHENTICATION false
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
STORM-MESSAGING-NETTY-MAX-RETRIES 10
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
@@ -99,6 +102,7 @@
(deftest test-batch
(let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM_MESSAGING_NETTY_AUTHENTICATION false
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
STORM-MESSAGING-NETTY-MAX-RETRIES 10
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
[6/9] git commit: Merge remote-tracking branch 'upstream/security'
into security
Posted by bo...@apache.org.
Merge remote-tracking branch 'upstream/security' into security
Conflicts:
conf/defaults.yaml
storm-core/src/jvm/backtype/storm/Config.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/02e79499
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/02e79499
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/02e79499
Branch: refs/heads/security
Commit: 02e79499fa381b56f8a5333e00cf0cc0bd7aa252
Parents: 19ad135 559c883
Author: Raghavendra Nandagopal <sp...@gmail.com>
Authored: Tue Jul 29 16:37:50 2014 -0700
Committer: Raghavendra Nandagopal <sp...@gmail.com>
Committed: Tue Jul 29 16:37:50 2014 -0700
----------------------------------------------------------------------
BYLAWS.md | 96 ++++
CHANGELOG.md | 31 +
LICENSE | 30 +-
README.markdown | 14 +
STORM-UI-REST-API.md | 567 +++++++++++++++++++
bin/storm | 77 ++-
conf/defaults.yaml | 11 +-
conf/storm_env.ini | 2 +-
dev-tools/github/__init__.py | 109 ++++
dev-tools/jira-github-join.py | 80 +++
dev-tools/jira/__init__.py | 232 ++++++++
examples/storm-starter/README.markdown | 30 +-
.../storm-starter/multilang/resources/storm.py | 2 +-
.../src/jvm/storm/starter/RollingTopWords.java | 62 +-
.../src/jvm/storm/starter/util/StormRunner.java | 9 +
.../storm-kafka/src/jvm/storm/kafka/Broker.java | 9 +-
.../src/jvm/storm/kafka/KafkaConfig.java | 2 +-
.../src/jvm/storm/kafka/Partition.java | 9 +-
pom.xml | 6 +-
.../src/clj/backtype/storm/LocalCluster.clj | 7 +-
storm-core/src/clj/backtype/storm/cluster.clj | 31 +-
.../src/clj/backtype/storm/command/monitor.clj | 37 ++
.../src/clj/backtype/storm/daemon/common.clj | 2 +-
.../src/clj/backtype/storm/daemon/drpc.clj | 6 +-
.../src/clj/backtype/storm/daemon/executor.clj | 4 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 7 +-
.../clj/backtype/storm/daemon/supervisor.clj | 104 +++-
.../src/clj/backtype/storm/daemon/worker.clj | 72 +--
storm-core/src/clj/backtype/storm/disruptor.clj | 2 +-
storm-core/src/clj/backtype/storm/event.clj | 2 +-
storm-core/src/clj/backtype/storm/testing.clj | 49 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 2 +-
.../src/clj/backtype/storm/ui/helpers.clj | 5 -
storm-core/src/clj/backtype/storm/util.clj | 52 +-
storm-core/src/clj/backtype/storm/zookeeper.clj | 25 +
.../src/dev/resources/tester_bolt_metrics.py | 35 ++
.../src/dev/resources/tester_spout_metrics.py | 51 ++
storm-core/src/jvm/backtype/storm/Config.java | 306 +++++-----
.../jvm/backtype/storm/ConfigValidation.java | 70 +++
.../backtype/storm/messaging/netty/Client.java | 13 +-
.../metric/api/rpc/AssignableShellMetric.java | 30 +
.../metric/api/rpc/CombinedShellMetric.java | 31 +
.../storm/metric/api/rpc/CountShellMetric.java | 38 ++
.../storm/metric/api/rpc/IShellMetric.java | 31 +
.../metric/api/rpc/ReducedShellMetric.java | 32 ++
.../storm/multilang/JsonSerializer.java | 15 +
.../jvm/backtype/storm/multilang/ShellMsg.java | 46 ++
.../backtype/storm/security/auth/AuthUtils.java | 49 +-
.../auth/IGroupMappingServiceProvider.java | 42 ++
.../security/auth/ShellBasedGroupsMapping.java | 94 +++
.../auth/authorizer/SimpleACLAuthorizer.java | 29 +-
.../src/jvm/backtype/storm/spout/ISpout.java | 2 +-
.../jvm/backtype/storm/spout/ShellSpout.java | 72 ++-
.../src/jvm/backtype/storm/task/ShellBolt.java | 75 ++-
.../backtype/storm/task/TopologyContext.java | 28 +
.../storm/testing/PythonShellMetricsBolt.java | 32 ++
.../storm/testing/PythonShellMetricsSpout.java | 35 ++
.../src/jvm/backtype/storm/utils/Monitor.java | 249 ++++++++
.../jvm/backtype/storm/utils/ShellProcess.java | 46 +-
.../jvm/backtype/storm/utils/ShellUtils.java | 498 ++++++++++++++++
.../src/jvm/backtype/storm/utils/Utils.java | 23 +-
storm-core/src/multilang/py/storm.py | 30 +-
storm-core/src/multilang/rb/storm.rb | 24 +-
storm-core/src/ui/public/component.html | 3 +-
.../src/ui/public/js/jquery.tablesorter.min.js | 9 +-
storm-core/src/ui/public/js/moment.min.js | 6 +
storm-core/src/ui/public/js/script.js | 9 +
.../test/clj/backtype/storm/cluster_test.clj | 3 +-
.../test/clj/backtype/storm/config_test.clj | 41 +-
.../test/clj/backtype/storm/metrics_test.clj | 206 ++++---
.../backtype/storm/security/auth/auth_test.clj | 11 +-
.../test/clj/backtype/storm/supervisor_test.clj | 135 ++++-
72 files changed, 3780 insertions(+), 454 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/02e79499/conf/defaults.yaml
----------------------------------------------------------------------
diff --cc conf/defaults.yaml
index ee66717,05948e1..3e00b3d
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@@ -153,9 -155,9 +155,12 @@@ storm.messaging.netty.transfer.batch.si
# We check with this interval that whether the Netty channel is writable and try to write pending messages if it is.
storm.messaging.netty.flush.check.interval.ms: 10
+# By default, the Netty SASL authentication is set to false. Users can override and set it true for a specific topology.
+storm.messaging.netty.authentication: false
+
+ # default number of seconds group mapping service will cache user group
+ storm.group.mapping.service.cache.duration.secs: 120
+
### topology.* configs are for specific executing storms
topology.enable.message.timeouts: true
topology.debug: false
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/02e79499/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/Config.java
index 46b120c,d6b45ea..94d1cb3
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@@ -103,13 -102,7 +102,13 @@@ public class Config extends HashMap<Str
* We check with this interval that whether the Netty channel is writable and try to write pending messages
*/
public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";
- public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = Number.class;
-
+ public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = ConfigValidation.IntegerValidator;
++
+ /**
+ * Netty based messaging: Is authentication required for Netty messaging from client worker process to server worker process.
+ */
+ public static final String STORM_MESSAGING_NETTY_AUTHENTICATION = "storm.messaging.netty.authentication";
+ public static final Object STORM_MESSAGING_NETTY_AUTHENTICATION_SCHEMA = Boolean.class;
/**
* A list of hosts of ZooKeeper servers used to manage the cluster.
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/02e79499/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
[8/9] git commit: STORM-348: Netty SASL Authentication
Posted by bo...@apache.org.
STORM-348: Netty SASL Authentication
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/00f7d632
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/00f7d632
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/00f7d632
Branch: refs/heads/security
Commit: 00f7d632eb48bf967b5a7b72f5460352d0a8dbd9
Parents: 90cf0e6
Author: Raghavendra Nandagopal <sp...@gmail.com>
Authored: Tue Jul 29 17:50:24 2014 -0700
Committer: Raghavendra Nandagopal <sp...@gmail.com>
Committed: Tue Jul 29 17:50:24 2014 -0700
----------------------------------------------------------------------
.../clj/backtype/storm/messaging/netty_integration_test.clj | 2 +-
.../test/clj/backtype/storm/messaging/netty_unit_test.clj | 8 ++++----
2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/00f7d632/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
index dea4abe..98144cc 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
@@ -25,7 +25,7 @@
(with-simulated-time-local-cluster [cluster :supervisors 4 :supervisor-slot-port-min 6710
:daemon-conf {STORM-LOCAL-MODE-ZMQ true
STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
- STORM_MESSAGING_NETTY_AUTHENTICATION false
+ STORM-MESSAGING-NETTY-AUTHENTICATION false
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
STORM-MESSAGING-NETTY-MAX-RETRIES 10
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/00f7d632/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index 04d25ec..8aaa7e5 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -26,7 +26,7 @@
(deftest test-basic
(let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
- STORM_MESSAGING_NETTY_AUTHENTICATION false
+ STORM-MESSAGING-NETTY-AUTHENTICATION false
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
STORM-MESSAGING-NETTY-MAX-RETRIES 10
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
@@ -49,7 +49,7 @@
(deftest test-large-msg
(let [req_msg (apply str (repeat 2048000 'c'))
storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
- STORM_MESSAGING_NETTY_AUTHENTICATION false
+ STORM-MESSAGING-NETTY-AUTHENTICATION false
STORM-MESSAGING-NETTY-BUFFER-SIZE 102400
STORM-MESSAGING-NETTY-MAX-RETRIES 10
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
@@ -72,7 +72,7 @@
(deftest test-server-delayed
(let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
- STORM_MESSAGING_NETTY_AUTHENTICATION false
+ STORM-MESSAGING-NETTY-AUTHENTICATION false
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
STORM-MESSAGING-NETTY-MAX-RETRIES 10
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
@@ -102,7 +102,7 @@
(deftest test-batch
(let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
- STORM_MESSAGING_NETTY_AUTHENTICATION false
+ STORM-MESSAGING-NETTY-AUTHENTICATION false
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
STORM-MESSAGING-NETTY-MAX-RETRIES 10
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
[4/9] git commit: STORM-348: Netty SASL Authentication
Posted by bo...@apache.org.
STORM-348: Netty SASL Authentication
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/3bce04ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/3bce04ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/3bce04ce
Branch: refs/heads/security
Commit: 3bce04ceab9abfd6e6da211cb5a792d9df6c96c6
Parents: 63e9082
Author: Raghavendra Nandagopal <sp...@gmail.com>
Authored: Mon Jul 28 16:28:45 2014 -0700
Committer: Raghavendra Nandagopal <sp...@gmail.com>
Committed: Mon Jul 28 16:28:45 2014 -0700
----------------------------------------------------------------------
.../backtype/storm/messaging/netty/Client.java | 2 +-
.../storm/messaging/netty/SaslMessageToken.java | 123 +++++-----
.../storm/messaging/netty/SaslNettyClient.java | 230 +++++++++---------
.../messaging/netty/SaslStormClientHandler.java | 243 ++++++++++---------
.../messaging/netty/SaslStormServerHandler.java | 240 +++++++++---------
.../storm/messaging/netty/SaslUtils.java | 75 +++---
.../netty/StormClientPipelineFactory.java | 32 ++-
.../netty/StormServerPipelineFactory.java | 43 ++--
8 files changed, 490 insertions(+), 498 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/3bce04ce/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 64a1757..0cf1809 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -62,7 +62,7 @@ public class Client implements IConnection {
Map storm_conf;
- MessageBatch messageBatch = null;
+ private MessageBatch messageBatch = null;
private AtomicLong flushCheckTimer;
private int flushCheckInterval;
private ScheduledExecutorService scheduler;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/3bce04ce/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
index 8383d2c..d0d3ca1 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
@@ -27,74 +27,73 @@ import org.slf4j.LoggerFactory;
* Send and receive SASL tokens.
*/
public class SaslMessageToken {
- /** Class logger */
- private static final Logger LOG = LoggerFactory
- .getLogger(SaslMessageToken.class);
+ /** Class logger */
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SaslMessageToken.class);
- /** Used for client or server's token to send or receive from each other. */
- private byte[] token;
+ /** Used for client or server's token to send or receive from each other. */
+ private byte[] token;
- /**
- * Constructor used for reflection only.
- */
- public SaslMessageToken() {
- }
+ /**
+ * Constructor used for reflection only.
+ */
+ public SaslMessageToken() {
+ }
- /**
- * Constructor used to send request.
- *
- * @param token
- * the SASL token, generated by a SaslClient or SaslServer.
- */
- public SaslMessageToken(byte[] token) {
- this.token = token;
- }
+ /**
+ * Constructor used to send request.
+ *
+ * @param token
+ * the SASL token, generated by a SaslClient or SaslServer.
+ */
+ public SaslMessageToken(byte[] token) {
+ this.token = token;
+ }
- /**
- * Read accessor for SASL token
- *
- * @return saslToken SASL token
- */
- public byte[] getSaslToken() {
- return token;
- }
+ /**
+ * Read accessor for SASL token
+ *
+ * @return saslToken SASL token
+ */
+ public byte[] getSaslToken() {
+ return token;
+ }
- /**
- * Write accessor for SASL token
- *
- * @param token
- * SASL token
- */
- public void setSaslToken(byte[] token) {
- this.token = token;
- }
+ /**
+ * Write accessor for SASL token
+ *
+ * @param token
+ * SASL token
+ */
+ public void setSaslToken(byte[] token) {
+ this.token = token;
+ }
- int encodeLength() {
- return 2+4+token.length;
- }
+ int encodeLength() {
+ return 2 + 4 + token.length;
+ }
- /**
- * encode the current SaslToken Message into a channel buffer
- * SaslTokenMessageRequest is encoded as:
- * identifier .... short(2) always it is -500
- * payload length .... int
- * payload .... byte[]
- * @throws Exception
- */
- ChannelBuffer buffer() throws Exception {
- ChannelBufferOutputStream bout = new ChannelBufferOutputStream(
- ChannelBuffers.directBuffer(encodeLength()));
- short identifier = -500;
- int payload_len = 0;
+ /**
+ * encode the current SaslToken Message into a channel buffer
+ * SaslTokenMessageRequest is encoded as: identifier .... short(2) always it
+ * is -500 payload length .... int payload .... byte[]
+ *
+ * @throws Exception
+ */
+ ChannelBuffer buffer() throws Exception {
+ ChannelBufferOutputStream bout = new ChannelBufferOutputStream(
+ ChannelBuffers.directBuffer(encodeLength()));
+ short identifier = -500;
+ int payload_len = 0;
if (token != null)
- payload_len = token.length;
-
- bout.writeShort((short)identifier);
- bout.writeInt((int)payload_len);
- if(payload_len>0) {
- bout.write(token);
- }
- bout.close();
- return bout.buffer();
- }
+ payload_len = token.length;
+
+ bout.writeShort((short) identifier);
+ bout.writeInt((int) payload_len);
+ if (payload_len > 0) {
+ bout.write(token);
+ }
+ bout.close();
+ return bout.buffer();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/3bce04ce/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
index fedcfff..023e950 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
@@ -38,129 +38,129 @@ import org.slf4j.LoggerFactory;
*/
public class SaslNettyClient {
- private static final Logger LOG = LoggerFactory
- .getLogger(SaslNettyClient.class);
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SaslNettyClient.class);
- /**
- * Used to respond to server's counterpart, SaslServer with SASL tokens
- * represented as byte arrays.
- */
- private SaslClient saslClient;
+ /**
+ * Used to respond to server's counterpart, SaslServer with SASL tokens
+ * represented as byte arrays.
+ */
+ private SaslClient saslClient;
- /**
- * Create a SaslNettyClient for authentication with servers.
- */
- public SaslNettyClient(String topologyName, byte[] token) {
- try {
- LOG.debug("SaslNettyClient: Creating SASL "
- + SaslUtils.AUTH_DIGEST_MD5
- + " client to authenticate to server ");
+ /**
+ * Create a SaslNettyClient for authentication with servers.
+ */
+ public SaslNettyClient(String topologyName, byte[] token) {
+ try {
+ LOG.debug("SaslNettyClient: Creating SASL "
+ + SaslUtils.AUTH_DIGEST_MD5
+ + " client to authenticate to server ");
- saslClient = Sasl.createSaslClient(
- new String[] { SaslUtils.AUTH_DIGEST_MD5 }, null, null,
- SaslUtils.DEFAULT_REALM, SaslUtils.getSaslProps(),
- new SaslClientCallbackHandler(topologyName, token));
+ saslClient = Sasl.createSaslClient(
+ new String[] { SaslUtils.AUTH_DIGEST_MD5 }, null, null,
+ SaslUtils.DEFAULT_REALM, SaslUtils.getSaslProps(),
+ new SaslClientCallbackHandler(topologyName, token));
- } catch (IOException e) {
- LOG.error("SaslNettyClient: Could not obtain topology token for Netty "
- + "Client to use to authenticate with a Netty Server.");
- saslClient = null;
- }
- }
+ } catch (IOException e) {
+ LOG.error("SaslNettyClient: Could not obtain topology token for Netty "
+ + "Client to use to authenticate with a Netty Server.");
+ saslClient = null;
+ }
+ }
- public boolean isComplete() {
- return saslClient.isComplete();
- }
+ public boolean isComplete() {
+ return saslClient.isComplete();
+ }
- /**
- * Respond to server's SASL token.
- *
- * @param saslTokenMessage
- * contains server's SASL token
- * @return client's response SASL token
- */
- public byte[] saslResponse(SaslMessageToken saslTokenMessage) {
- try {
- byte[] retval = saslClient.evaluateChallenge(saslTokenMessage
- .getSaslToken());
- return retval;
- } catch (SaslException e) {
- LOG.error(
- "saslResponse: Failed to respond to SASL server's token:",
- e);
- return null;
- }
- }
+ /**
+ * Respond to server's SASL token.
+ *
+ * @param saslTokenMessage
+ * contains server's SASL token
+ * @return client's response SASL token
+ */
+ public byte[] saslResponse(SaslMessageToken saslTokenMessage) {
+ try {
+ byte[] retval = saslClient.evaluateChallenge(saslTokenMessage
+ .getSaslToken());
+ return retval;
+ } catch (SaslException e) {
+ LOG.error(
+ "saslResponse: Failed to respond to SASL server's token:",
+ e);
+ return null;
+ }
+ }
- /**
- * Implementation of javax.security.auth.callback.CallbackHandler that works
- * with Storm topology tokens.
- */
- private static class SaslClientCallbackHandler implements CallbackHandler {
- /** Generated username contained in TopologyToken */
- private final String userName;
- /** Generated password contained in TopologyToken */
- private final char[] userPassword;
+ /**
+ * Implementation of javax.security.auth.callback.CallbackHandler that works
+ * with Storm topology tokens.
+ */
+ private static class SaslClientCallbackHandler implements CallbackHandler {
+ /** Generated username contained in TopologyToken */
+ private final String userName;
+ /** Generated password contained in TopologyToken */
+ private final char[] userPassword;
- /**
- * Set private members using topology token.
- *
- * @param topologyToken
- */
- public SaslClientCallbackHandler(String topologyToken, byte[] token) {
- this.userName = SaslUtils
- .encodeIdentifier(topologyToken.getBytes());
- this.userPassword = SaslUtils.encodePassword(token);
- }
+ /**
+ * Set private members using topology token.
+ *
+ * @param topologyToken
+ */
+ public SaslClientCallbackHandler(String topologyToken, byte[] token) {
+ this.userName = SaslUtils
+ .encodeIdentifier(topologyToken.getBytes());
+ this.userPassword = SaslUtils.encodePassword(token);
+ }
- /**
- * Implementation used to respond to SASL tokens from server.
- *
- * @param callbacks
- * objects that indicate what credential information the
- * server's SaslServer requires from the client.
- * @throws UnsupportedCallbackException
- */
- public void handle(Callback[] callbacks)
- throws UnsupportedCallbackException {
- NameCallback nc = null;
- PasswordCallback pc = null;
- RealmCallback rc = null;
- for (Callback callback : callbacks) {
- if (callback instanceof RealmChoiceCallback) {
- continue;
- } else if (callback instanceof NameCallback) {
- nc = (NameCallback) callback;
- } else if (callback instanceof PasswordCallback) {
- pc = (PasswordCallback) callback;
- } else if (callback instanceof RealmCallback) {
- rc = (RealmCallback) callback;
- } else {
- throw new UnsupportedCallbackException(callback,
- "handle: Unrecognized SASL client callback");
- }
- }
- if (nc != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("handle: SASL client callback: setting username: "
- + userName);
- }
- nc.setName(userName);
- }
- if (pc != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("handle: SASL client callback: setting userPassword");
- }
- pc.setPassword(userPassword);
- }
- if (rc != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("handle: SASL client callback: setting realm: "
- + rc.getDefaultText());
- }
- rc.setText(rc.getDefaultText());
- }
- }
- }
+ /**
+ * Implementation used to respond to SASL tokens from server.
+ *
+ * @param callbacks
+ * objects that indicate what credential information the
+ * server's SaslServer requires from the client.
+ * @throws UnsupportedCallbackException
+ */
+ public void handle(Callback[] callbacks)
+ throws UnsupportedCallbackException {
+ NameCallback nc = null;
+ PasswordCallback pc = null;
+ RealmCallback rc = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof RealmChoiceCallback) {
+ continue;
+ } else if (callback instanceof NameCallback) {
+ nc = (NameCallback) callback;
+ } else if (callback instanceof PasswordCallback) {
+ pc = (PasswordCallback) callback;
+ } else if (callback instanceof RealmCallback) {
+ rc = (RealmCallback) callback;
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "handle: Unrecognized SASL client callback");
+ }
+ }
+ if (nc != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("handle: SASL client callback: setting username: "
+ + userName);
+ }
+ nc.setName(userName);
+ }
+ if (pc != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("handle: SASL client callback: setting userPassword");
+ }
+ pc.setPassword(userPassword);
+ }
+ if (rc != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("handle: SASL client callback: setting realm: "
+ + rc.getDefaultText());
+ }
+ rc.setText(rc.getDefaultText());
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/3bce04ce/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
index 59c4abd..f94cbc3 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
@@ -32,124 +32,127 @@ import backtype.storm.Config;
public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
- private static final Logger LOG = LoggerFactory
- .getLogger(SaslStormClientHandler.class);
- private Client client;
- long start_time;
- /** Used for client or server's token to send or receive from each other. */
- private byte[] token;
- private String topologyName;
-
- public SaslStormClientHandler(Client client) throws IOException {
- this.client = client;
- start_time = System.currentTimeMillis();
- getSASLCredentials();
- }
-
- @Override
- public void channelConnected(ChannelHandlerContext ctx,
- ChannelStateEvent event) {
- // register the newly established channel
- Channel channel = ctx.getChannel();
-
- LOG.info("Connection established from " + channel.getLocalAddress()
- + " to " + channel.getRemoteAddress());
-
- try {
- SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient
- .get(channel);
-
- if (saslNettyClient == null) {
- LOG.debug("Creating saslNettyClient now " + "for channel: "
- + channel);
- saslNettyClient = new SaslNettyClient(topologyName, token);
- SaslNettyClientState.getSaslNettyClient.set(channel,
- saslNettyClient);
- }
- channel.write(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST);
- } catch (Exception e) {
- LOG.error("Failed to authenticate with server " + "due to error: "
- + e);
- }
- return;
-
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent event)
- throws Exception {
- LOG.debug("send/recv time (ms): {}",
- (System.currentTimeMillis() - start_time));
-
- Channel channel = ctx.getChannel();
-
- // Generate SASL response to server using Channel-local SASL client.
- SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient
- .get(channel);
- if (saslNettyClient == null) {
- throw new Exception("saslNettyClient was unexpectedly "
- + "null for channel: " + channel);
- }
-
- // examine the response message from server
- if (event.getMessage() instanceof ControlMessage) {
- ControlMessage msg = (ControlMessage) event.getMessage();
- if (msg == ControlMessage.SASL_COMPLETE_REQUEST) {
- LOG.debug("Server has sent us the SaslComplete "
- + "message. Allowing normal work to proceed.");
-
- if (!saslNettyClient.isComplete()) {
- LOG.error("Server returned a Sasl-complete message, "
- + "but as far as we can tell, we are not authenticated yet.");
- throw new Exception("Server returned a "
- + "Sasl-complete message, but as far as "
- + "we can tell, we are not authenticated yet.");
- }
- ctx.getPipeline().remove(this);
- // We call fireMessageReceived since the client is allowed to
- // perform this request. The client's request will now proceed
- // to the next pipeline component namely StormClientHandler.
- Channels.fireMessageReceived(ctx, msg);
- return;
- }
- }
- SaslMessageToken saslTokenMessage = (SaslMessageToken) event
- .getMessage();
- LOG.debug("Responding to server's token of length: "
- + saslTokenMessage.getSaslToken().length);
-
- // Generate SASL response (but we only actually send the response if
- // it's non-null.
- byte[] responseToServer = saslNettyClient
- .saslResponse(saslTokenMessage);
- if (responseToServer == null) {
- // If we generate a null response, then authentication has completed
- // (if not, warn), and return without sending a response back to the
- // server.
- LOG.debug("Response to server is null: "
- + "authentication should now be complete.");
- if (!saslNettyClient.isComplete()) {
- LOG.warn("Generated a null response, "
- + "but authentication is not complete.");
- }
- return;
- } else {
- LOG.debug("Response to server token has length:"
- + responseToServer.length);
- }
- // Construct a message containing the SASL response and send it to the
- // server.
- SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
- channel.write(saslResponse);
- }
-
- private void getSASLCredentials() throws IOException {
- topologyName = (String) this.client.storm_conf
- .get(Config.TOPOLOGY_NAME);
- String secretKey = SaslUtils.getSecretKey(this.client.storm_conf);
- if(secretKey!=null) {
- token = secretKey.getBytes();
- }
- LOG.debug("SASL credentials for storm topology "+topologyName+ " is "+secretKey);
- }
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SaslStormClientHandler.class);
+ private Client client;
+ long start_time;
+ /** Used for client or server's token to send or receive from each other. */
+ private byte[] token;
+ private String topologyName;
+
+ public SaslStormClientHandler(Client client) throws IOException {
+ this.client = client;
+ start_time = System.currentTimeMillis();
+ getSASLCredentials();
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx,
+ ChannelStateEvent event) {
+ // register the newly established channel
+ Channel channel = ctx.getChannel();
+
+ LOG.info("Connection established from " + channel.getLocalAddress()
+ + " to " + channel.getRemoteAddress());
+
+ try {
+ SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient
+ .get(channel);
+
+ if (saslNettyClient == null) {
+ LOG.debug("Creating saslNettyClient now " + "for channel: "
+ + channel);
+ saslNettyClient = new SaslNettyClient(topologyName, token);
+ SaslNettyClientState.getSaslNettyClient.set(channel,
+ saslNettyClient);
+ }
+ channel.write(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST);
+ } catch (Exception e) {
+ LOG.error("Failed to authenticate with server " + "due to error: ",
+ e);
+ }
+ return;
+
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent event)
+ throws Exception {
+ LOG.debug("send/recv time (ms): {}",
+ (System.currentTimeMillis() - start_time));
+
+ Channel channel = ctx.getChannel();
+
+ // Generate SASL response to server using Channel-local SASL client.
+ SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient
+ .get(channel);
+ if (saslNettyClient == null) {
+ throw new Exception("saslNettyClient was unexpectedly "
+ + "null for channel: " + channel);
+ }
+
+ // examine the response message from server
+ if (event.getMessage() instanceof ControlMessage) {
+ ControlMessage msg = (ControlMessage) event.getMessage();
+ if (msg == ControlMessage.SASL_COMPLETE_REQUEST) {
+ LOG.debug("Server has sent us the SaslComplete "
+ + "message. Allowing normal work to proceed.");
+
+ if (!saslNettyClient.isComplete()) {
+ LOG.error("Server returned a Sasl-complete message, "
+ + "but as far as we can tell, we are not authenticated yet.");
+ throw new Exception("Server returned a "
+ + "Sasl-complete message, but as far as "
+ + "we can tell, we are not authenticated yet.");
+ }
+ ctx.getPipeline().remove(this);
+ // We call fireMessageReceived since the client is allowed to
+ // perform this request. The client's request will now proceed
+ // to the next pipeline component namely StormClientHandler.
+ Channels.fireMessageReceived(ctx, msg);
+ return;
+ }
+ }
+ SaslMessageToken saslTokenMessage = (SaslMessageToken) event
+ .getMessage();
+ LOG.debug("Responding to server's token of length: "
+ + saslTokenMessage.getSaslToken().length);
+
+ // Generate SASL response (but we only actually send the response if
+ // it's non-null.
+ byte[] responseToServer = saslNettyClient
+ .saslResponse(saslTokenMessage);
+ if (responseToServer == null) {
+ // If we generate a null response, then authentication has completed
+ // (if not, warn), and return without sending a response back to the
+ // server.
+ LOG.debug("Response to server is null: "
+ + "authentication should now be complete.");
+ if (!saslNettyClient.isComplete()) {
+ LOG.warn("Generated a null response, "
+ + "but authentication is not complete.");
+ throw new Exception("Server reponse is null, but as far as "
+ + "we can tell, we are not authenticated yet.");
+ }
+ return;
+ } else {
+ LOG.debug("Response to server token has length:"
+ + responseToServer.length);
+ }
+ // Construct a message containing the SASL response and send it to the
+ // server.
+ SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
+ channel.write(saslResponse);
+ }
+
+ private void getSASLCredentials() throws IOException {
+ topologyName = (String) this.client.storm_conf
+ .get(Config.TOPOLOGY_NAME);
+ String secretKey = SaslUtils.getSecretKey(this.client.storm_conf);
+ if (secretKey != null) {
+ token = secretKey.getBytes();
+ }
+ LOG.debug("SASL credentials for storm topology " + topologyName
+ + " is " + secretKey);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/3bce04ce/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
index d06e960..02448e2 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
@@ -32,124 +32,124 @@ import backtype.storm.Config;
public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
- Server server;
- /** Used for client or server's token to send or receive from each other. */
- private byte[] token;
- private String topologyName;
-
- private static final Logger LOG = LoggerFactory
- .getLogger(SaslStormServerHandler.class);
-
- public SaslStormServerHandler(Server server) throws IOException {
- this.server = server;
- getSASLCredentials();
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
- Object msg = e.getMessage();
- if (msg == null)
- return;
-
- Channel channel = ctx.getChannel();
-
- if (msg instanceof ControlMessage
- && ((ControlMessage) e.getMessage()) == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
- // initialize server-side SASL functionality, if we haven't yet
- // (in which case we are looking at the first SASL message from the
- // client).
- SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
- .get(channel);
- if (saslNettyServer == null) {
- LOG.debug("No saslNettyServer for " + channel
- + " yet; creating now, with topology token: ");
- try {
- saslNettyServer = new SaslNettyServer(topologyName, token);
- } catch (IOException ioe) {
- LOG.error("Error occurred while creating saslNettyServer on server "
- + channel.getLocalAddress()
- + " for client "
- + channel.getRemoteAddress());
- throw new IOException(ioe);
- }
-
- SaslNettyServerState.getSaslNettyServer.set(channel,
- saslNettyServer);
- } else {
- LOG.debug("Found existing saslNettyServer on server:"
- + channel.getLocalAddress() + " for client "
- + channel.getRemoteAddress());
- }
-
- LOG.debug("processToken: With nettyServer: " + saslNettyServer
- + " and token length: " + token.length);
-
- SaslMessageToken saslTokenMessageRequest = null;
- saslTokenMessageRequest = new SaslMessageToken(
- saslNettyServer.response(new byte[0]));
- // Send response to client.
- channel.write(saslTokenMessageRequest);
- // do not send upstream to other handlers: no further action needs
- // to be done for SASL_TOKEN_MESSAGE_REQUEST requests.
- return;
- }
-
- if (msg instanceof SaslMessageToken) {
- // initialize server-side SASL functionality, if we haven't yet
- // (in which case we are looking at the first SASL message from the
- // client).
- SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
- .get(channel);
- if (saslNettyServer == null) {
- if (saslNettyServer == null) {
- throw new Exception("saslNettyServer was unexpectedly "
- + "null for channel: " + channel);
- }
- }
- SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(
- saslNettyServer.response(((SaslMessageToken) msg)
- .getSaslToken()));
-
- // Send response to client.
- channel.write(saslTokenMessageRequest);
-
- if (saslNettyServer.isComplete()) {
- // If authentication of client is complete, we will also send a
- // SASL-Complete message to the client.
- LOG.debug("SASL authentication is complete for client with "
- + "username: " + saslNettyServer.getUserName());
- channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
- LOG.debug("Removing SaslServerHandler from pipeline since SASL "
- + "authentication is complete.");
- ctx.getPipeline().remove(this);
- }
- return;
- } else {
- // Client should not be sending other-than-SASL messages before
- // SaslServerHandler has removed itself from the pipeline. Such
- // non-SASL requests will be denied by the Authorize channel handler
- // (the next handler upstream in the server pipeline) if SASL
- // authentication has not completed.
- LOG.warn("Sending upstream an unexpected non-SASL message : "
- + msg);
- Channels.fireMessageReceived(ctx, msg);
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- server.closeChannel(e.getChannel());
- }
-
- private void getSASLCredentials() throws IOException {
- topologyName = (String) this.server.storm_conf
- .get(Config.TOPOLOGY_NAME);
- String secretKey = SaslUtils.getSecretKey(this.server.storm_conf);
- if (secretKey != null) {
- token = secretKey.getBytes();
- }
- LOG.debug("SASL credentials for storm topology " + topologyName
- + " is " + secretKey);
- }
+ Server server;
+ /** Used for client or server's token to send or receive from each other. */
+ private byte[] token;
+ private String topologyName;
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SaslStormServerHandler.class);
+
+ public SaslStormServerHandler(Server server) throws IOException {
+ this.server = server;
+ getSASLCredentials();
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+ Object msg = e.getMessage();
+ if (msg == null)
+ return;
+
+ Channel channel = ctx.getChannel();
+
+ if (msg instanceof ControlMessage
+ && ((ControlMessage) e.getMessage()) == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
+ // initialize server-side SASL functionality, if we haven't yet
+ // (in which case we are looking at the first SASL message from the
+ // client).
+ SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
+ .get(channel);
+ if (saslNettyServer == null) {
+ LOG.debug("No saslNettyServer for " + channel
+ + " yet; creating now, with topology token: ");
+ try {
+ saslNettyServer = new SaslNettyServer(topologyName, token);
+ } catch (IOException ioe) {
+ LOG.error("Error occurred while creating saslNettyServer on server "
+ + channel.getLocalAddress()
+ + " for client "
+ + channel.getRemoteAddress());
+ saslNettyServer = null;
+ }
+
+ SaslNettyServerState.getSaslNettyServer.set(channel,
+ saslNettyServer);
+ } else {
+ LOG.debug("Found existing saslNettyServer on server:"
+ + channel.getLocalAddress() + " for client "
+ + channel.getRemoteAddress());
+ }
+
+ LOG.debug("processToken: With nettyServer: " + saslNettyServer
+ + " and token length: " + token.length);
+
+ SaslMessageToken saslTokenMessageRequest = null;
+ saslTokenMessageRequest = new SaslMessageToken(
+ saslNettyServer.response(new byte[0]));
+ // Send response to client.
+ channel.write(saslTokenMessageRequest);
+ // do not send upstream to other handlers: no further action needs
+ // to be done for SASL_TOKEN_MESSAGE_REQUEST requests.
+ return;
+ }
+
+ if (msg instanceof SaslMessageToken) {
+ // look up the server-side SASL state previously stored for this
+ // channel; unlike the SASL_TOKEN_MESSAGE_REQUEST branch, it is not
+ // created here, and a missing entry is treated as an error.
+ SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
+ .get(channel);
+ if (saslNettyServer == null) {
+ if (saslNettyServer == null) {
+ throw new Exception("saslNettyServer was unexpectedly "
+ + "null for channel: " + channel);
+ }
+ }
+ SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(
+ saslNettyServer.response(((SaslMessageToken) msg)
+ .getSaslToken()));
+
+ // Send response to client.
+ channel.write(saslTokenMessageRequest);
+
+ if (saslNettyServer.isComplete()) {
+ // If authentication of client is complete, we will also send a
+ // SASL-Complete message to the client.
+ LOG.debug("SASL authentication is complete for client with "
+ + "username: " + saslNettyServer.getUserName());
+ channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
+ LOG.debug("Removing SaslServerHandler from pipeline since SASL "
+ + "authentication is complete.");
+ ctx.getPipeline().remove(this);
+ }
+ return;
+ } else {
+ // Client should not be sending other-than-SASL messages before
+ // SaslServerHandler has removed itself from the pipeline. Such
+ // non-SASL requests will be denied by the Authorize channel handler
+ // (the next handler upstream in the server pipeline) if SASL
+ // authentication has not completed.
+ LOG.warn("Sending upstream an unexpected non-SASL message : "
+ + msg);
+ Channels.fireMessageReceived(ctx, msg);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ server.closeChannel(e.getChannel());
+ }
+
+ private void getSASLCredentials() throws IOException {
+ topologyName = (String) this.server.storm_conf
+ .get(Config.TOPOLOGY_NAME);
+ String secretKey = SaslUtils.getSecretKey(this.server.storm_conf);
+ if (secretKey != null) {
+ token = secretKey.getBytes();
+ }
+ LOG.debug("SASL credentials for storm topology " + topologyName
+ + " is " + secretKey);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/3bce04ce/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
index 0f96233..a4cc0ba 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
@@ -28,48 +28,47 @@ import org.apache.commons.codec.binary.Base64;
import backtype.storm.Config;
class SaslUtils {
- public static final String AUTH_DIGEST_MD5 = "DIGEST-MD5";
- public static final String DEFAULT_REALM = "default";
+ public static final String AUTH_DIGEST_MD5 = "DIGEST-MD5";
+ public static final String DEFAULT_REALM = "default";
- static Map<String, String> getSaslProps() {
- Map<String, String> props = new HashMap<String, String>();
- props.put(Sasl.POLICY_NOPLAINTEXT, "true");
- return props;
- }
+ static Map<String, String> getSaslProps() {
+ Map<String, String> props = new HashMap<String, String>();
+ props.put(Sasl.POLICY_NOPLAINTEXT, "true");
+ return props;
+ }
- /**
- * Encode a password as a base64-encoded char[] array.
- *
- * @param password
- * as a byte array.
- * @return password as a char array.
- */
- static char[] encodePassword(byte[] password) {
- return new String(Base64.encodeBase64(password),
- Charset.defaultCharset()).toCharArray();
- }
+ /**
+ * Encode a password as a base64-encoded char[] array.
+ *
+ * @param password
+ * as a byte array.
+ * @return password as a char array.
+ */
+ static char[] encodePassword(byte[] password) {
+ return new String(Base64.encodeBase64(password),
+ Charset.defaultCharset()).toCharArray();
+ }
- /**
- * Encode a identifier as a base64-encoded char[] array.
- *
- * @param identifier
- * as a byte array.
- * @return identifier as a char array.
- */
- static String encodeIdentifier(byte[] identifier) {
- return new String(Base64.encodeBase64(identifier),
- Charset.defaultCharset());
- }
+ /**
+ * Encode an identifier as a base64-encoded String.
+ *
+ * @param identifier
+ * as a byte array.
+ * @return identifier as a base64-encoded String.
+ */
+ static String encodeIdentifier(byte[] identifier) {
+ return new String(Base64.encodeBase64(identifier),
+ Charset.defaultCharset());
+ }
- static String getSecretKey(Map conf) {
- if (conf == null || conf.isEmpty())
- return null;
+ static String getSecretKey(Map conf) {
+ if (conf == null || conf.isEmpty())
+ return null;
- String secretPayLoad = (String) conf
- .get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
+ String secretPayLoad = (String) conf
+ .get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
+
+ return secretPayLoad;
+ }
- return secretPayLoad;
- }
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/3bce04ce/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
index 4fdaee9..1ea382b 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
@@ -27,31 +27,27 @@ class StormClientPipelineFactory implements ChannelPipelineFactory {
private Client client;
StormClientPipelineFactory(Client client) {
- this.client = client;
+ this.client = client;
}
public ChannelPipeline getPipeline() throws Exception {
// Create a default pipeline implementation.
ChannelPipeline pipeline = Channels.pipeline();
- boolean isNettyAuth = (Boolean) this.client.storm_conf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
- if(isNettyAuth) {
- // Decoder
- pipeline.addLast("decoder", new MessageDecoder());
- // Encoder
- pipeline.addLast("encoder", new MessageEncoder());
- // Authenticate: Removed after authentication completes
- pipeline.addLast("saslClientHandler", new SaslStormClientHandler(client));
- // business logic.
- pipeline.addLast("handler", new StormClientErrorHandler(client.name()));
- } else {
- // Decoder
- pipeline.addLast("decoder", new MessageDecoder());
- // Encoder
- pipeline.addLast("encoder", new MessageEncoder());
- // business logic.
- pipeline.addLast("handler", new StormClientErrorHandler(client.name()));
+ // Decoder
+ pipeline.addLast("decoder", new MessageDecoder());
+ // Encoder
+ pipeline.addLast("encoder", new MessageEncoder());
+
+ boolean isNettyAuth = (Boolean) this.client.storm_conf
+ .get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
+ if (isNettyAuth) {
+ // Authenticate: Removed after authentication completes
+ pipeline.addLast("saslClientHandler", new SaslStormClientHandler(
+ client));
}
+ // business logic.
+ pipeline.addLast("handler", new StormClientErrorHandler(client.name()));
return pipeline;
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/3bce04ce/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
index c2b4c53..f6e20c5 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
@@ -22,41 +22,36 @@ import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-
-class StormServerPipelineFactory implements ChannelPipelineFactory {
+class StormServerPipelineFactory implements ChannelPipelineFactory {
private Server server;
-
+
StormServerPipelineFactory(Server server) {
- this.server = server;
+ this.server = server;
}
-
+
public ChannelPipeline getPipeline() throws Exception {
// Create a default pipeline implementation.
ChannelPipeline pipeline = Channels.pipeline();
- boolean isNettyAuth = (Boolean) this.server.storm_conf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
- if(isNettyAuth) {
- // Decoder
- pipeline.addLast("decoder", new MessageDecoder());
- // Encoder
- pipeline.addLast("encoder", new MessageEncoder());
+ // Decoder
+ pipeline.addLast("decoder", new MessageDecoder());
+ // Encoder
+ pipeline.addLast("encoder", new MessageEncoder());
+
+ boolean isNettyAuth = (Boolean) this.server.storm_conf
+ .get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
+ if (isNettyAuth) {
// Authenticate: Removed after authentication completes
- pipeline.addLast("saslServerHandler", new SaslStormServerHandler(server));
+ pipeline.addLast("saslServerHandler", new SaslStormServerHandler(
+ server));
// Authorize
- pipeline.addLast("authorizeServerHandler", new SaslStormServerAuthorizeHandler());
- // business logic.
- pipeline.addLast("handler", new StormServerHandler(server));
- } else {
- // Decoder
- pipeline.addLast("decoder", new MessageDecoder());
- // Encoder
- pipeline.addLast("encoder", new MessageEncoder());
- // business logic.
- pipeline.addLast("handler", new StormServerHandler(server));
+ pipeline.addLast("authorizeServerHandler",
+ new SaslStormServerAuthorizeHandler());
}
-
+ // business logic.
+ pipeline.addLast("handler", new StormServerHandler(server));
+
return pipeline;
}
}
[3/9] git commit: STORM-348: Netty SASL Authentication
Posted by bo...@apache.org.
STORM-348: Netty SASL Authentication
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/63e90822
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/63e90822
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/63e90822
Branch: refs/heads/security
Commit: 63e90822056daf0a41c0778c0984b7371459a3f2
Parents: 133c398
Author: Raghavendra Nandagopal <sp...@gmail.com>
Authored: Thu Jul 24 15:57:31 2014 -0700
Committer: Raghavendra Nandagopal <sp...@gmail.com>
Committed: Thu Jul 24 15:57:31 2014 -0700
----------------------------------------------------------------------
conf/defaults.yaml | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/63e90822/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 7f17054..ee66717 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -153,6 +153,9 @@ storm.messaging.netty.transfer.batch.size: 262144
# We check with this interval that whether the Netty channel is writable and try to write pending messages if it is.
storm.messaging.netty.flush.check.interval.ms: 10
+# By default, the Netty SASL authentication is set to false. Users can override and set it true for a specific topology.
+storm.messaging.netty.authentication: false
+
### topology.* configs are for specific executing storms
topology.enable.message.timeouts: true
topology.debug: false
[2/9] git commit: STORM-348: Netty SASL Authentication
Posted by bo...@apache.org.
STORM-348: Netty SASL Authentication
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/133c398e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/133c398e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/133c398e
Branch: refs/heads/security
Commit: 133c398ee9f5799fb3d702e27c8f83b969fc0341
Parents: 4198644
Author: Raghavendra Nandagopal <sp...@gmail.com>
Authored: Thu Jul 24 15:47:51 2014 -0700
Committer: Raghavendra Nandagopal <sp...@gmail.com>
Committed: Thu Jul 24 15:47:51 2014 -0700
----------------------------------------------------------------------
.../storm/messaging/netty/SaslNettyClient.java | 9 +++--
.../storm/messaging/netty/SaslNettyServer.java | 37 +++++++++-----------
.../messaging/netty/SaslStormClientHandler.java | 24 ++++++-------
.../messaging/netty/SaslStormServerHandler.java | 26 ++++++--------
.../storm/messaging/netty/SaslUtils.java | 14 ++++++++
5 files changed, 56 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/133c398e/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
index a4f1b5e..fedcfff 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
@@ -50,7 +50,7 @@ public class SaslNettyClient {
/**
* Create a SaslNettyClient for authentication with servers.
*/
- public SaslNettyClient(String topologyUser) {
+ public SaslNettyClient(String topologyName, byte[] token) {
try {
LOG.debug("SaslNettyClient: Creating SASL "
+ SaslUtils.AUTH_DIGEST_MD5
@@ -59,7 +59,7 @@ public class SaslNettyClient {
saslClient = Sasl.createSaslClient(
new String[] { SaslUtils.AUTH_DIGEST_MD5 }, null, null,
SaslUtils.DEFAULT_REALM, SaslUtils.getSaslProps(),
- new SaslClientCallbackHandler(topologyUser));
+ new SaslClientCallbackHandler(topologyName, token));
} catch (IOException e) {
LOG.error("SaslNettyClient: Could not obtain topology token for Netty "
@@ -107,11 +107,10 @@ public class SaslNettyClient {
*
* @param topologyToken
*/
- public SaslClientCallbackHandler(String topologyToken) {
+ public SaslClientCallbackHandler(String topologyToken, byte[] token) {
this.userName = SaslUtils
.encodeIdentifier(topologyToken.getBytes());
- this.userPassword = SaslUtils.encodePassword(topologyToken
- .getBytes());
+ this.userPassword = SaslUtils.encodePassword(token);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/133c398e/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
index 1178bd6..2cb47d9 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
@@ -44,14 +44,14 @@ class SaslNettyServer {
private SaslServer saslServer;
- SaslNettyServer(String topologyToken) throws IOException {
- LOG.debug("SaslNettyServer: Topology token is: " + topologyToken
+ SaslNettyServer(String topologyName, byte[] token) throws IOException {
+ LOG.debug("SaslNettyServer: Topology token is: " + topologyName
+ " with authmethod " + SaslUtils.AUTH_DIGEST_MD5);
try {
SaslDigestCallbackHandler ch = new SaslNettyServer.SaslDigestCallbackHandler(
- topologyToken);
+ topologyName, token);
saslServer = Sasl.createSaslServer(SaslUtils.AUTH_DIGEST_MD5, null,
SaslUtils.DEFAULT_REALM, SaslUtils.getSaslProps(), ch);
@@ -70,18 +70,18 @@ class SaslNettyServer {
return saslServer.getAuthorizationID();
}
-
-
/** CallbackHandler for SASL DIGEST-MD5 mechanism */
public static class SaslDigestCallbackHandler implements CallbackHandler {
/** Used to authenticate the clients */
- private String topologyToken;
+ private byte[] userPassword;
+ private String userName;
- public SaslDigestCallbackHandler(String topologyToken) {
+ public SaslDigestCallbackHandler(String topologyName, byte[] token) {
LOG.debug("SaslDigestCallback: Creating SaslDigestCallback handler "
- + "with topology token: " + topologyToken);
- this.topologyToken = topologyToken;
+ + "with topology token: " + topologyName);
+ this.userName = topologyName;
+ this.userPassword = token;
}
@Override
@@ -105,19 +105,19 @@ class SaslNettyServer {
"handle: Unrecognized SASL DIGEST-MD5 Callback");
}
}
-
- if(nc!=null) {
+
+ if (nc != null) {
LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
- + "username for client: " + topologyToken);
+ + "username for client: " + userName);
- nc.setName(topologyToken);
+ nc.setName(userName);
}
if (pc != null) {
- char[] password = SaslUtils.encodePassword(topologyToken.getBytes());
+ char[] password = SaslUtils.encodePassword(userPassword);
LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
- + "password for client: " + topologyToken);
+ + "password for client: " + userPassword);
pc.setPassword(password);
}
@@ -133,11 +133,8 @@ class SaslNettyServer {
}
if (ac.isAuthorized()) {
- if (LOG.isDebugEnabled()) {
- String username = topologyToken;
- LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
- + "canonicalized client ID: " + username);
- }
+ LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
+ + "canonicalized client ID: " + userName);
ac.setAuthorizedID(authzid);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/133c398e/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
index ca38c96..59c4abd 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
@@ -38,12 +38,12 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
long start_time;
/** Used for client or server's token to send or receive from each other. */
private byte[] token;
- private String topologyUser;
+ private String topologyName;
public SaslStormClientHandler(Client client) throws IOException {
this.client = client;
start_time = System.currentTimeMillis();
- loadTopologyToken();
+ getSASLCredentials();
}
@Override
@@ -62,7 +62,7 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
if (saslNettyClient == null) {
LOG.debug("Creating saslNettyClient now " + "for channel: "
+ channel);
- saslNettyClient = new SaslNettyClient(topologyUser);
+ saslNettyClient = new SaslNettyClient(topologyName, token);
SaslNettyClientState.getSaslNettyClient.set(channel,
saslNettyClient);
}
@@ -143,17 +143,13 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
channel.write(saslResponse);
}
- /**
- * Load Storm Topology Token.
- *
- * @param conf
- * Configuration
- * @throws IOException
- */
- private void loadTopologyToken() throws IOException {
- topologyUser = (String) this.client.storm_conf
+ private void getSASLCredentials() throws IOException {
+ topologyName = (String) this.client.storm_conf
.get(Config.TOPOLOGY_NAME);
- LOG.debug("SASL credentials is the storm user name: " + topologyUser);
- token = topologyUser.getBytes();
+ String secretKey = SaslUtils.getSecretKey(this.client.storm_conf);
+ if(secretKey!=null) {
+ token = secretKey.getBytes();
+ }
+ LOG.debug("SASL credentials for storm topology "+topologyName+ " is "+secretKey);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/133c398e/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
index 2e8bcac..d06e960 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
@@ -35,14 +35,14 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
Server server;
/** Used for client or server's token to send or receive from each other. */
private byte[] token;
- private String topologyUser;
+ private String topologyName;
private static final Logger LOG = LoggerFactory
.getLogger(SaslStormServerHandler.class);
public SaslStormServerHandler(Server server) throws IOException {
this.server = server;
- loadTopologyToken();
+ getSASLCredentials();
}
@Override
@@ -53,7 +53,6 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
return;
Channel channel = ctx.getChannel();
- LOG.debug("messageReceived: Got " + msg.getClass());
if (msg instanceof ControlMessage
&& ((ControlMessage) e.getMessage()) == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
@@ -66,7 +65,7 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
LOG.debug("No saslNettyServer for " + channel
+ " yet; creating now, with topology token: ");
try {
- saslNettyServer = new SaslNettyServer(topologyUser);
+ saslNettyServer = new SaslNettyServer(topologyName, token);
} catch (IOException ioe) {
LOG.error("Error occurred while creating saslNettyServer on server "
+ channel.getLocalAddress()
@@ -143,17 +142,14 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
server.closeChannel(e.getChannel());
}
- /**
- * Load Storm Topology Token.
- *
- * @param conf
- * Configuration
- * @throws IOException
- */
- private void loadTopologyToken() throws IOException {
- topologyUser = (String) this.server.storm_conf
+ private void getSASLCredentials() throws IOException {
+ topologyName = (String) this.server.storm_conf
.get(Config.TOPOLOGY_NAME);
- LOG.debug("SASL credentials for the storm topology: " + topologyUser);
- token = topologyUser.getBytes();
+ String secretKey = SaslUtils.getSecretKey(this.server.storm_conf);
+ if (secretKey != null) {
+ token = secretKey.getBytes();
+ }
+ LOG.debug("SASL credentials for storm topology " + topologyName
+ + " is " + secretKey);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/133c398e/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
index 0077cf3..0f96233 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
@@ -25,6 +25,8 @@ import javax.security.sasl.Sasl;
import org.apache.commons.codec.binary.Base64;
+import backtype.storm.Config;
+
class SaslUtils {
public static final String AUTH_DIGEST_MD5 = "DIGEST-MD5";
public static final String DEFAULT_REALM = "default";
@@ -58,4 +60,16 @@ class SaslUtils {
return new String(Base64.encodeBase64(identifier),
Charset.defaultCharset());
}
+
+ static String getSecretKey(Map conf) {
+ if (conf == null || conf.isEmpty())
+ return null;
+
+ String secretPayLoad = (String) conf
+ .get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
+
+ return secretPayLoad;
+ }
+
+
}