You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:32:51 UTC
[02/52] [partial] storm git commit: STORM-2441 Break down
'storm-core' to extract client (worker) artifacts
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java
new file mode 100644
index 0000000..d7b20d6
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import java.io.IOException;
+import java.util.Map;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosSaslClientHandler extends SimpleChannelUpstreamHandler {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(KerberosSaslClientHandler.class);
+ private ISaslClient client;
+ long start_time;
+ /** Used for client or server's token to send or receive from each other. */
+ private Map storm_conf;
+ private String jaas_section;
+ private String host;
+
+ public KerberosSaslClientHandler(ISaslClient client, Map storm_conf, String jaas_section, String host) throws IOException {
+ this.client = client;
+ this.storm_conf = storm_conf;
+ this.jaas_section = jaas_section;
+ this.host = host;
+ start_time = System.currentTimeMillis();
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx,
+ ChannelStateEvent event) {
+ // register the newly established channel
+ Channel channel = ctx.getChannel();
+ client.channelConnected(channel);
+
+ LOG.info("Connection established from {} to {}",
+ channel.getLocalAddress(), channel.getRemoteAddress());
+
+ try {
+ KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient
+ .get(channel);
+
+ if (saslNettyClient == null) {
+ LOG.debug("Creating saslNettyClient now for channel: {}",
+ channel);
+ saslNettyClient = new KerberosSaslNettyClient(storm_conf, jaas_section, host);
+ KerberosSaslNettyClientState.getKerberosSaslNettyClient.set(channel,
+ saslNettyClient);
+ }
+ LOG.debug("Going to initiate Kerberos negotiations.");
+ byte[] initialChallenge = saslNettyClient.saslResponse(new SaslMessageToken(new byte[0]));
+ LOG.debug("Sending initial challenge: {}", initialChallenge);
+ channel.write(new SaslMessageToken(initialChallenge));
+ } catch (Exception e) {
+ LOG.error("Failed to authenticate with server due to error: ",
+ e);
+ }
+ return;
+
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent event)
+ throws Exception {
+ LOG.debug("send/recv time (ms): {}",
+ (System.currentTimeMillis() - start_time));
+
+ Channel channel = ctx.getChannel();
+
+ // Generate SASL response to server using Channel-local SASL client.
+ KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient
+ .get(channel);
+ if (saslNettyClient == null) {
+ throw new Exception("saslNettyClient was unexpectedly null for channel:" + channel);
+ }
+
+ // examine the response message from server
+ if (event.getMessage() instanceof ControlMessage) {
+ ControlMessage msg = (ControlMessage) event.getMessage();
+ if (msg == ControlMessage.SASL_COMPLETE_REQUEST) {
+ LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
+
+ if (!saslNettyClient.isComplete()) {
+ String message = "Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
+ LOG.error(message);
+ throw new Exception(message);
+ }
+ ctx.getPipeline().remove(this);
+ this.client.channelReady();
+
+ // We call fireMessageReceived since the client is allowed to
+ // perform this request. The client's request will now proceed
+ // to the next pipeline component namely StormClientHandler.
+ Channels.fireMessageReceived(ctx, msg);
+ } else {
+ LOG.warn("Unexpected control message: {}", msg);
+ }
+ return;
+ }
+ else if (event.getMessage() instanceof SaslMessageToken) {
+ SaslMessageToken saslTokenMessage = (SaslMessageToken) event
+ .getMessage();
+ LOG.debug("Responding to server's token of length: {}",
+ saslTokenMessage.getSaslToken().length);
+
+ // Generate SASL response (but we only actually send the response if
+ // it's non-null.
+ byte[] responseToServer = saslNettyClient
+ .saslResponse(saslTokenMessage);
+ if (responseToServer == null) {
+ // If we generate a null response, then authentication has completed
+ // (if not, warn), and return without sending a response back to the
+ // server.
+ LOG.debug("Response to server is null: authentication should now be complete.");
+ if (!saslNettyClient.isComplete()) {
+ LOG.warn("Generated a null response, but authentication is not complete.");
+ throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
+ }
+ this.client.channelReady();
+ return;
+ } else {
+ LOG.debug("Response to server token has length: {}",
+ responseToServer.length);
+ }
+ // Construct a message containing the SASL response and send it to the
+ // server.
+ SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
+ channel.write(saslResponse);
+ } else {
+ LOG.error("Unexpected message from server: {}", event.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java
new file mode 100644
index 0000000..2efcb19
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.Config;
+import org.apache.storm.security.auth.AuthUtils;
+import java.io.IOException;
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import org.apache.zookeeper.server.auth.KerberosName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements SASL logic for storm worker client processes.
+ */
+public class KerberosSaslNettyClient {
+
+    private static final Logger LOG = LoggerFactory
+        .getLogger(KerberosSaslNettyClient.class);
+
+    /**
+     * Used to respond to server's counterpart, SaslServer with SASL tokens
+     * represented as byte arrays.
+     */
+    private SaslClient saslClient;
+    /** Subject produced by the JAAS login; every SASL call runs under it via Subject.doAs. */
+    private Subject subject;
+
+    /**
+     * Create a KerberosSaslNettyClient for authentication with servers.
+     *
+     * <p>Logs in through JAAS using the given section, checks that the login
+     * produced a Kerberos ticket, reads the {@code serviceName} option of the
+     * section and creates the underlying {@link SaslClient} as the logged-in
+     * subject. Note that the login configuration is installed with
+     * {@link Configuration#setConfiguration}, which is JVM-wide.
+     *
+     * @param storm_conf   configuration from which the login configuration is obtained
+     * @param jaas_section name of the JAAS section to log in with
+     * @param host         server host name passed to the SASL client
+     * @throws RuntimeException if the login fails, no Kerberos ticket or
+     *         principal is available, the service name cannot be read, or the
+     *         SASL client cannot be created
+     */
+    public KerberosSaslNettyClient(Map storm_conf, String jaas_section, String host) {
+        LOG.debug("KerberosSaslNettyClient: Creating SASL {} client to authenticate to server ",
+                  SaslUtils.KERBEROS);
+
+        LOG.info("Creating Kerberos Client.");
+
+        Configuration login_conf;
+        try {
+            login_conf = AuthUtils.GetConfiguration(storm_conf);
+        } catch (Throwable t) {
+            LOG.error("Failed to get login_conf: ", t);
+            throw t;
+        }
+        LOG.debug("KerberosSaslNettyClient: authmethod {}", SaslUtils.KERBEROS);
+
+        SaslClientCallbackHandler ch = new SaslClientCallbackHandler();
+
+        subject = null;
+        try {
+            LOG.debug("Setting Configuration to login_config: {}", login_conf);
+            //specify a configuration object to be used
+            Configuration.setConfiguration(login_conf);
+            //now login
+            LOG.debug("Trying to login.");
+            Login login = new Login(jaas_section, ch);
+            subject = login.getSubject();
+            LOG.debug("Got Subject: {}", subject.toString());
+        } catch (LoginException ex) {
+            LOG.error("Client failed to login in principal:" + ex, ex);
+            throw new RuntimeException(ex);
+        }
+
+        //check the credential of our principal
+        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
+            LOG.error("Failed to verify user principal.");
+            throw new RuntimeException("Fail to verify user principal with section \"" +
+                                       jaas_section +
+                                       "\" in login configuration file " +
+                                       login_conf);
+        }
+
+        String serviceName = null;
+        try {
+            serviceName = AuthUtils.get(login_conf, jaas_section, "serviceName");
+        } catch (IOException e) {
+            LOG.error("Failed to get service name.", e);
+            throw new RuntimeException(e);
+        }
+
+        // Guard the lookup of the first principal so an empty set gives a clear
+        // error instead of an ArrayIndexOutOfBoundsException.
+        if (subject.getPrincipals().isEmpty()) {
+            throw new RuntimeException("No principal found in subject for section \""
+                                       + jaas_section + "\"");
+        }
+
+        try {
+            Principal principal = subject.getPrincipals().iterator().next();
+            final String fPrincipalName = principal.getName();
+            final String fHost = host;
+            final String fServiceName = serviceName;
+            final CallbackHandler fch = ch;
+            LOG.debug("Kerberos Client with principal: {}, host: {}", fPrincipalName, fHost);
+            saslClient = Subject.doAs(subject, new PrivilegedExceptionAction<SaslClient>() {
+                @Override
+                public SaslClient run() throws SaslException {
+                    Map<String, String> props = new TreeMap<String, String>();
+                    props.put(Sasl.QOP, "auth");
+                    props.put(Sasl.SERVER_AUTH, "false");
+                    SaslClient created = Sasl.createSaslClient(
+                        new String[] { SaslUtils.KERBEROS },
+                        fPrincipalName,
+                        fServiceName,
+                        fHost,
+                        props, fch);
+                    // Fail here rather than leaving a null saslClient behind
+                    // that would only surface later as a NullPointerException.
+                    if (created == null) {
+                        throw new SaslException("No SASL client available for mechanism "
+                                                + SaslUtils.KERBEROS);
+                    }
+                    return created;
+                }
+            });
+            LOG.info("Got Client: {}", saslClient);
+
+        } catch (PrivilegedActionException e) {
+            LOG.error("KerberosSaslNettyClient: Could not create Sasl Netty Client.", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * @return whether the underlying SaslClient reports the exchange as complete
+     */
+    public boolean isComplete() {
+        return saslClient.isComplete();
+    }
+
+    /**
+     * Respond to server's SASL token.
+     *
+     * @param saslTokenMessage
+     *            contains server's SASL token
+     * @return client's response SASL token, as returned by
+     *         {@link SaslClient#evaluateChallenge(byte[])} (may be null)
+     * @throws RuntimeException if the token cannot be evaluated
+     */
+    public byte[] saslResponse(final SaslMessageToken saslTokenMessage) {
+        try {
+            return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
+                @Override
+                public byte[] run() {
+                    try {
+                        return saslClient.evaluateChallenge(saslTokenMessage.getSaslToken());
+                    } catch (SaslException e) {
+                        LOG.error("saslResponse: Failed to respond to SASL server's token:",
+                                  e);
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+        } catch (PrivilegedActionException e) {
+            LOG.error("Failed to generate response for token: ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Implementation of javax.security.auth.callback.CallbackHandler used for
+     * both the JAAS login and the SASL client. It supplies no credentials
+     * itself; it only logs the callbacks it is handed.
+     */
+    private static class SaslClientCallbackHandler implements CallbackHandler {
+
+        /**
+         * Logs the class of every callback received; no callback is answered.
+         *
+         * @param callbacks
+         *            objects that indicate what credential information is
+         *            being requested from the client.
+         * @throws UnsupportedCallbackException declared by the interface; not thrown here
+         */
+        @Override
+        public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+            for (Callback callback : callbacks) {
+                LOG.info("Kerberos Client Callback Handler got callback: {}", callback.getClass());
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java
new file mode 100644
index 0000000..dc76b0d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelLocal;
+
+/**
+ * Holder for the per-channel {@link KerberosSaslNettyClient}. The class only
+ * exposes a static {@link ChannelLocal} and is never meant to be instantiated.
+ */
+final class KerberosSaslNettyClientState {
+
+    /**
+     * Channel-local slot for the SASL client of each channel. The initial value
+     * is null, so callers must treat a null lookup as "not created yet".
+     */
+    public static final ChannelLocal<KerberosSaslNettyClient> getKerberosSaslNettyClient = new ChannelLocal<KerberosSaslNettyClient>() {
+        @Override
+        protected KerberosSaslNettyClient initialValue(Channel channel) {
+            return null;
+        }
+    };
+
+    /** Static holder only; prevents instantiation. */
+    private KerberosSaslNettyClientState() {
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java
new file mode 100644
index 0000000..72486ef
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.KerberosPrincipalToLocal;
+import java.io.IOException;
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import org.apache.zookeeper.server.auth.KerberosName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Server side of the Kerberos SASL exchange for one channel: logs in through
+ * JAAS, creates a {@link SaslServer} as the logged-in subject and evaluates the
+ * tokens sent by the client.
+ */
+class KerberosSaslNettyServer {
+
+    private static final Logger LOG = LoggerFactory
+        .getLogger(KerberosSaslNettyServer.class);
+
+    private SaslServer saslServer;
+    /** Subject produced by the JAAS login; every SASL call runs under it via Subject.doAs. */
+    private Subject subject;
+    /** Local user names allowed to authenticate; also handed to the callback handler. */
+    private List<String> authorizedUsers;
+
+    /**
+     * Logs in with the given JAAS section and creates the SASL server. The
+     * service name and host are taken from the first principal of the
+     * logged-in subject. Note that the login configuration is installed with
+     * {@link Configuration#setConfiguration}, which is JVM-wide.
+     *
+     * @param storm_conf      configuration from which the login configuration is obtained
+     * @param jaas_section    name of the JAAS section to log in with
+     * @param authorizedUsers local user names that may be authorized
+     * @throws RuntimeException if the login fails, no Kerberos ticket or
+     *         principal is available, or the SASL server cannot be created
+     */
+    KerberosSaslNettyServer(Map storm_conf, String jaas_section, List<String> authorizedUsers) {
+        this.authorizedUsers = authorizedUsers;
+        LOG.debug("Getting Configuration.");
+        Configuration login_conf;
+        try {
+            login_conf = AuthUtils.GetConfiguration(storm_conf);
+        } catch (Throwable t) {
+            LOG.error("Failed to get login_conf: ", t);
+            throw t;
+        }
+
+        LOG.debug("KerberosSaslNettyServer: authmethod {}", SaslUtils.KERBEROS);
+
+        KerberosSaslCallbackHandler ch = new KerberosSaslNettyServer.KerberosSaslCallbackHandler(authorizedUsers);
+
+        //login our principal
+        subject = null;
+        try {
+            LOG.debug("Setting Configuration to login_config: {}", login_conf);
+            //specify a configuration object to be used
+            Configuration.setConfiguration(login_conf);
+            //now login
+            LOG.debug("Trying to login.");
+            Login login = new Login(jaas_section, ch);
+            subject = login.getSubject();
+            LOG.debug("Got Subject: {}", subject.toString());
+        } catch (LoginException ex) {
+            LOG.error("Server failed to login in principal:", ex);
+            throw new RuntimeException(ex);
+        }
+
+        //check the credential of our principal
+        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
+            LOG.error("Failed to verifyuser principal.");
+            throw new RuntimeException("Fail to verify user principal with section \""
+                                       + jaas_section
+                                       + "\" in login configuration file "
+                                       + login_conf);
+        }
+
+        // Guard the lookup of the first principal so an empty set gives a clear
+        // error instead of an ArrayIndexOutOfBoundsException.
+        Set<Principal> principals = subject.getPrincipals();
+        if (principals.isEmpty()) {
+            throw new RuntimeException("No principal found in subject for section \""
+                                       + jaas_section + "\"");
+        }
+
+        try {
+            LOG.info("Creating Kerberos Server.");
+            final CallbackHandler fch = ch;
+            Principal p = principals.iterator().next();
+            KerberosName kName = new KerberosName(p.getName());
+            final String fHost = kName.getHostName();
+            final String fServiceName = kName.getServiceName();
+            LOG.debug("Server with host: {}", fHost);
+            saslServer =
+                Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
+                    @Override
+                    public SaslServer run() throws SaslException {
+                        Map<String, String> props = new TreeMap<String, String>();
+                        props.put(Sasl.QOP, "auth");
+                        props.put(Sasl.SERVER_AUTH, "false");
+                        SaslServer created = Sasl.createSaslServer(SaslUtils.KERBEROS,
+                                                                   fServiceName,
+                                                                   fHost, props, fch);
+                        // Fail here rather than leaving a null saslServer behind
+                        // that would only surface later as a NullPointerException.
+                        if (created == null) {
+                            throw new SaslException("No SASL server available for mechanism "
+                                                    + SaslUtils.KERBEROS);
+                        }
+                        return created;
+                    }
+                });
+            LOG.info("Got Server: {}", saslServer);
+
+        } catch (PrivilegedActionException e) {
+            LOG.error("KerberosSaslNettyServer: Could not create SaslServer: ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * @return whether the underlying SaslServer reports the exchange as complete
+     */
+    public boolean isComplete() {
+        return saslServer.isComplete();
+    }
+
+    /**
+     * @return the authorization ID reported by the underlying SaslServer
+     */
+    public String getUserName() {
+        return saslServer.getAuthorizationID();
+    }
+
+    /** CallbackHandler for the Kerberos SASL mechanism; decides on AuthorizeCallbacks. */
+    public static class KerberosSaslCallbackHandler implements CallbackHandler {
+
+        /** Local user names that are allowed to be authorized. */
+        private List<String> authorizedUsers;
+
+        public KerberosSaslCallbackHandler(List<String> authorizedUsers) {
+            LOG.debug("KerberosSaslCallback: Creating KerberosSaslCallback handler.");
+            this.authorizedUsers = authorizedUsers;
+        }
+
+        /**
+         * Authorizes an {@link AuthorizeCallback} when its authentication and
+         * authorization IDs are equal and the authorization ID, mapped to a
+         * local name, is one of the authorized users. All other callbacks are
+         * only logged. A null or empty user list authorizes nobody.
+         *
+         * @param callbacks callbacks handed over by the login or SASL machinery
+         */
+        @Override
+        public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+            for (Callback callback : callbacks) {
+                LOG.info("Kerberos Callback Handler got callback: {}", callback.getClass());
+                if (!(callback instanceof AuthorizeCallback)) {
+                    continue;
+                }
+
+                AuthorizeCallback ac = (AuthorizeCallback) callback;
+                if (!ac.getAuthenticationID().equals(ac.getAuthorizationID())) {
+                    LOG.debug("{} != {}", ac.getAuthenticationID(), ac.getAuthorizationID());
+                    continue;
+                }
+
+                LOG.debug("Authorized Users: {}", authorizedUsers);
+                LOG.debug("Checking authorization for: {}", ac.getAuthorizationID());
+
+                // Nobody to match against: leave the callback unauthorized
+                // without parsing the principal at all.
+                if (authorizedUsers == null || authorizedUsers.isEmpty()) {
+                    continue;
+                }
+
+                // The local name does not depend on the user being compared, so
+                // it is computed once instead of once per authorized user.
+                KerberosPrincipal principal = new KerberosPrincipal(ac.getAuthorizationID());
+                String requester = new KerberosPrincipalToLocal().toLocal(principal);
+
+                if (requester != null && authorizedUsers.contains(requester)) {
+                    ac.setAuthorized(true);
+                }
+            }
+        }
+    }
+
+    /**
+     * Evaluates a SASL token received from the client.
+     *
+     * @param token
+     *            the client's SASL token
+     * @return the token to send back to the client, as returned by
+     *         {@link SaslServer#evaluateResponse(byte[])} (may be null)
+     * @throws RuntimeException if the token cannot be evaluated
+     */
+    public byte[] response(final byte[] token) {
+        try {
+            return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
+                @Override
+                public byte[] run() {
+                    try {
+                        LOG.debug("response: Responding to input token of length: {}",
+                                  token.length);
+                        return saslServer.evaluateResponse(token);
+                    } catch (SaslException e) {
+                        LOG.error("response: Failed to evaluate client token of length: {} : {}",
+                                  token.length, e);
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+        } catch (PrivilegedActionException e) {
+            LOG.error("Failed to generate response for token: ", e);
+            throw new RuntimeException(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java
new file mode 100644
index 0000000..2ee2bf4
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelLocal;
+
+/**
+ * Holder for the per-channel {@link KerberosSaslNettyServer}. The class only
+ * exposes a static {@link ChannelLocal} and is never meant to be instantiated.
+ */
+final class KerberosSaslNettyServerState {
+
+    /**
+     * Channel-local slot for the SASL server of each channel. The initial value
+     * is null, so callers must treat a null lookup as "not created yet".
+     */
+    public static final ChannelLocal<KerberosSaslNettyServer> getKerberosSaslNettyServer = new ChannelLocal<KerberosSaslNettyServer>() {
+        @Override
+        protected KerberosSaslNettyServer initialValue(Channel channel) {
+            return null;
+        }
+    };
+
+    /** Static holder only; prevents instantiation. */
+    private KerberosSaslNettyServerState() {
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java
new file mode 100644
index 0000000..14ac172
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Server-side Netty upstream handler that runs the Kerberos SASL exchange for
+ * a channel. It answers each SaslMessageToken from the client and, once the
+ * exchange is complete, removes itself from the pipeline and reports the
+ * channel as authenticated.
+ */
+public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
+
+    /** Notified when a channel is authenticated and asked to close channels on errors. */
+    ISaslServer server;
+    /** Configuration map handed to every KerberosSaslNettyServer created by this handler. */
+    private Map storm_conf;
+    /** JAAS section name handed to every KerberosSaslNettyServer created by this handler. */
+    private String jaas_section;
+    /** Authorized user names handed to every KerberosSaslNettyServer created by this handler. */
+    private List<String> authorizedUsers;
+
+    private static final Logger LOG = LoggerFactory
+        .getLogger(KerberosSaslServerHandler.class);
+
+    /**
+     * Stores the collaborators used when a SASL server has to be created.
+     *
+     * @param server          notified about authenticated and failed channels
+     * @param storm_conf      configuration passed on to KerberosSaslNettyServer
+     * @param jaas_section    JAAS section passed on to KerberosSaslNettyServer
+     * @param authorizedUsers user names passed on to KerberosSaslNettyServer
+     * @throws IOException declared for callers; nothing in this constructor throws it
+     */
+    public KerberosSaslServerHandler(ISaslServer server, Map storm_conf, String jaas_section, List<String> authorizedUsers) throws IOException {
+        this.server = server;
+        this.storm_conf = storm_conf;
+        this.jaas_section = jaas_section;
+        this.authorizedUsers = authorizedUsers;
+    }
+
+    /**
+     * Processes a SaslMessageToken from the client, creating the channel's
+     * SASL server on first use; every other non-null message is forwarded
+     * upstream unchanged.
+     *
+     * @throws Exception any failure while handling a SASL token, after logging it
+     */
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+        Object msg = e.getMessage();
+        if (msg == null) {
+            return;
+        }
+
+        Channel channel = ctx.getChannel();
+
+        if (!(msg instanceof SaslMessageToken)) {
+            // Client should not be sending other-than-SASL messages before
+            // SaslServerHandler has removed itself from the pipeline. Such
+            // non-SASL requests will be denied by the Authorize channel handler
+            // (the next handler upstream in the server pipeline) if SASL
+            // authentication has not completed.
+            LOG.warn("Sending upstream an unexpected non-SASL message : {}",
+                     msg);
+            Channels.fireMessageReceived(ctx, msg);
+            return;
+        }
+
+        // initialize server-side SASL functionality, if we haven't yet
+        // (in which case we are looking at the first SASL message from the
+        // client).
+        try {
+            LOG.debug("Got SaslMessageToken!");
+
+            KerberosSaslNettyServer saslNettyServer = KerberosSaslNettyServerState.getKerberosSaslNettyServer
+                .get(channel);
+            if (saslNettyServer == null) {
+                LOG.debug("No saslNettyServer for {} yet; creating now, with topology token: ", channel);
+                try {
+                    saslNettyServer = new KerberosSaslNettyServer(storm_conf, jaas_section, authorizedUsers);
+                    KerberosSaslNettyServerState.getKerberosSaslNettyServer.set(channel,
+                                                                                saslNettyServer);
+                } catch (RuntimeException ioe) {
+                    LOG.error("Error occurred while creating saslNettyServer on server {} for client {}",
+                              channel.getLocalAddress(), channel.getRemoteAddress());
+                    throw ioe;
+                }
+            } else {
+                LOG.debug("Found existing saslNettyServer on server: {} for client {}",
+                          channel.getLocalAddress(), channel.getRemoteAddress());
+            }
+
+            byte[] responseBytes = saslNettyServer.response(((SaslMessageToken) msg)
+                                                                .getSaslToken());
+
+            SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(responseBytes);
+
+            if (saslTokenMessageRequest.getSaslToken() == null) {
+                channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
+            } else {
+                // Send response to client.
+                channel.write(saslTokenMessageRequest);
+            }
+
+            // NOTE(review): if the token above is null and isComplete() is
+            // true, SASL_COMPLETE_REQUEST is written twice - confirm intended.
+            if (saslNettyServer.isComplete()) {
+                // If authentication of client is complete, we will also send a
+                // SASL-Complete message to the client.
+                LOG.info("SASL authentication is complete for client with username: {}",
+                         saslNettyServer.getUserName());
+                channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
+                LOG.debug("Removing SaslServerHandler from pipeline since SASL authentication is complete.");
+                ctx.getPipeline().remove(this);
+                server.authenticated(channel);
+            }
+        } catch (Exception ex) {
+            LOG.error("Failed to handle SaslMessageToken: ", ex);
+            throw ex;
+        }
+    }
+
+    /**
+     * Logs the failure and closes the affected channel. The channel is closed
+     * through the server when one is set, and directly otherwise, so a failed
+     * channel is never left open.
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+        // Record the cause; otherwise the channel is closed with no trace of why.
+        LOG.error("Closing channel {} after exception during SASL authentication: ",
+                  e.getChannel(), e.getCause());
+        if (server != null) {
+            server.closeChannel(e.getChannel());
+        } else {
+            e.getChannel().close();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
new file mode 100644
index 0000000..718c8f3
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
@@ -0,0 +1,411 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.messaging.netty;
+
+/**
+ * This class is responsible for refreshing Kerberos credentials for
+ * logins for both Zookeeper client and server.
+ * See ZooKeeperSaslServer for server-side usage.
+ * See ZooKeeperSaslClient for client-side usage.
+ * This class is copied from https://github.com/apache/zookeeper/blob/branch-3.4/src/java/main/org/apache/zookeeper/Login.java
+ * with the difference that refresh thread does not die.
+ */
+
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import javax.security.auth.callback.CallbackHandler;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Shell;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.Subject;
+import java.util.Date;
+import java.util.Random;
+import java.util.Set;
+
+ /**
+  * Performs a JAAS login for a named login context and, when that login yields
+  * Kerberos tickets, owns a daemon thread ("Refresh-TGT") that periodically
+  * re-logs in so the Ticket Granting Ticket (TGT) stays fresh.
+  *
+  * <p>The refresh thread is created by the constructor but is only started by
+  * {@link #startThreadIfNeeded()}. {@link #shutdown()} interrupts it and waits
+  * for it to terminate.
+  */
+ public class Login {
+     // One logger shared by every instance; it never needs to be reassigned.
+     static final Logger LOG = Logger.getLogger(Login.class);
+     public CallbackHandler callbackHandler;
+
+     // Login will sleep until 80% of time from last refresh to
+     // ticket's expiry has been reached, at which time it will wake
+     // and try to renew the ticket.
+     private static final float TICKET_RENEW_WINDOW = 0.80f;
+
+     /**
+      * Percentage of random jitter added to the renewal time
+      */
+     private static final float TICKET_RENEW_JITTER = 0.05f;
+
+     // Regardless of TICKET_RENEW_WINDOW setting above and the ticket expiry time,
+     // thread will not sleep between refresh attempts any less than 1 minute (60*1000 milliseconds = 1 minute).
+     // Change the '1' to e.g. 5, to change this to 5 minutes.
+     private static final long MIN_TIME_BEFORE_RELOGIN = 1 * 60 * 1000L;
+
+     private Subject subject = null;
+     // Refresh thread; stays null when the login produced no Kerberos ticket.
+     private Thread t = null;
+     private boolean isKrbTicket = false;
+     private boolean isUsingTicketCache = false;
+     private boolean isUsingKeytab = false;
+
+     /** Random number generator used to jitter the refresh time. */
+     private static final Random rng = new Random();
+
+     private LoginContext login = null;
+     private String loginContextName = null;
+     private String keytabFile = null;
+     private String principal = null;
+
+     // Timestamp (ms since epoch) of the most recent re-login attempt; 0 until the first one.
+     private long lastLogin = 0;
+
+     /**
+      * Login constructor. Logs in immediately and, if the resulting subject holds a
+      * Kerberos ticket, creates (but does not start) the thread used to periodically
+      * re-login to the Kerberos Ticket Granting Server. Call
+      * {@link #startThreadIfNeeded()} to start it.
+      *
+      * @param loginContextName
+      *               name of section in JAAS file that will be use to login.
+      *               Passed as first param to javax.security.auth.login.LoginContext().
+      *
+      * @param callbackHandler
+      *               Passed as second param to javax.security.auth.login.LoginContext().
+      * @throws javax.security.auth.login.LoginException
+      *               Thrown if authentication fails.
+      */
+     public Login(final String loginContextName, CallbackHandler callbackHandler)
+         throws LoginException {
+         this.callbackHandler = callbackHandler;
+         login = login(loginContextName);
+         this.loginContextName = loginContextName;
+         subject = login.getSubject();
+         isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
+         AppConfigurationEntry entries[] = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
+         // getAppConfigurationEntry() returns null when the configuration has no section with
+         // this name (the login above may still have succeeded through a fallback entry),
+         // so guard against it instead of failing with a NullPointerException.
+         if (entries != null) {
+             for (AppConfigurationEntry entry: entries) {
+                 // there will only be a single entry, so this for() loop will only be iterated through once.
+                 if (entry.getOptions().get("useTicketCache") != null) {
+                     String val = (String)entry.getOptions().get("useTicketCache");
+                     if (val.equals("true")) {
+                         isUsingTicketCache = true;
+                     }
+                 }
+                 if (entry.getOptions().get("keyTab") != null) {
+                     keytabFile = (String)entry.getOptions().get("keyTab");
+                     isUsingKeytab = true;
+                 }
+                 if (entry.getOptions().get("principal") != null) {
+                     principal = (String)entry.getOptions().get("principal");
+                 }
+                 break;
+             }
+         }
+
+         if (!isKrbTicket) {
+             // if no TGT, do not bother with ticket management.
+             return;
+         }
+
+         // Refresh the Ticket Granting Ticket (TGT) periodically. How often to refresh is determined by the
+         // TGT's existing expiry date and the configured MIN_TIME_BEFORE_RELOGIN. For testing and development,
+         // you can decrease the interval of expiration of tickets (for example, to 3 minutes) by running :
+         //  "modprinc -maxlife 3mins <principal>" in kadmin.
+         t = new Thread(new Runnable() {
+             public void run() {
+                 LOG.info("TGT refresh thread started.");
+                 while (true) {  // renewal thread's main loop. if it exits from here, thread will exit.
+                     KerberosTicket tgt = getTGT();
+                     long now = System.currentTimeMillis();
+                     long nextRefresh;
+                     Date nextRefreshDate;
+                     if (tgt == null) {
+                         nextRefresh = now + MIN_TIME_BEFORE_RELOGIN;
+                         nextRefreshDate = new Date(nextRefresh);
+                         LOG.warn("No TGT found: will try again at " + nextRefreshDate);
+                     } else {
+                         nextRefresh = getRefreshTime(tgt);
+                         long expiry = tgt.getEndTime().getTime();
+                         Date expiryDate = new Date(expiry);
+                         if ((isUsingTicketCache) && (tgt.getEndTime().equals(tgt.getRenewTill()))) {
+                             LOG.error("The TGT cannot be renewed beyond the next expiry date: " + expiryDate + "." +
+                                     "This process will not be able to authenticate new SASL connections after that " +
+                                     "time (for example, it will not be authenticate a new connection with a Zookeeper " +
+                                     "Quorum member). Ask your system administrator to either increase the " +
+                                     "'renew until' time by doing : 'modprinc -maxrenewlife " + principal + "' within " +
+                                     "kadmin, or instead, to generate a keytab for " + principal + ". Because the TGT's " +
+                                     "expiry cannot be further extended by refreshing, exiting refresh thread now.");
+                             return;
+                         }
+                         // determine how long to sleep from looking at ticket's expiry.
+                         // We should not allow the ticket to expire, but we should take into consideration
+                         // MIN_TIME_BEFORE_RELOGIN. Will not sleep less than MIN_TIME_BEFORE_RELOGIN, unless doing so
+                         // would cause ticket expiration.
+                         if ((nextRefresh > expiry) ||
+                                 ((now + MIN_TIME_BEFORE_RELOGIN) > expiry)) {
+                             // expiry is before next scheduled refresh).
+                             nextRefresh = now;
+                         } else {
+                             if (nextRefresh < (now + MIN_TIME_BEFORE_RELOGIN)) {
+                                 // next scheduled refresh is sooner than (now + MIN_TIME_BEFORE_LOGIN).
+                                 Date until = new Date(nextRefresh);
+                                 Date newuntil = new Date(now + MIN_TIME_BEFORE_RELOGIN);
+                                 LOG.warn("TGT refresh thread time adjusted from : " + until + " to : " + newuntil + " since "
+                                         + "the former is sooner than the minimum refresh interval ("
+                                         + MIN_TIME_BEFORE_RELOGIN / 1000 + " seconds) from now.");
+                             }
+                             nextRefresh = Math.max(nextRefresh, now + MIN_TIME_BEFORE_RELOGIN);
+                         }
+                     }
+                     if (tgt != null && now > tgt.getEndTime().getTime()) {
+                         // The ticket has already expired: tolerate up to 10 * MIN_TIME_BEFORE_RELOGIN
+                         // past expiry, then give up and terminate the JVM.
+                         if ((now - tgt.getEndTime().getTime()) < ( 10 * MIN_TIME_BEFORE_RELOGIN)) {
+                             Date until = new Date(now + MIN_TIME_BEFORE_RELOGIN);
+                             LOG.info("TGT already expired but giving additional 10 minutes past TGT expiry, refresh sleeping until: " + until.toString());
+                             try {
+                                 Thread.sleep(MIN_TIME_BEFORE_RELOGIN);
+                             } catch (InterruptedException ie) {
+                                 LOG.warn("TGT renewal thread has been interrupted and will exit.");
+                                 return;
+                             }
+                         } else {
+                             LOG.error("nextRefresh:" + new Date(nextRefresh) + " is in the past: exiting refresh thread. Check"
+                                     + " clock sync between this host and KDC - (KDC's clock is likely ahead of this host)."
+                                     + " Manual intervention will be required for this client to successfully authenticate."
+                                     + " Exiting worker!.");
+                             Runtime.getRuntime().exit(-3);
+                         }
+                     } else if (now < nextRefresh) {
+                         Date until = new Date(nextRefresh);
+                         LOG.info("TGT refresh sleeping until: " + until.toString());
+                         try {
+                             Thread.sleep(nextRefresh - now);
+                         } catch (InterruptedException ie) {
+                             LOG.warn("TGT renewal thread has been interrupted and will exit.");
+                             return;
+                         }
+                     }
+
+                     if (isUsingTicketCache) {
+                         // Renew the cached ticket through the external kinit command
+                         // (overridable with the "zookeeper.kinit" system property); one retry.
+                         String cmd = "/usr/bin/kinit";
+                         if (System.getProperty("zookeeper.kinit") != null) {
+                             cmd = System.getProperty("zookeeper.kinit");
+                         }
+                         String kinitArgs = "-R";
+                         int retry = 1;
+                         while (retry >= 0) {
+                             try {
+                                 LOG.debug("running ticket cache refresh command: " + cmd + " " + kinitArgs);
+                                 Shell.execCommand(cmd, kinitArgs);
+                                 break;
+                             } catch (Exception e) {
+                                 if (retry > 0) {
+                                     --retry;
+                                     // sleep for 10 seconds
+                                     try {
+                                         Thread.sleep(10 * 1000);
+                                     } catch (InterruptedException ie) {
+                                         LOG.error("Interrupted while renewing TGT, exiting Login thread");
+                                         return;
+                                     }
+                                 } else {
+                                     LOG.warn("Could not renew TGT due to problem running shell command: '" + cmd
+                                             + " " + kinitArgs + "'" + "; exception was:" + e + ". Exiting refresh thread.",e);
+                                     return;
+                                 }
+                             }
+                         }
+                     }
+                     try {
+                         int retry = 1;
+                         while (retry >= 0) {
+                             try {
+                                 reLogin();
+                                 break;
+                             } catch (LoginException le) {
+                                 if (retry > 0) {
+                                     --retry;
+                                     // sleep for 10 seconds.
+                                     try {
+                                         Thread.sleep(10 * 1000);
+                                     } catch (InterruptedException e) {
+                                         LOG.error("Interrupted during login retry after LoginException:", le);
+                                         throw le;
+                                     }
+                                 } else {
+                                     // Not rethrown: the loop keeps attempting reLogin() (which itself
+                                     // rate-limits attempts) so the refresh thread does not die on a login failure.
+                                     LOG.error("Could not refresh TGT for principal: " + principal + ".", le);
+                                 }
+                             }
+                         }
+                     } catch (LoginException le) {
+                         LOG.error("Failed to refresh TGT: refresh thread exiting now.",le);
+                         break;
+                     }
+                 }
+             }
+         });
+         t.setName("Refresh-TGT");
+         t.setDaemon(true);
+     }
+
+     /**
+      * Starts the TGT refresh thread if the constructor created one.
+      */
+     public void startThreadIfNeeded() {
+         // thread object 't' will be null if a refresh thread is not needed.
+         if (t != null) {
+             t.start();
+         }
+     }
+
+     /**
+      * Interrupts the refresh thread, if it is running, and waits for it to finish.
+      * If the calling thread is itself interrupted while waiting, the wait is abandoned
+      * and the caller's interrupt status is restored.
+      */
+     public void shutdown() {
+         // NOTE(review): if the refresh thread is inside sleepUntilSufficientTimeElapsed() when
+         // interrupted, that method terminates the JVM with exit code -2 - confirm this is intended.
+         if ((t != null) && (t.isAlive())) {
+             t.interrupt();
+             try {
+                 t.join();
+             } catch (InterruptedException e) {
+                 LOG.warn("error while waiting for Login thread to shutdown: " + e);
+                 // Preserve the interrupt for the caller instead of silently swallowing it.
+                 Thread.currentThread().interrupt();
+             }
+         }
+     }
+
+     /**
+      * @return the subject obtained from the initial login
+      */
+     public Subject getSubject() {
+         return subject;
+     }
+
+     /**
+      * @return the JAAS login context name this instance was created with
+      */
+     public String getLoginContextName() {
+         return loginContextName;
+     }
+
+     /**
+      * Creates a LoginContext for the given JAAS section using {@link #callbackHandler} and logs in.
+      *
+      * @param loginContextName name of the JAAS section; must not be null
+      * @return the logged-in context
+      * @throws LoginException if the name is null or the login fails
+      */
+     private synchronized LoginContext login(final String loginContextName) throws LoginException {
+         if (loginContextName == null) {
+             throw new LoginException("loginContext name (JAAS file section header) was null. " +
+                     "Please check your java.security.login.auth.config (=" +
+                     System.getProperty("java.security.login.auth.config") +
+                     ") and your " + ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY + "(=" +
+                     System.getProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client") + ")");
+         }
+         LoginContext loginContext = new LoginContext(loginContextName,callbackHandler);
+         loginContext.login();
+         LOG.info("successfully logged in.");
+         return loginContext;
+     }
+
+     /**
+      * Picks the next refresh time: a point between 80% and 85% of the ticket's lifetime
+      * (TICKET_RENEW_WINDOW plus random jitter), or the current time if that would fall
+      * after the ticket expires.
+      */
+     // c.f. org.apache.hadoop.security.UserGroupInformation.
+     private long getRefreshTime(KerberosTicket tgt) {
+         long start = tgt.getStartTime().getTime();
+         long expires = tgt.getEndTime().getTime();
+         LOG.info("TGT valid starting at:        " + tgt.getStartTime().toString());
+         LOG.info("TGT expires:                  " + tgt.getEndTime().toString());
+         long proposedRefresh = start + (long) ((expires - start) *
+                 (TICKET_RENEW_WINDOW + (TICKET_RENEW_JITTER * rng.nextDouble())));
+         if (proposedRefresh > expires) {
+             // proposedRefresh is too far in the future: it's after ticket expires: simply return now.
+             return System.currentTimeMillis();
+         }
+         else {
+             return proposedRefresh;
+         }
+     }
+
+     /**
+      * Searches the subject's private credentials for the ticket whose server principal
+      * is {@code krbtgt/REALM@REALM}.
+      *
+      * @return the TGT, or null if the subject holds none
+      */
+     private synchronized KerberosTicket getTGT() {
+         Set<KerberosTicket> tickets = subject.getPrivateCredentials(KerberosTicket.class);
+         for(KerberosTicket ticket: tickets) {
+             KerberosPrincipal server = ticket.getServer();
+             if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + server.getRealm())) {
+                 LOG.debug("Found tgt " + ticket + ".");
+                 return ticket;
+             }
+         }
+         return null;
+     }
+
+     /**
+      * Blocks until at least MIN_TIME_BEFORE_RELOGIN has passed since the previous
+      * re-login attempt, then records the current time as the latest attempt.
+      * An interrupt during the wait terminates the JVM with exit code -2.
+      */
+     private void sleepUntilSufficientTimeElapsed() {
+         long now = System.currentTimeMillis();
+         if (now - getLastLogin() < MIN_TIME_BEFORE_RELOGIN ) {
+             LOG.warn("Not attempting to re-login since the last re-login was " +
+                     "attempted less than " + (MIN_TIME_BEFORE_RELOGIN/1000) + " seconds"+
+                     " before.");
+             try {
+                 Thread.sleep(MIN_TIME_BEFORE_RELOGIN - (now - getLastLogin()));
+             } catch (InterruptedException e) {
+                 LOG.warn("TGT renewal thread has been interrupted and will exit.");
+                 Runtime.getRuntime().exit(-2);
+             }
+         }
+         // register most recent relogin attempt
+         setLastLogin(System.currentTimeMillis());
+     }
+
+     /**
+      * Returns login object
+      * @return login
+      */
+     private LoginContext getLogin() {
+         return login;
+     }
+
+     /**
+      * Set the login object
+      * @param login
+      */
+     private void setLogin(LoginContext login) {
+         this.login = login;
+     }
+
+     /**
+      * Set the last login time.
+      * @param time the number of milliseconds since the beginning of time
+      */
+     private void setLastLogin(long time) {
+         lastLogin = time;
+     }
+
+     /**
+      * Get the time of the last login.
+      * @return the number of milliseconds since the beginning of time.
+      */
+     private long getLastLogin() {
+         return lastLogin;
+     }
+
+     /**
+      * Re-login a principal. This method assumes that {@link #login(String)} has happened already.
+      * Does nothing when the initial login produced no Kerberos ticket.
+      * @throws javax.security.auth.login.LoginException on a failure
+      */
+     // c.f. HADOOP-6559
+     private synchronized void reLogin()
+         throws LoginException {
+         if (!isKrbTicket) {
+             return;
+         }
+         LoginContext login = getLogin();
+         if (login  == null) {
+             throw new LoginException("login must be done first");
+         }
+         sleepUntilSufficientTimeElapsed();
+         LOG.info("Initiating logout for " + principal);
+         synchronized (Login.class) {
+             //clear up the kerberos state. But the tokens are not cleared! As per
+             //the Java kerberos login module code, only the kerberos credentials
+             //are cleared
+             login.logout();
+             //login and also update the subject field of this instance to
+             //have the new credentials (pass it to the LoginContext constructor)
+             login = new LoginContext(loginContextName, getSubject());
+             LOG.info("Initiating re-login for " + principal);
+             login.login();
+             setLogin(login);
+         }
+     }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java
new file mode 100644
index 0000000..1c5dd9d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.messaging.TaskMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.util.ArrayList;
+
+ /**
+  * An ordered group of {@link TaskMessage}s that is encoded into a single buffer,
+  * terminated by an end-of-batch control message. The batch tracks its encoded
+  * size so callers can tell when the configured buffer size has been reached.
+  */
+ class MessageBatch {
+     /** Bytes written ahead of every payload: task id (short, 2) + payload length (int, 4). */
+     private static final int HEADER_BYTES = 6;
+
+     private final int bufferSize;
+     private final ArrayList<TaskMessage> messages = new ArrayList<>();
+     /** Running size of the encoded batch, including the trailing end-of-batch marker. */
+     private int encodedLength = ControlMessage.EOB_MESSAGE.encodeLength();
+
+     MessageBatch(int buffer_size) {
+         bufferSize = buffer_size;
+     }
+
+     /**
+      * Appends a message to the batch and accounts for its encoded size.
+      *
+      * @throws RuntimeException if {@code msg} is null
+      */
+     void add(TaskMessage msg) {
+         if (msg == null) {
+             throw new RuntimeException("null object forbidden in message batch");
+         }
+         messages.add(msg);
+         encodedLength += msgEncodeLength(msg);
+     }
+
+     /**
+      * @return the number of bytes the given message occupies once encoded; 0 for null
+      */
+     private int msgEncodeLength(TaskMessage taskMsg) {
+         if (taskMsg == null) {
+             return 0;
+         }
+         byte[] payload = taskMsg.message();
+         return (payload == null) ? HEADER_BYTES : HEADER_BYTES + payload.length;
+     }
+
+     /**
+      * @return true if this batch used up allowed buffer size
+      */
+     boolean isFull() {
+         return bufferSize <= encodedLength;
+     }
+
+     /**
+      * @return true if this batch doesn't have any messages
+      */
+     boolean isEmpty() {
+         return messages.isEmpty();
+     }
+
+     /**
+      * @return number of msgs in this batch
+      */
+     int size() {
+         return messages.size();
+     }
+
+     /**
+      * create a buffer containing the encoding of this batch:
+      * every message in insertion order, then the end-of-batch marker
+      */
+     ChannelBuffer buffer() throws Exception {
+         ChannelBuffer target = ChannelBuffers.directBuffer(encodedLength);
+         ChannelBufferOutputStream out = new ChannelBufferOutputStream(target);
+
+         for (int i = 0; i < messages.size(); i++) {
+             writeTaskMessage(out, messages.get(i));
+         }
+
+         // terminate the batch
+         ControlMessage.EOB_MESSAGE.write(out);
+         out.close();
+
+         return out.buffer();
+     }
+
+     /**
+      * write a TaskMessage into a stream
+      *
+      * Each TaskMessage is encoded as:
+      *  task ... short(2)
+      *  len ... int(4)
+      *  payload ... byte[]     *
+      *
+      * @throws RuntimeException if the task id is larger than Short.MAX_VALUE
+      */
+     private void writeTaskMessage(ChannelBufferOutputStream bout, TaskMessage message) throws Exception {
+         byte[] payload = message.message();
+         int payloadLength = (payload == null) ? 0 : payload.length;
+
+         int taskId = message.task();
+         if (taskId > Short.MAX_VALUE) {
+             throw new RuntimeException("Task ID should not exceed "+Short.MAX_VALUE);
+         }
+
+         bout.writeShort((short) taskId);
+         bout.writeInt(payloadLength);
+         if (payloadLength > 0) {
+             bout.write(payload);
+         }
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBuffer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBuffer.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBuffer.java
new file mode 100644
index 0000000..4262f41
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBuffer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.messaging.TaskMessage;
+
+/**
+ * Encapsulates the state used for batching up messages.
+ */
+ public class MessageBuffer {
+     private final int batchSize;
+     private MessageBatch currentBatch;
+
+     public MessageBuffer(int mesageBatchSize) {
+         batchSize = mesageBatchSize;
+         currentBatch = new MessageBatch(batchSize);
+     }
+
+     /**
+      * Adds a message to the batch being filled.
+      *
+      * @return the batch that just became full (a fresh batch takes its place),
+      *         or null if the current batch still has room
+      */
+     public synchronized MessageBatch add(TaskMessage msg) {
+         currentBatch.add(msg);
+         if (!currentBatch.isFull()) {
+             return null;
+         }
+         return rotate();
+     }
+
+     /**
+      * @return true if the batch being filled holds no messages
+      */
+     public synchronized boolean isEmpty() {
+         return currentBatch.isEmpty();
+     }
+
+     /**
+      * Hands over whatever has been accumulated so far.
+      *
+      * @return the current batch (a fresh batch takes its place), or null if it is empty
+      */
+     public synchronized MessageBatch drain() {
+         if (currentBatch.isEmpty()) {
+             return null;
+         }
+         return rotate();
+     }
+
+     /**
+      * Replaces the current batch with a new empty one and returns the old batch.
+      * Callers must hold this object's monitor.
+      */
+     private MessageBatch rotate() {
+         MessageBatch finished = currentBatch;
+         currentBatch = new MessageBatch(batchSize);
+         return finished;
+     }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
new file mode 100644
index 0000000..9030424
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.storm.messaging.TaskMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+ /**
+  * Frame decoder for the messaging wire format. A single call decodes as many
+  * task messages as the buffer holds and returns them as a list; a control
+  * message or SASL token is returned on its own.
+  */
+ public class MessageDecoder extends FrameDecoder {
+     /*
+      * Each ControlMessage is encoded as:
+      *  code (<0) ... short(2)
+      * Each TaskMessage is encoded as:
+      *  task (>=0) ... short(2)
+      *  len ... int(4)
+      *  payload ... byte[]     *
+      */
+     /**
+      * Decodes the readable part of {@code buf}.
+      *
+      * @return null when more data is needed; a {@code ControlMessage} or
+      *         {@code SaslMessageToken} when one is at the front of the buffer;
+      *         otherwise a non-empty list of the {@code TaskMessage}s decoded so far
+      */
+     @Override
+     protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
+         // Make sure that we have received at least a short
+         long available = buf.readableBytes();
+         if (available < 2) {
+             //need more data
+             return null;
+         }
+
+         List<Object> ret = new ArrayList<>();
+
+         // Use while loop, try to decode as more messages as possible in single call
+         while (available >= 2) {
+
+             // Mark the current buffer position before reading task/len field
+             // because the whole frame might not be in the buffer yet.
+             // We will reset the buffer position to the marked position if
+             // there's not enough bytes in the buffer.
+             buf.markReaderIndex();
+
+             // read the short field
+             short code = buf.readShort();
+             available -= 2;
+
+             // case 1: Control message
+             ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
+             if (ctrl_msg != null) {
+                 if (ctrl_msg == ControlMessage.EOB_MESSAGE) {
+                     continue;
+                 }
+                 if (!ret.isEmpty()) {
+                     // Task messages were already consumed from the buffer in this call.
+                     // Returning the control message now would silently drop them, so
+                     // rewind to the start of the control message and hand the task
+                     // messages over first; the control message is decoded on the next call.
+                     buf.resetReaderIndex();
+                     break;
+                 }
+                 return ctrl_msg;
+             }
+
+             //case 2: SaslTokenMessageRequest
+             if(code == SaslMessageToken.IDENTIFIER) {
+                 if (!ret.isEmpty()) {
+                     // Same reasoning as for control messages: every exit of this branch
+                     // would discard the pending task messages, so deliver them first and
+                     // leave the SASL frame in the buffer for the next call.
+                     buf.resetReaderIndex();
+                     break;
+                 }
+
+                 // Make sure that we have received at least an integer (length)
+                 if (buf.readableBytes() < 4) {
+                     //need more data
+                     buf.resetReaderIndex();
+                     return null;
+                 }
+
+                 // Read the length field.
+                 int length = buf.readInt();
+                 if (length<=0) {
+                     return new SaslMessageToken(null);
+                 }
+
+                 // Make sure if there's enough bytes in the buffer.
+                 if (buf.readableBytes() < length) {
+                     // The whole bytes were not received yet - return null.
+                     buf.resetReaderIndex();
+                     return null;
+                 }
+
+                 // There's enough bytes in the buffer. Read it.
+                 ChannelBuffer payload = buf.readBytes(length);
+
+                 // Successfully decoded a frame.
+                 // Return a SaslTokenMessageRequest object
+                 return new SaslMessageToken(payload.array());
+             }
+
+             // case 3: task Message
+
+             // Make sure that we have received at least an integer (length)
+             if (available < 4) {
+                 // need more data
+                 buf.resetReaderIndex();
+                 break;
+             }
+
+             // Read the length field.
+             int length = buf.readInt();
+
+             available -= 4;
+
+             if (length <= 0) {
+                 ret.add(new TaskMessage(code, null));
+                 break;
+             }
+
+             // Make sure if there's enough bytes in the buffer.
+             if (available < length) {
+                 // The whole bytes were not received yet - return null.
+                 buf.resetReaderIndex();
+                 break;
+             }
+             available -= length;
+
+             // There's enough bytes in the buffer. Read it.
+             ChannelBuffer payload = buf.readBytes(length);
+
+
+             // Successfully decoded a frame.
+             // Return a TaskMessage object
+             ret.add(new TaskMessage(code, payload.array()));
+         }
+
+         if (ret.size() == 0) {
+             return null;
+         } else {
+             return ret;
+         }
+     }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
new file mode 100644
index 0000000..0e9fc98
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+public class MessageEncoder extends OneToOneEncoder {
+ @Override
+ protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
+ if (obj instanceof ControlMessage) {
+ return ((ControlMessage)obj).buffer();
+ }
+
+ if (obj instanceof MessageBatch) {
+ return ((MessageBatch)obj).buffer();
+ }
+
+ if (obj instanceof SaslMessageToken) {
+ return ((SaslMessageToken)obj).buffer();
+ }
+
+ throw new RuntimeException("Unsupported encoding of object of class "+obj.getClass().getName());
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java
new file mode 100644
index 0000000..e60c711
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.netty.util.ThreadNameDeterminer;
+import org.jboss.netty.util.ThreadRenamingRunnable;
+
+ /**
+  * Thread factory producing non-daemon, normal-priority threads named
+  * {@code <name>-<n>} (n counting up from 1), each wired to a shared
+  * {@link NettyUncaughtExceptionHandler}.
+  */
+ public class NettyRenameThreadFactory implements ThreadFactory {
+
+     static {
+         // Install ThreadNameDeterminer.CURRENT as Netty's thread-name determiner.
+         ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
+     }
+
+     final ThreadGroup group;
+     final AtomicInteger index = new AtomicInteger(1);
+     final String name;
+     static final NettyUncaughtExceptionHandler uncaughtExceptionHandler = new NettyUncaughtExceptionHandler();
+
+     /**
+      * @param name prefix used for the names of all threads created by this factory
+      */
+     public NettyRenameThreadFactory(String name) {
+         final SecurityManager securityManager = System.getSecurityManager();
+         // Use the security manager's thread group when one is installed,
+         // otherwise the group of the thread constructing this factory.
+         if (securityManager == null) {
+             group = Thread.currentThread().getThreadGroup();
+         } else {
+             group = securityManager.getThreadGroup();
+         }
+         this.name = name;
+     }
+
+     /**
+      * Creates a new thread for {@code r} with the next sequential name.
+      */
+     public Thread newThread(Runnable r) {
+         final String threadName = name + "-" + index.getAndIncrement();
+         final Thread created = new Thread(group, r, threadName, 0);
+         // Normalise attributes that may have been inherited from the creating thread.
+         if (created.isDaemon()) {
+             created.setDaemon(false);
+         }
+         if (Thread.NORM_PRIORITY != created.getPriority()) {
+             created.setPriority(Thread.NORM_PRIORITY);
+         }
+         created.setUncaughtExceptionHandler(uncaughtExceptionHandler);
+         return created;
+     }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyUncaughtExceptionHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyUncaughtExceptionHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyUncaughtExceptionHandler.java
new file mode 100644
index 0000000..fd48bdc
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyUncaughtExceptionHandler.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NettyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(NettyUncaughtExceptionHandler.class);
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ try {
+ Utils.handleUncaughtException(e);
+ } catch (Error error) {
+ LOG.info("Received error in netty thread.. terminating server...");
+ Runtime.getRuntime().exit(1);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java
new file mode 100644
index 0000000..ccac70f
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import java.io.IOException;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Message wrapper used to send and receive SASL tokens.
+ *
+ * <p>Wire format: a 2-byte {@link #IDENTIFIER}, a 4-byte payload length, then the
+ * payload bytes. A {@code null} token is encoded as a zero-length payload.
+ */
+public class SaslMessageToken implements INettySerializable {
+    /** Marker written at the start of every encoded SASL token message. */
+    public static final short IDENTIFIER = -500;
+
+    /** Class logger. */
+    private static final Logger LOG = LoggerFactory
+            .getLogger(SaslMessageToken.class);
+
+    /** Used for client or server's token to send or receive from each other. */
+    private byte[] token;
+
+    /**
+     * Constructor used for reflection only.
+     */
+    public SaslMessageToken() {
+    }
+
+    /**
+     * Constructor used to send request.
+     *
+     * @param token
+     *            the SASL token, generated by a SaslClient or SaslServer.
+     */
+    public SaslMessageToken(byte[] token) {
+        this.token = token;
+    }
+
+    /**
+     * Read accessor for SASL token.
+     *
+     * @return the SASL token; the array is returned as-is, not copied
+     */
+    public byte[] getSaslToken() {
+        return token;
+    }
+
+    /**
+     * Write accessor for SASL token.
+     *
+     * @param token
+     *            SASL token; the array is stored as-is, not copied
+     */
+    public void setSaslToken(byte[] token) {
+        this.token = token;
+    }
+
+    /**
+     * Number of bytes {@link #buffer()} writes: 2 (identifier) + 4 (payload length)
+     * + payload size.
+     *
+     * @return the encoded size in bytes; a {@code null} token counts as an empty payload
+     */
+    public int encodeLength() {
+        // buffer() supports a null token and calls this method first, so the null
+        // case must be handled here as well instead of dereferencing token.
+        int payloadLen = (token == null) ? 0 : token.length;
+        return 2 + 4 + payloadLen;
+    }
+
+    /**
+     * Encodes this message into a newly allocated direct channel buffer.
+     *
+     * <p>Layout:
+     * <pre>
+     * identifier      short (2 bytes)
+     * payload length  int   (4 bytes)
+     * payload         byte[]
+     * </pre>
+     *
+     * @return the buffer holding the encoded message
+     * @throws IOException
+     *             if writing to the underlying stream fails
+     */
+    public ChannelBuffer buffer() throws IOException {
+        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(
+                ChannelBuffers.directBuffer(encodeLength()));
+        int payloadLen = 0;
+        if (token != null) {
+            payloadLen = token.length;
+        }
+
+        bout.writeShort(IDENTIFIER);
+        bout.writeInt(payloadLen);
+
+        // Nothing follows the length field when the payload is empty.
+        if (payloadLen > 0) {
+            bout.write(token);
+        }
+        bout.close();
+        return bout.buffer();
+    }
+
+    /**
+     * Decodes a message produced by {@link #buffer()} from a byte array.
+     *
+     * <p>The identifier and the payload length are both read before the identifier is
+     * checked. The length of {@code serial} is not validated here.
+     *
+     * @param serial
+     *            the encoded message bytes
+     * @return the decoded message, or {@code null} if the leading identifier is not
+     *         {@link #IDENTIFIER}
+     */
+    public static SaslMessageToken read(byte[] serial) {
+        ChannelBuffer smBuffer = ChannelBuffers.copiedBuffer(serial);
+        short identifier = smBuffer.readShort();
+        int payloadLen = smBuffer.readInt();
+        if (identifier != IDENTIFIER) {
+            return null;
+        }
+        byte[] tokenBytes = new byte[payloadLen];
+        smBuffer.readBytes(tokenBytes, 0, payloadLen);
+        return new SaslMessageToken(tokenBytes);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClient.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClient.java
new file mode 100644
index 0000000..b24e7d6
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClient.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import java.io.IOException;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements SASL logic for storm worker client processes.
+ */
+public class SaslNettyClient {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SaslNettyClient.class);
+
+ /**
+ * Used to respond to server's counterpart, SaslServer with SASL tokens
+ * represented as byte arrays.
+ */
+ private SaslClient saslClient;
+
+ /**
+ * Create a SaslNettyClient for authentication with servers.
+ */
+ public SaslNettyClient(String topologyName, byte[] token) {
+ try {
+ LOG.debug("SaslNettyClient: Creating SASL {} client to authenticate to server ",
+ SaslUtils.AUTH_DIGEST_MD5);
+
+ saslClient = Sasl.createSaslClient(
+ new String[] { SaslUtils.AUTH_DIGEST_MD5 }, null, null,
+ SaslUtils.DEFAULT_REALM, SaslUtils.getSaslProps(),
+ new SaslClientCallbackHandler(topologyName, token));
+
+ } catch (IOException e) {
+ LOG.error("SaslNettyClient: Could not obtain topology token for Netty "
+ + "Client to use to authenticate with a Netty Server.");
+ saslClient = null;
+ }
+ }
+
+ public boolean isComplete() {
+ return saslClient.isComplete();
+ }
+
+ /**
+ * Respond to server's SASL token.
+ *
+ * @param saslTokenMessage
+ * contains server's SASL token
+ * @return client's response SASL token
+ */
+ public byte[] saslResponse(SaslMessageToken saslTokenMessage) {
+ try {
+ return saslClient.evaluateChallenge(saslTokenMessage.getSaslToken());
+ } catch (SaslException e) {
+ LOG.error(
+ "saslResponse: Failed to respond to SASL server's token:",
+ e);
+ return null;
+ }
+ }
+
+ /**
+ * Implementation of javax.security.auth.callback.CallbackHandler that works
+ * with Storm topology tokens.
+ */
+ private static class SaslClientCallbackHandler implements CallbackHandler {
+ /** Generated username contained in TopologyToken */
+ private final String userName;
+ /** Generated password contained in TopologyToken */
+ private final char[] userPassword;
+
+ /**
+ * Set private members using topology token.
+ */
+ public SaslClientCallbackHandler(String topologyToken, byte[] token) {
+ this.userName = SaslUtils
+ .encodeIdentifier(topologyToken.getBytes());
+ this.userPassword = SaslUtils.encodePassword(token);
+ }
+
+ /**
+ * Implementation used to respond to SASL tokens from server.
+ *
+ * @param callbacks
+ * objects that indicate what credential information the
+ * server's SaslServer requires from the client.
+ * @throws UnsupportedCallbackException
+ */
+ public void handle(Callback[] callbacks)
+ throws UnsupportedCallbackException {
+ NameCallback nc = null;
+ PasswordCallback pc = null;
+ RealmCallback rc = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof RealmChoiceCallback) {
+ continue;
+ } else if (callback instanceof NameCallback) {
+ nc = (NameCallback) callback;
+ } else if (callback instanceof PasswordCallback) {
+ pc = (PasswordCallback) callback;
+ } else if (callback instanceof RealmCallback) {
+ rc = (RealmCallback) callback;
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "handle: Unrecognized SASL client callback");
+ }
+ }
+ if (nc != null) {
+ LOG.debug("handle: SASL client callback: setting username: {}",
+ userName);
+ nc.setName(userName);
+ }
+ if (pc != null) {
+ LOG.debug("handle: SASL client callback: setting userPassword");
+ pc.setPassword(userPassword);
+ }
+ if (rc != null) {
+ LOG.debug("handle: SASL client callback: setting realm: {}",
+ rc.getDefaultText());
+ rc.setText(rc.getDefaultText());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java
new file mode 100644
index 0000000..3a7d6a2
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelLocal;
+
+/**
+ * Holder for the channel-local {@link SaslNettyClient} slot.
+ */
+final class SaslNettyClientState {
+
+    /**
+     * Channel-local storage for a {@link SaslNettyClient}; the initial value supplied
+     * for any channel is {@code null}.
+     */
+    public static final ChannelLocal<SaslNettyClient> getSaslNettyClient =
+            new ChannelLocal<SaslNettyClient>() {
+                @Override
+                protected SaslNettyClient initialValue(Channel channel) {
+                    return null;
+                }
+            };
+
+}