You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/02/09 02:38:33 UTC
[3/6] storm git commit: STORM-2898: Support for WorkerToken
authentication
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/digest/ClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/ClientCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/ClientCallbackHandler.java
deleted file mode 100644
index 312e4ab..0000000
--- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/ClientCallbackHandler.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.security.auth.digest;
-
-import org.apache.storm.security.auth.AbstractSaslClientCallbackHandler;
-import org.apache.storm.security.auth.AuthUtils;
-
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.login.Configuration;
-import java.io.IOException;
-
-/**
- * client side callback handler.
- */
-public class ClientCallbackHandler extends AbstractSaslClientCallbackHandler {
-
- /**
- * Constructor based on a JAAS configuration
- *
- * For digest, you should have a pair of user name and password defined.
- * @throws IOException
- */
- public ClientCallbackHandler(Configuration configuration) throws IOException {
- if (configuration == null) return;
- AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_CLIENT);
- if (configurationEntries == null) {
- String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_CLIENT
- + "' entry in this configuration: Client cannot start.";
- throw new IOException(errorMessage);
- }
-
- _password = "";
- for(AppConfigurationEntry entry: configurationEntries) {
- if (entry.getOptions().get(USERNAME) != null) {
- _username = (String)entry.getOptions().get(USERNAME);
- }
- if (entry.getOptions().get(PASSWORD) != null) {
- _password = (String)entry.getOptions().get(PASSWORD);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
index 4d123aa..1272712 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
@@ -15,12 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.security.auth.digest;
import java.io.IOException;
-
+import java.util.Map;
import javax.security.auth.callback.CallbackHandler;
-
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.storm.generated.WorkerToken;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
+import org.apache.storm.security.auth.sasl.SimpleSaslClientCallbackHandler;
+import org.apache.storm.security.auth.sasl.SimpleSaslServerCallbackHandler;
+import org.apache.storm.security.auth.workertoken.WorkerTokenAuthorizer;
+import org.apache.storm.security.auth.workertoken.WorkerTokenClientCallbackHandler;
import org.apache.thrift.transport.TSaslClientTransport;
import org.apache.thrift.transport.TSaslServerTransport;
import org.apache.thrift.transport.TTransport;
@@ -29,20 +37,19 @@ import org.apache.thrift.transport.TTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.storm.security.auth.AuthUtils;
-import org.apache.storm.security.auth.SaslTransportPlugin;
-
public class DigestSaslTransportPlugin extends SaslTransportPlugin {
public static final String DIGEST = "DIGEST-MD5";
private static final Logger LOG = LoggerFactory.getLogger(DigestSaslTransportPlugin.class);
- protected TTransportFactory getServerTransportFactory() throws IOException {
+ protected TTransportFactory getServerTransportFactory() throws IOException {
//create an authentication callback handler
- CallbackHandler serer_callback_handler = new ServerCallbackHandler(login_conf);
+ CallbackHandler serverCallbackHandler = new SimpleSaslServerCallbackHandler(
+ new WorkerTokenAuthorizer(conf, type),
+ new JassPasswordProvider(loginConf));
//create a transport factory that will invoke our auth callback for digest
TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
- factory.addServerDefinition(DIGEST, AuthUtils.SERVICE, "localhost", null, serer_callback_handler);
+ factory.addServerDefinition(DIGEST, AuthUtils.SERVICE, "localhost", null, serverCallbackHandler);
LOG.info("SASL DIGEST-MD5 transport factory will be used");
return factory;
@@ -50,19 +57,46 @@ public class DigestSaslTransportPlugin extends SaslTransportPlugin {
@Override
public TTransport connect(TTransport transport, String serverHost, String asUser) throws TTransportException, IOException {
- ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf);
- TSaslClientTransport wrapper_transport = new TSaslClientTransport(DIGEST,
+ CallbackHandler clientCallbackHandler;
+ WorkerToken token = WorkerTokenClientCallbackHandler.findWorkerTokenInSubject(type);
+ if (token != null) {
+ clientCallbackHandler = new WorkerTokenClientCallbackHandler(token);
+ } else if (loginConf != null) {
+ AppConfigurationEntry [] configurationEntries = loginConf.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_CLIENT);
+ if (configurationEntries == null) {
+ String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_CLIENT
+ + "' entry in this configuration: Client cannot start.";
+ throw new IOException(errorMessage);
+ }
+
+ String username = "";
+ String password = "";
+ for (AppConfigurationEntry entry : configurationEntries) {
+ Map options = entry.getOptions();
+ username = (String)options.getOrDefault("username", username);
+ password = (String)options.getOrDefault("password", password);
+ }
+ clientCallbackHandler = new SimpleSaslClientCallbackHandler(username, password);
+ } else {
+ throw new IOException("Could not find any way to authenticate with the server.");
+ }
+
+ TSaslClientTransport wrapperTransport = new TSaslClientTransport(DIGEST,
null,
AuthUtils.SERVICE,
serverHost,
null,
- client_callback_handler,
+ clientCallbackHandler,
transport);
- wrapper_transport.open();
+ wrapperTransport.open();
LOG.debug("SASL DIGEST-MD5 client transport has been established");
- return wrapper_transport;
+ return wrapperTransport;
}
+ @Override
+ public boolean areWorkerTokensSupported() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java
new file mode 100644
index 0000000..bb3f1bf
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth.digest;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.storm.security.auth.sasl.PasswordProvider;
+
+/**
+ * Provides passwords out of a jaas conf for typical MD5-DIGEST authentication support.
+ */
+public class JassPasswordProvider implements PasswordProvider {
+ private static final String USER_PREFIX = "user_";
+ /**
+ * The system property that sets a super user password. This can be used in addition to the
+ * jaas conf, and takes precedent over a "super" user in the jaas conf if this is set.
+ */
+ public static final String SYSPROP_SUPER_PASSWORD = "storm.SASLAuthenticationProvider.superPassword";
+
+ private Map<String, char[]> credentials = new ConcurrentHashMap<>();
+
+ /**
+ * Constructor.
+ * @param configuration the jaas configuration to get the credentials out of.
+ * @throws IOException if we could not read the Server section in the jaas conf.
+ */
+ public JassPasswordProvider(Configuration configuration) throws IOException {
+ if (configuration == null) {
+ return;
+ }
+
+ AppConfigurationEntry[] configurationEntries = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER);
+ if (configurationEntries == null) {
+ String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_SERVER
+ + "' entry in this configuration: Server cannot start.";
+ throw new IOException(errorMessage);
+ }
+ credentials.clear();
+ for (AppConfigurationEntry entry : configurationEntries) {
+ Map<String, ?> options = entry.getOptions();
+ // Populate user -> password map with JAAS configuration entries from the "Server" section.
+ // Usernames are distinguished from other options by prefixing the username with a "user_" prefix.
+ for (Map.Entry<String, ?> pair : options.entrySet()) {
+ String key = pair.getKey();
+ if (key.startsWith(USER_PREFIX)) {
+ String userName = key.substring(USER_PREFIX.length());
+ credentials.put(userName, ((String) pair.getValue()).toCharArray());
+ }
+ }
+ }
+
+ String superPassword = System.getProperty(SYSPROP_SUPER_PASSWORD);
+ if (superPassword != null) {
+ credentials.put("super", superPassword.toCharArray());
+ }
+ }
+
+ @Override
+ public Optional<char[]> getPasswordFor(String user) {
+ return Optional.ofNullable(credentials.get(user));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/digest/ServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/ServerCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/ServerCallbackHandler.java
deleted file mode 100644
index 7c4414f..0000000
--- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/ServerCallbackHandler.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.security.auth.digest;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.storm.security.auth.AbstractSaslServerCallbackHandler;
-import org.apache.storm.security.auth.ReqContext;
-import org.apache.storm.security.auth.SaslTransportPlugin;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.login.Configuration;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.RealmCallback;
-
-import org.apache.storm.security.auth.AuthUtils;
-
-/**
- * SASL server side callback handler
- */
-public class ServerCallbackHandler extends AbstractSaslServerCallbackHandler {
- private static final Logger LOG = LoggerFactory.getLogger(ServerCallbackHandler.class);
- private static final String USER_PREFIX = "user_";
- public static final String SYSPROP_SUPER_PASSWORD = "storm.SASLAuthenticationProvider.superPassword";
-
- public ServerCallbackHandler(Configuration configuration) throws IOException {
- if (configuration==null) return;
-
- AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER);
- if (configurationEntries == null) {
- String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_SERVER+"' entry in this configuration: Server cannot start.";
- throw new IOException(errorMessage);
- }
- credentials.clear();
- for(AppConfigurationEntry entry: configurationEntries) {
- Map<String,?> options = entry.getOptions();
- // Populate DIGEST-MD5 user -> password map with JAAS configuration entries from the "Server" section.
- // Usernames are distinguished from other options by prefixing the username with a "user_" prefix.
- for(Map.Entry<String, ?> pair : options.entrySet()) {
- String key = pair.getKey();
- if (key.startsWith(USER_PREFIX)) {
- String userName = key.substring(USER_PREFIX.length());
- credentials.put(userName,(String)pair.getValue());
- }
- }
- }
- }
-
- @Override
- protected void handlePasswordCallback(PasswordCallback pc) {
- LOG.debug("handlePasswordCallback");
- if ("super".equals(this.userName) && System.getProperty(SYSPROP_SUPER_PASSWORD) != null) {
- // superuser: use Java system property for password, if available.
- pc.setPassword(System.getProperty(SYSPROP_SUPER_PASSWORD).toCharArray());
- } else {
- super.handlePasswordCallback(pc);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
index b6571ba..157ae54 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
@@ -35,8 +35,12 @@ import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginException;
import javax.security.sasl.Sasl;
+import org.apache.storm.generated.WorkerToken;
import org.apache.storm.messaging.netty.Login;
import org.apache.commons.lang.StringUtils;
+import org.apache.storm.security.auth.sasl.SimpleSaslServerCallbackHandler;
+import org.apache.storm.security.auth.workertoken.WorkerTokenAuthorizer;
+import org.apache.storm.security.auth.workertoken.WorkerTokenClientCallbackHandler;
import org.apache.thrift.transport.TSaslClientTransport;
import org.apache.thrift.transport.TSaslServerTransport;
import org.apache.thrift.transport.TTransport;
@@ -47,10 +51,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.storm.security.auth.AuthUtils;
-import org.apache.storm.security.auth.SaslTransportPlugin;
+import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
- public static final String KERBEROS = "GSSAPI";
+ public static final String KERBEROS = "GSSAPI";
+ private static final String DIGEST = "DIGEST-MD5";
private static final Logger LOG = LoggerFactory.getLogger(KerberosSaslTransportPlugin.class);
private static Map <LoginCacheKey, Login> loginCache = new ConcurrentHashMap<>();
private static final String DISABLE_LOGIN_CACHE = "disableLoginCache";
@@ -93,15 +98,16 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
}
}
+ @Override
public TTransportFactory getServerTransportFactory() throws IOException {
//create an authentication callback handler
- CallbackHandler server_callback_handler = new ServerCallbackHandler(login_conf, topoConf);
+ CallbackHandler server_callback_handler = new ServerCallbackHandler(loginConf);
//login our principal
Subject subject = null;
try {
//specify a configuration object to be used
- Configuration.setConfiguration(login_conf);
+ Configuration.setConfiguration(loginConf);
//now login
Login login = new Login(AuthUtils.LOGIN_CONTEXT_SERVER, server_callback_handler);
subject = login.getSubject();
@@ -114,15 +120,15 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
//check the credential of our principal
if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
throw new RuntimeException("Fail to verify user principal with section \""
- +AuthUtils.LOGIN_CONTEXT_SERVER+"\" in login configuration file "+ login_conf);
+ + AuthUtils.LOGIN_CONTEXT_SERVER + "\" in login configuration file " + loginConf);
}
- String principal = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_SERVER, "principal");
+ String principal = AuthUtils.get(loginConf, AuthUtils.LOGIN_CONTEXT_SERVER, "principal");
LOG.debug("principal:"+principal);
KerberosName serviceKerberosName = new KerberosName(principal);
String serviceName = serviceKerberosName.getServiceName();
String hostName = serviceKerberosName.getHostName();
- Map<String, String> props = new TreeMap<String,String>();
+ Map<String, String> props = new TreeMap<>();
props.put(Sasl.QOP, "auth");
props.put(Sasl.SERVER_AUTH, "false");
@@ -130,6 +136,10 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
factory.addServerDefinition(KERBEROS, serviceName, hostName, props, server_callback_handler);
+ //Also add in support for worker tokens
+ factory.addServerDefinition(DIGEST, AuthUtils.SERVICE, "localhost", null,
+ new SimpleSaslServerCallbackHandler(new WorkerTokenAuthorizer(conf, type)));
+
//create a wrap transport factory so that we could apply user credential during connections
TUGIAssumingTransportFactory wrapFactory = new TUGIAssumingTransportFactory(factory, subject);
@@ -140,9 +150,9 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
private Login mkLogin() throws IOException {
try {
//create an authentication callback handler
- ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf);
+ ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(loginConf);
//specify a configuration object to be used
- Configuration.setConfiguration(login_conf);
+ Configuration.setConfiguration(loginConf);
//now login
Login login = new Login(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler);
login.startThreadIfNeeded();
@@ -154,9 +164,28 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
}
@Override
- public TTransport connect(TTransport transport, String serverHost, String asUser) throws TTransportException, IOException {
+ public TTransport connect(TTransport transport, String serverHost, String asUser) throws IOException, TTransportException {
+ WorkerToken token = WorkerTokenClientCallbackHandler.findWorkerTokenInSubject(type);
+ if (token != null && asUser != null) {
+ CallbackHandler clientCallbackHandler = new WorkerTokenClientCallbackHandler(token);
+ TSaslClientTransport wrapperTransport = new TSaslClientTransport(DIGEST,
+ null,
+ AuthUtils.SERVICE,
+ serverHost,
+ null,
+ clientCallbackHandler,
+ transport);
+ wrapperTransport.open();
+ LOG.debug("SASL DIGEST-MD5 WorkerToken client transport has been established");
+
+ return wrapperTransport;
+ }
+ return kerberosConnect(transport, serverHost, asUser);
+ }
+
+ private TTransport kerberosConnect(TTransport transport, String serverHost, String asUser) throws IOException {
//login our user
- SortedMap<String, ?> authConf = AuthUtils.pullConfig(login_conf, AuthUtils.LOGIN_CONTEXT_CLIENT);
+ SortedMap<String, ?> authConf = AuthUtils.pullConfig(loginConf, AuthUtils.LOGIN_CONTEXT_CLIENT);
if (authConf == null) {
throw new RuntimeException("Error in parsing the kerberos login Configuration, returned null");
}
@@ -194,15 +223,15 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
final Subject subject = login.getSubject();
if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //error
throw new RuntimeException("Fail to verify user principal with section \""
- +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ login_conf);
+ +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ loginConf);
}
final String principal = StringUtils.isBlank(asUser) ? getPrincipal(subject) : asUser;
- String serviceName = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName");
+ String serviceName = AuthUtils.get(loginConf, AuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName");
if (serviceName == null) {
serviceName = AuthUtils.SERVICE;
}
- Map<String, String> props = new TreeMap<String,String>();
+ Map<String, String> props = new TreeMap<>();
props.put(Sasl.QOP, "auth");
props.put(Sasl.SERVER_AUTH, "false");
@@ -246,7 +275,8 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
return ((Principal)(principals.toArray()[0])).getName();
}
- /** A TransportFactory that wraps another one, but assumes a specified UGI
+ /**
+ * A TransportFactory that wraps another one, but assumes a specified UGI
* before calling through.
*
* This is used on the server side to assume the server's Principal when accepting
@@ -269,22 +299,25 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
public TTransport getTransport(final TTransport trans) {
try {
return Subject.doAs(subject,
- new PrivilegedExceptionAction<TTransport>() {
- public TTransport run() {
+ (PrivilegedExceptionAction<TTransport>) () -> {
try {
return wrapped.getTransport(trans);
}
catch (Exception e) {
LOG.debug("Storm server failed to open transport " +
- "to interact with a client during session initiation: " + e, e);
+ "to interact with a client during session initiation: " + e, e);
return new NoOpTTrasport(null);
}
- }
- });
+ });
} catch (PrivilegedActionException e) {
LOG.error("Storm server experienced a PrivilegedActionException exception while creating a transport using a JAAS principal context:" + e, e);
return null;
}
}
}
+
+ @Override
+ public boolean areWorkerTokensSupported() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
index 59eb80d..d3157c8 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
@@ -18,79 +18,87 @@
package org.apache.storm.security.auth.kerberos;
+import javax.security.sasl.RealmCallback;
import org.apache.storm.security.auth.AuthUtils;
import org.apache.storm.security.auth.ReqContext;
-import org.apache.storm.security.auth.SaslTransportPlugin;
+import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.security.auth.Subject;
import javax.security.auth.callback.*;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.sasl.AuthorizeCallback;
import java.io.IOException;
-import java.util.Map;
/**
- * SASL server side callback handler
+ * SASL server side callback handler for kerberos auth.
*/
public class ServerCallbackHandler implements CallbackHandler {
private static final Logger LOG = LoggerFactory.getLogger(ServerCallbackHandler.class);
- private String userName;
-
- public ServerCallbackHandler(Configuration configuration, Map<String, Object> topoConf) throws IOException {
- if (configuration==null) return;
+ public ServerCallbackHandler(Configuration configuration) throws IOException {
+ if (configuration == null) {
+ return;
+ }
AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER);
if (configurationEntries == null) {
- String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_SERVER+"' entry in this configuration: Server cannot start.";
+ String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_SERVER
+ + "' entry in this configuration: Server cannot start.";
LOG.error(errorMessage);
throw new IOException(errorMessage);
}
-
}
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+ NameCallback nc = null;
+ PasswordCallback pc = null;
+ AuthorizeCallback ac = null;
for (Callback callback : callbacks) {
- if (callback instanceof NameCallback) {
- handleNameCallback((NameCallback) callback);
+ if (callback instanceof AuthorizeCallback) {
+ ac = (AuthorizeCallback) callback;
+ } else if (callback instanceof NameCallback) {
+ nc = (NameCallback) callback;
} else if (callback instanceof PasswordCallback) {
- handlePasswordCallback((PasswordCallback) callback);
- } else if (callback instanceof AuthorizeCallback) {
- handleAuthorizeCallback((AuthorizeCallback) callback);
+ pc = (PasswordCallback) callback;
+ } else if (callback instanceof RealmCallback) {
+ //Ignored...
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "Unrecognized SASL Callback");
}
}
- }
- private void handleNameCallback(NameCallback nc) {
- LOG.debug("handleNameCallback");
- userName = nc.getDefaultName();
- nc.setName(nc.getDefaultName());
- }
+ String userName = "UNKNOWN";
+ if (nc != null) {
+ LOG.debug("handleNameCallback");
+ userName = nc.getDefaultName();
+ nc.setName(nc.getDefaultName());
+ }
- private void handlePasswordCallback(PasswordCallback pc) {
- LOG.warn("No password found for user: " + userName);
- }
+ if (pc != null) {
+ LOG.warn("No password found for user: {}", userName);
+ }
- private void handleAuthorizeCallback(AuthorizeCallback ac) {
- String authenticationID = ac.getAuthenticationID();
- LOG.info("Successfully authenticated client: authenticationID=" + authenticationID + " authorizationID= " + ac.getAuthorizationID());
+ if (ac != null) {
+ String authenticationID = ac.getAuthenticationID();
+ LOG.info("Successfully authenticated client: authenticationID=" + authenticationID + " authorizationID= " + ac.getAuthorizationID());
- //if authorizationId is not set, set it to authenticationId.
- if(ac.getAuthorizationID() == null) {
- ac.setAuthorizedID(authenticationID);
- }
+ //if authorizationId is not set, set it to authenticationId.
+ if (ac.getAuthorizationID() == null) {
+ ac.setAuthorizedID(authenticationID);
+ }
- //When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We
- //add the authNid as the real user in reqContext's subject which will be used during authorization.
- if(!ac.getAuthenticationID().equals(ac.getAuthorizationID())) {
- ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(ac.getAuthenticationID()));
- } else {
- ReqContext.context().setRealPrincipal(null);
- }
+ //When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We
+ //add the authNid as the real user in reqContext's subject which will be used during authorization.
+ if (!ac.getAuthenticationID().equals(ac.getAuthorizationID())) {
+ ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(ac.getAuthenticationID()));
+ } else {
+ ReqContext.context().setRealPrincipal(null);
+ }
- ac.setAuthorized(true);
+ ac.setAuthorized(true);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainClientCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainClientCallbackHandler.java
index 13340df..b01cdc4 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainClientCallbackHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainClientCallbackHandler.java
@@ -15,17 +15,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.security.auth.plain;
-import org.apache.storm.security.auth.AbstractSaslClientCallbackHandler;
+import org.apache.storm.security.auth.sasl.SimpleSaslClientCallbackHandler;
-public class PlainClientCallbackHandler extends AbstractSaslClientCallbackHandler {
+/**
+ * This should only ever be used for testing. It provides no security at all.
+ * DO NOT USE THIS. The user name is the current user and the password is
+ * "password".
+ */
+@Deprecated
+public class PlainClientCallbackHandler extends SimpleSaslClientCallbackHandler {
- /*
+ /**
* For plain, using constants for a pair of user name and password.
*/
public PlainClientCallbackHandler() {
- _username = System.getProperty("user.name");
- _password = PASSWORD;
+ super(System.getProperty("user.name"), "password");
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java
index eaef91a..2df61c1 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java
@@ -15,10 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.security.auth.plain;
+import java.util.Optional;
import org.apache.storm.security.auth.AuthUtils;
-import org.apache.storm.security.auth.SaslTransportPlugin;
+import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
+import org.apache.storm.security.auth.sasl.SimpleSaslServerCallbackHandler;
import org.apache.thrift.transport.TSaslClientTransport;
import org.apache.thrift.transport.TSaslServerTransport;
import org.apache.thrift.transport.TTransport;
@@ -31,6 +34,11 @@ import javax.security.auth.callback.CallbackHandler;
import java.io.IOException;
import java.security.Security;
+/**
+ * This should never be used except for testing. It provides no security at all.
+ * The password is hard coded, and even if it were not it is sent in plain text.
+ */
+@Deprecated
public class PlainSaslTransportPlugin extends SaslTransportPlugin {
public static final String PLAIN = "PLAIN";
private static final Logger LOG = LoggerFactory.getLogger(PlainSaslTransportPlugin.class);
@@ -38,7 +46,7 @@ public class PlainSaslTransportPlugin extends SaslTransportPlugin {
@Override
protected TTransportFactory getServerTransportFactory() throws IOException {
//create an authentication callback handler
- CallbackHandler serverCallbackHandler = new PlainServerCallbackHandler();
+ CallbackHandler serverCallbackHandler = new SimpleSaslServerCallbackHandler((userName) -> Optional.of("password".toCharArray()));
if (Security.getProvider(SaslPlainServer.SecurityProvider.SASL_PLAIN_SERVER) == null) {
Security.addProvider(new SaslPlainServer.SecurityProvider());
}
@@ -46,7 +54,7 @@ public class PlainSaslTransportPlugin extends SaslTransportPlugin {
TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
factory.addServerDefinition(PLAIN, AuthUtils.SERVICE, "localhost", null, serverCallbackHandler);
- LOG.info("SASL PLAIN transport factory will be used");
+ LOG.error("SASL PLAIN transport factory will be used. This is totally insecure. Please do not use this.");
return factory;
}
@@ -62,10 +70,8 @@ public class PlainSaslTransportPlugin extends SaslTransportPlugin {
transport);
wrapperTransport.open();
- LOG.debug("SASL PLAIN client transport has been established");
+ LOG.error("SASL PLAIN client transport has been established. This is totally insecure. Please do not use this.");
return wrapperTransport;
-
}
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainServerCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainServerCallbackHandler.java
deleted file mode 100644
index c646fc9..0000000
--- a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainServerCallbackHandler.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.security.auth.plain;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.storm.security.auth.AbstractSaslServerCallbackHandler;
-import org.apache.storm.security.auth.ReqContext;
-import org.apache.storm.security.auth.SaslTransportPlugin;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.RealmCallback;
-
-/**
- * SASL server side callback handler
- */
-public class PlainServerCallbackHandler extends AbstractSaslServerCallbackHandler {
- private static final Logger LOG = LoggerFactory.getLogger(PlainServerCallbackHandler.class);
- public static final String PASSWORD = "password";
-
- public PlainServerCallbackHandler() throws IOException {
- userName=null;
- }
-
- protected void handlePasswordCallback(PasswordCallback pc) {
- LOG.debug("handlePasswordCallback");
- pc.setPassword(PASSWORD.toCharArray());
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/plain/SaslPlainServer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/plain/SaslPlainServer.java b/storm-client/src/jvm/org/apache/storm/security/auth/plain/SaslPlainServer.java
index c84ce77..62ee872 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/plain/SaslPlainServer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/plain/SaslPlainServer.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.security.auth.plain;
import javax.security.auth.callback.Callback;
@@ -29,6 +30,7 @@ import javax.security.sasl.SaslServerFactory;
import java.security.Provider;
import java.util.Map;
+@Deprecated
public class SaslPlainServer implements SaslServer {
@SuppressWarnings("serial")
public static class SecurityProvider extends Provider {
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/sasl/PasswordProvider.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/PasswordProvider.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/PasswordProvider.java
new file mode 100644
index 0000000..d1e0c0f
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/PasswordProvider.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth.sasl;
+
+import java.util.Optional;
+
+/**
+ * A very basic API that will provide a password for a given user name.
+ * This is intended to be used with the SimpleSaslServerCallbackHandler
+ * to verify a user that is attempting to log in.
+ */
+public interface PasswordProvider {
+ /**
+ * Get an optional password for a user. If no password for the user is found
+ * the option will be empty and another PasswordProvider would be tried.
+ * @param user the user this is for.
+ * @return the password if it is found.
+ */
+ Optional<char[]> getPasswordFor(String user);
+
+ /**
+ * Convert the supplied user name to the actual user name that should be used
+ * in the system. This may be called on any name. If it cannot be translated
+ * then a null may be returned or an exception thrown. If getPassword returns successfully
+ * this should not return null, nor throw an exception for the same user.
+ * @param user the SASL negotiated user name.
+ * @return the user name that storm should use.
+ */
+ default String userName(String user) {
+ return user;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
new file mode 100644
index 0000000..71cdbf5
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth.sasl;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.security.Principal;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.security.auth.Subject;
+import javax.security.auth.login.Configuration;
+import javax.security.sasl.SaslServer;
+import org.apache.storm.security.auth.ITransportPlugin;
+import org.apache.storm.security.auth.ReqContext;
+import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.storm.security.auth.kerberos.NoOpTTrasport;
+import org.apache.storm.utils.ExtendedThreadPoolExecutor;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+/**
+ * Base class for SASL authentication plugin.
+ */
+public abstract class SaslTransportPlugin implements ITransportPlugin {
+ protected ThriftConnectionType type;
+ protected Map<String, Object> conf;
+ protected Configuration loginConf;
+ private int port;
+
+ @Override
+ public void prepare(ThriftConnectionType type, Map<String, Object> conf, Configuration loginConf) {
+ this.type = type;
+ this.conf = conf;
+ this.loginConf = loginConf;
+ }
+
+ @Override
+ public TServer getServer(TProcessor processor) throws IOException, TTransportException {
+ int configuredPort = type.getPort(conf);
+ Integer socketTimeout = type.getSocketTimeOut(conf);
+ TTransportFactory serverTransportFactory = getServerTransportFactory();
+ TServerSocket serverTransport = null;
+ if (socketTimeout != null) {
+ serverTransport = new TServerSocket(configuredPort, socketTimeout);
+ } else {
+ serverTransport = new TServerSocket(configuredPort);
+ }
+ this.port = serverTransport.getServerSocket().getLocalPort();
+ int numWorkerThreads = type.getNumThreads(conf);
+ Integer queueSize = type.getQueueSize(conf);
+
+ TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
+ .processor(new TUGIWrapProcessor(processor))
+ .minWorkerThreads(numWorkerThreads)
+ .maxWorkerThreads(numWorkerThreads)
+ .protocolFactory(new TBinaryProtocol.Factory(false, true));
+
+ if (serverTransportFactory != null) {
+ serverArgs.transportFactory(serverTransportFactory);
+ }
+ BlockingQueue workQueue = new SynchronousQueue();
+ if (queueSize != null) {
+ workQueue = new ArrayBlockingQueue(queueSize);
+ }
+ ThreadPoolExecutor executorService = new ExtendedThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
+ 60, TimeUnit.SECONDS, workQueue);
+ serverArgs.executorService(executorService);
+ return new TThreadPoolServer(serverArgs);
+ }
+
+ /**
+ * Create the transport factory needed for serving. All subclass must implement this method.
+ * @return server transport factory
+ * @throws IOException on any error.
+ */
+ protected abstract TTransportFactory getServerTransportFactory() throws IOException;
+
+ @Override
+ public int getPort() {
+ return this.port;
+ }
+
+
+ /**
+ * Processor that pulls the SaslServer object out of the transport, and
+ * assumes the remote user's UGI before calling through to the original
+ * processor. This is used on the server side to set the UGI for each specific call.
+ */
+ private static class TUGIWrapProcessor implements TProcessor {
+ final TProcessor wrapped;
+
+ TUGIWrapProcessor(TProcessor wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+ //populating request context
+ ReqContext reqContext = ReqContext.context();
+
+ TTransport trans = inProt.getTransport();
+ //Sasl transport
+ TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
+
+ if (trans instanceof NoOpTTrasport) {
+ return false;
+ }
+
+ //remote address
+ TSocket tsocket = (TSocket)saslTrans.getUnderlyingTransport();
+ Socket socket = tsocket.getSocket();
+ reqContext.setRemoteAddress(socket.getInetAddress());
+
+ //remote subject
+ SaslServer saslServer = saslTrans.getSaslServer();
+ String authId = saslServer.getAuthorizationID();
+ Subject remoteUser = new Subject();
+ remoteUser.getPrincipals().add(new User(authId));
+ reqContext.setSubject(remoteUser);
+
+ //invoke service handler
+ return wrapped.process(inProt, outProt);
+ }
+ }
+
+ public static class User implements Principal {
+ private final String name;
+
+ public User(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Get the full name of the user.
+ */
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ return !(o == null || getClass() != o.getClass()) && (name.equals(((User) o).name));
+ }
+
+ @Override
+ public int hashCode() {
+ return name.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslClientCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslClientCallbackHandler.java
new file mode 100644
index 0000000..2242f0c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslClientCallbackHandler.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth.sasl;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+
+/**
+ * A client callback handler that supports a single username and password.
+ */
+public class SimpleSaslClientCallbackHandler implements CallbackHandler {
+ private final String username;
+ private final String password;
+
+ /**
+ * Constructor.
+ * @param username the username to use.
+ * @param password the password to use.
+ */
+ public SimpleSaslClientCallbackHandler(String username, String password) {
+ this.username = username;
+ this.password = password;
+ }
+
+ @Override
+ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+ for (Callback c : callbacks) {
+ if (c instanceof NameCallback) {
+ NameCallback nc = (NameCallback) c;
+ nc.setName(username);
+ } else if (c instanceof PasswordCallback) {
+ PasswordCallback pc = (PasswordCallback)c;
+ if (password != null) {
+ pc.setPassword(password.toCharArray());
+ }
+ } else if (c instanceof AuthorizeCallback) {
+ AuthorizeCallback ac = (AuthorizeCallback) c;
+ String authid = ac.getAuthenticationID();
+ String authzid = ac.getAuthorizationID();
+ if (authid.equals(authzid)) {
+ ac.setAuthorized(true);
+ } else {
+ ac.setAuthorized(false);
+ }
+ if (ac.isAuthorized()) {
+ ac.setAuthorizedID(authzid);
+ }
+ } else if (c instanceof RealmCallback) {
+ RealmCallback rc = (RealmCallback) c;
+ ((RealmCallback) c).setText(rc.getDefaultText());
+ } else {
+ throw new UnsupportedCallbackException(c);
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java
new file mode 100644
index 0000000..d64fa1b
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth.sasl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import org.apache.storm.security.auth.ReqContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimpleSaslServerCallbackHandler implements CallbackHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleSaslServerCallbackHandler.class);
+ private final List<PasswordProvider> providers;
+
+ /**
+ * Constructor with different password providers.
+ * @param providers what will provide a password. They will be checked in order, and the first one to
+ * return a password wins.
+ */
+ public SimpleSaslServerCallbackHandler(PasswordProvider ... providers) {
+ this(Arrays.asList(providers));
+ }
+
+ /**
+ * Constructor with different password providers.
+ * @param providers what will provide a password. They will be checked in order, and the first one to
+ * return a password wins.
+ */
+ public SimpleSaslServerCallbackHandler(List<PasswordProvider> providers) {
+ this.providers = new ArrayList<>(providers);
+ }
+
+ private static void log(String type, AuthorizeCallback ac, NameCallback nc, PasswordCallback pc, RealmCallback rc) {
+ if (LOG.isDebugEnabled()) {
+ String acs = "null";
+ if (ac != null) {
+ acs = "athz: " + ac.getAuthorizationID() + " athn: " + ac.getAuthenticationID() + " authorized: " + ac.getAuthorizedID();
+ }
+
+ String ncs = "null";
+ if (nc != null) {
+ ncs = "default: " + nc.getDefaultName() + " name: " + nc.getName();
+ }
+
+ String pcs = "null";
+ if (pc != null) {
+ char[] pwd = pc.getPassword();
+ pcs = "password: " + (pwd == null ? "null" : "not null " + pwd.length);
+ }
+
+ String rcs = "null";
+ if (rc != null) {
+ rcs = "default: " + rc.getDefaultText() + " text: " + rc.getText();
+ }
+ LOG.debug("{}\nAC: {}\nNC: {}\nPC: {}\nRC: {}", type, acs, ncs, pcs, rcs);
+ }
+ }
+
+ private String translateName(String orig) {
+ for (PasswordProvider provider: providers) {
+ try {
+ String ret = provider.userName(orig);
+ if (ret != null) {
+ return ret;
+ }
+ } catch (Exception e) {
+ //Translating the name (this call) happens in a different callback from validating
+ // the user name and password. This has to be stateless though, so we cannot save
+ // the password provider away to be sure we got the same one that validated the password.
+ // If the password providers are written correctly this should never happen,
+ // because if they cannot read the name they would return a null.
+ // But on the off chance that something goes wrong with the translation because of a mismatch
+ // we try to skip the bad one.
+ LOG.debug("{} could not read name from {}", provider, orig, e);
+ }
+ }
+ // In the worst case we will return a serialized name after a password provider said that the password
+ // was okay. In that case the ACLs are likely to prevent the request from going through anyways.
+ // But that is only if there is a bug in one of the password providers.
+ return orig;
+ }
+
+ @Override
+ public void handle(Callback[] callbacks) throws UnsupportedCallbackException, IOException {
+ NameCallback nc = null;
+ PasswordCallback pc = null;
+ AuthorizeCallback ac = null;
+ RealmCallback rc = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof AuthorizeCallback) {
+ ac = (AuthorizeCallback) callback;
+ } else if (callback instanceof NameCallback) {
+ nc = (NameCallback) callback;
+ } else if (callback instanceof PasswordCallback) {
+ pc = (PasswordCallback) callback;
+ } else if (callback instanceof RealmCallback) {
+ rc = (RealmCallback) callback;
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "Unrecognized SASL Callback");
+ }
+ }
+
+ log("GOT", ac, nc, pc, rc);
+
+ if (nc != null) {
+ String userName = nc.getDefaultName();
+ boolean passwordFound = false;
+ for (PasswordProvider provider : providers) {
+ Optional<char[]> password = provider.getPasswordFor(userName);
+ if (password.isPresent()) {
+ pc.setPassword(password.get());
+ nc.setName(provider.userName(userName));
+ passwordFound = true;
+ break;
+ }
+ }
+ if (!passwordFound) {
+ LOG.warn("No password found for user: {}", userName);
+ throw new IOException("NOT ALLOWED.");
+ }
+ }
+
+ if (rc != null) {
+ rc.setText(rc.getDefaultText());
+ }
+
+ if (ac != null) {
+ String nid = ac.getAuthenticationID();
+ if (nid != null) {
+ nid = translateName(nid);
+ }
+
+ String zid = ac.getAuthorizationID();
+ if (zid != null) {
+ zid = translateName(zid);
+ }
+ LOG.info("Successfully authenticated client: authenticationID = {} authorizationID = {}",
+ nid, zid);
+
+ //if authorizationId is not set, set it to authenticationId.
+ if (zid == null) {
+ ac.setAuthorizedID(nid);
+ zid = nid;
+ } else {
+ ac.setAuthorizedID(zid);
+ }
+
+ //When zid and zid are not equal, nid is attempting to impersonate zid, We
+ //add the nid as the real user in reqContext's subject which will be used during authorization.
+ if (!nid.equals(zid)) {
+ LOG.info("Impersonation attempt authenticationID = {} authorizationID = {}",
+ nid, zid);
+ ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(nid));
+ } else {
+ ReqContext.context().setRealPrincipal(null);
+ }
+
+ ac.setAuthorized(true);
+ }
+ log("FINISHED", ac, nc, pc, rc);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
new file mode 100644
index 0000000..b196ade
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth.workertoken;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import java.util.Base64;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.generated.PrivateWorkerKey;
+import org.apache.storm.generated.WorkerTokenInfo;
+import org.apache.storm.generated.WorkerTokenServiceType;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.storm.security.auth.sasl.PasswordProvider;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Allow for SASL authentication using worker tokens.
+ */
+public class WorkerTokenAuthorizer implements PasswordProvider {
+ private static final Logger LOG = LoggerFactory.getLogger(WorkerTokenAuthorizer.class);
+
+ private static IStormClusterState buildStateIfNeeded(Map<String, Object> conf, ThriftConnectionType connectionType) {
+ IStormClusterState state = null;
+
+ if (AuthUtils.areWorkerTokensEnabledServer(connectionType, conf)) {
+ try {
+ state = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.UNKNOWN, conf));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return state;
+ }
+
+ private final LoadingCache<WorkerTokenInfo, PrivateWorkerKey> keyCache;
+
+ /**
+ * Constructor.
+ * @param conf the daemon config for the server.
+ * @param connectionType the type of connection we are authorizing.
+ */
+ public WorkerTokenAuthorizer(Map<String, Object> conf, ThriftConnectionType connectionType) {
+ this(connectionType.getWtType(), buildStateIfNeeded(conf, connectionType));
+ }
+
+ @VisibleForTesting
+ WorkerTokenAuthorizer(final WorkerTokenServiceType serviceType, final IStormClusterState state) {
+ LoadingCache<WorkerTokenInfo, PrivateWorkerKey> tmpKeyCache = null;
+ if (state != null) {
+ tmpKeyCache =
+ CacheBuilder.newBuilder()
+ .maximumSize(2_000)
+ .expireAfterWrite(2, TimeUnit.HOURS)
+ .build(new CacheLoader<WorkerTokenInfo, PrivateWorkerKey>() {
+
+ @Override
+ public PrivateWorkerKey load(WorkerTokenInfo wtInfo) {
+ return state.getPrivateWorkerKey(serviceType,
+ wtInfo.get_topologyId(),
+ wtInfo.get_secretVersion());
+ }
+ });
+ }
+ keyCache = tmpKeyCache;
+ }
+
+ @VisibleForTesting
+ byte[] getSignedPasswordFor(byte[] user, WorkerTokenInfo deser) {
+ assert keyCache != null;
+
+ if (deser.is_set_expirationTimeMillis() && deser.get_expirationTimeMillis() <= Time.currentTimeMillis()) {
+ throw new IllegalArgumentException("Token is not valid, token has expired.");
+ }
+
+ PrivateWorkerKey key = keyCache.getUnchecked(deser);
+ if (key == null) {
+ throw new IllegalArgumentException("Token is not valid, private key not found.");
+ }
+
+ if (key.is_set_expirationTimeMillis() && key.get_expirationTimeMillis() <= Time.currentTimeMillis()) {
+ throw new IllegalArgumentException("Token is not valid, key has expired.");
+ }
+
+ return WorkerTokenSigner.createPassword(user, new SecretKeySpec(key.get_key(), WorkerTokenSigner.DEFAULT_HMAC_ALGORITHM));
+ }
+
+ @Override
+ public Optional<char[]> getPasswordFor(String userName) {
+ if (keyCache == null) {
+ return Optional.empty();
+ }
+ try {
+ byte[] user = Base64.getDecoder().decode(userName);
+ WorkerTokenInfo deser = Utils.deserialize(user, WorkerTokenInfo.class);
+ byte[] password = getSignedPasswordFor(user, deser);
+ return Optional.of(Base64.getEncoder().encodeToString(password).toCharArray());
+ } catch (Exception e) {
+ LOG.debug("Could not decode {}, might just be a plain digest request...", userName, e);
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public String userName(String userName) {
+ byte[] user = Base64.getDecoder().decode(userName);
+ WorkerTokenInfo deser = Utils.deserialize(user, WorkerTokenInfo.class);
+ return deser.get_userName();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenClientCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenClientCallbackHandler.java
new file mode 100644
index 0000000..8bdc1be
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenClientCallbackHandler.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth.workertoken;
+
+import java.security.AccessController;
+import java.util.Base64;
+import javax.security.auth.Subject;
+import org.apache.storm.generated.WorkerToken;
+import org.apache.storm.generated.WorkerTokenServiceType;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.storm.security.auth.sasl.SimpleSaslClientCallbackHandler;
+
+/**
+ * A Client callback handler for a WorkerToken. In general a client that wants to
+ * support worker tokens should first check if a WorkerToken is available for the
+ * specific connection type by calling findWorkerTokenInSubject. If that returns
+ * a token, then proceed to create and use this with a DIGEST-MD5 SaslClient.
+ * If not you should fall back to whatever other client auth you want to do.
+ */
+public class WorkerTokenClientCallbackHandler extends SimpleSaslClientCallbackHandler {
+
+ /**
+ * Look in the current subject for a WorkerToken. This should really only happen
+ * when we are in a worker, because the tokens will not be placed in anything else.
+ * @param type the type of connection we need a token for.
+ * @return the found token or null.
+ */
+ public static WorkerToken findWorkerTokenInSubject(ThriftConnectionType type) {
+ WorkerTokenServiceType serviceType = type.getWtType();
+ WorkerToken ret = null;
+ if (serviceType != null) {
+ Subject subject = Subject.getSubject(AccessController.getContext());
+ if (subject != null) {
+ ret = AuthUtils.findWorkerToken(subject, serviceType);
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Constructor.
+ * @param token the token to use to authenticate. This was probably retrieved by calling findWorkerTokenInSubject.
+ */
+ public WorkerTokenClientCallbackHandler(WorkerToken token) {
+ super(Base64.getEncoder().encodeToString(token.get_info()),
+ Base64.getEncoder().encodeToString(token.get_signature()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenSigner.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenSigner.java b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenSigner.java
new file mode 100644
index 0000000..46d3c07
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenSigner.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth.workertoken;
+
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import javax.crypto.Mac;
+import javax.crypto.SecretKey;
+
+/**
+ * Provides everything needed to sign a worker token with a secret key.
+ */
+class WorkerTokenSigner {
+ /**
+ * The name of the hashing algorithm.
+ */
+ static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA256";
+
+ /**
+ * A thread local store for the Macs.
+ */
+ private static final ThreadLocal<Mac> threadLocalMac =
+ ThreadLocal.withInitial(() -> {
+ try {
+ return Mac.getInstance(DEFAULT_HMAC_ALGORITHM);
+ } catch (NoSuchAlgorithmException nsa) {
+ throw new IllegalArgumentException("Can't find " + DEFAULT_HMAC_ALGORITHM + " algorithm.");
+ }
+ });
+
+ /**
+ * Compute HMAC of the identifier using the secret key and return the
+ * output as password.
+ * @param identifier the bytes of the identifier
+ * @param key the secret key
+ * @return the bytes of the generated password
+ */
+ static byte[] createPassword(byte[] identifier, SecretKey key) {
+ Mac mac = threadLocalMac.get();
+ try {
+ mac.init(key);
+ } catch (InvalidKeyException ike) {
+ throw new IllegalArgumentException("Invalid key to HMAC computation", ike);
+ }
+ return mac.doFinal(identifier);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java b/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java
index 07fece1..f50947a 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java
@@ -76,6 +76,18 @@ public class ObjectReader {
throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
}
+ public static Long getLong(Object o, Long defaultValue) {
+ if (null == o) {
+ return defaultValue;
+ }
+ if (o instanceof Number) {
+ return ((Number)o).longValue();
+ } else if (o instanceof String) {
+ return Long.valueOf((String) o);
+ }
+ throw new IllegalArgumentException("Don't know how to convert " + o + " to a long");
+ }
+
public static Double getDouble(Object o) {
Double result = getDouble(o, null);
if (null == result) {
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 73cfc81..e20e2ee 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -42,6 +42,7 @@ import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
@@ -81,6 +82,7 @@ import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.generated.WorkerToken;
import org.apache.storm.security.auth.ReqContext;
import org.apache.storm.serialization.DefaultSerializationDelegate;
import org.apache.storm.serialization.SerializationDelegate;
@@ -524,6 +526,14 @@ public class Utils {
return ret.toString();
}
+ public static Id parseZkId(String id, String configName) {
+ String[] split = id.split(":", 2);
+ if (split.length != 2) {
+ throw new IllegalArgumentException(configName + " does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
+ }
+ return new Id(split[0], split[1]);
+ }
+
public static List<ACL> getWorkerACL(Map<String, Object> conf) {
//This is a work around to an issue with ZK where a sasl super user is not super unless there is an open SASL ACL so we are trying to give the correct perms
if (!isZkAuthenticationConfiguredTopology(conf)) {
@@ -533,12 +543,8 @@ public class Utils {
if (stormZKUser == null) {
throw new IllegalArgumentException("Authentication is enabled but " + Config.STORM_ZOOKEEPER_SUPERACL + " is not set");
}
- String[] split = stormZKUser.split(":", 2);
- if (split.length != 2) {
- throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL + " does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
- }
- ArrayList<ACL> ret = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL);
- ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1])));
+ ArrayList<ACL> ret = new ArrayList<>(ZooDefs.Ids.CREATOR_ALL_ACL);
+ ret.add(new ACL(ZooDefs.Perms.ALL, parseZkId(stormZKUser, Config.STORM_ZOOKEEPER_SUPERACL)));
return ret;
}
@@ -693,6 +699,27 @@ public class Utils {
return serializationDelegate.deserialize(serialized, clazz);
}
+ /**
+ * Serialize an object using the configured serialization and then base64 encode it into a string.
+ * @param obj the object to encode
+ * @return a string with the encoded object in it.
+ */
+ public static String serializeToString(Object obj) {
+ return Base64.getEncoder().encodeToString(serializationDelegate.serialize(obj));
+ }
+
+ /**
+ * Deserialize an object stored in a string. The String is assumed to be a base64 encoded string
+ * containing the bytes to actually deserialize.
+ * @param str the encoded string.
+ * @param clazz the thrift class we are expecting.
+ * @param <T> The type of clazz
+ * @return the decoded object
+ */
+ public static <T> T deserializeFromString(String str, Class<T> clazz) {
+ return deserialize(Base64.getDecoder().decode(str), clazz);
+ }
+
public static byte[] toByteArray(ByteBuffer buffer) {
byte[] ret = new byte[buffer.remaining()];
buffer.get(ret, 0, ret.length);