You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:11 UTC
[33/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/IAuthorizer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/IAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/IAuthorizer.java
index d592bb7..7ed498b 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/IAuthorizer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/IAuthorizer.java
@@ -20,27 +20,27 @@ package backtype.storm.security.auth;
import java.util.Map;
/**
- * Nimbus could be configured with an authorization plugin.
- * If not specified, all requests are authorized.
+ * Nimbus could be configured with an authorization plugin. If not specified, all requests are authorized.
*
- * You could specify the authorization plugin via storm parameter. For example:
- * storm -c nimbus.authorization.class=backtype.storm.security.auth.NoopAuthorizer ...
- *
- * You could also specify it via storm.yaml:
- * nimbus.authorization.class: backtype.storm.security.auth.NoopAuthorizer
+ * You could specify the authorization plugin via storm parameter. For example: storm -c nimbus.authorization.class=backtype.storm.security.auth.NoopAuthorizer
+ * ...
+ *
+ * You could also specify it via storm.yaml: nimbus.authorization.class: backtype.storm.security.auth.NoopAuthorizer
*/
public interface IAuthorizer {
/**
* Invoked once immediately after construction
- * @param conf Storm configuration
+ *
+ * @param conf Storm configuration
*/
void prepare(Map storm_conf);
-
+
/**
* permit() method is invoked for each incoming Thrift request.
- * @param context request context includes info about
+ *
+ * @param context request context includes info about
* @param operation operation name
- * @param topology_storm configuration of targeted topology
+ * @param topology_storm configuration of targeted topology
* @return true if the request is authorized, false if reject
*/
public boolean permit(ReqContext context, String operation, Map topology_conf);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/IAutoCredentials.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/IAutoCredentials.java b/jstorm-core/src/main/java/backtype/storm/security/auth/IAutoCredentials.java
index b3886da..16841d5 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/IAutoCredentials.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/IAutoCredentials.java
@@ -23,8 +23,7 @@ import java.util.Map;
import javax.security.auth.Subject;
/**
- * Provides a way to automatically push credentials to a topology and to
- * retreave them in the worker.
+ * Provides a way to automatically push credentials to a topology and to retreave them in the worker.
*/
public interface IAutoCredentials {
@@ -32,24 +31,26 @@ public interface IAutoCredentials {
/**
* Called to populate the credentials on the client side.
+ *
* @param credentials the credentials to be populated.
*/
public void populateCredentials(Map<String, String> credentials);
/**
* Called to initially populate the subject on the worker side with credentials passed in.
+ *
* @param subject the subject to optionally put credentials in.
* @param credentials the credentials to be used.
- */
+ */
public void populateSubject(Subject subject, Map<String, String> credentials);
-
/**
- * Called to update the subject on the worker side when new credentials are recieved.
- * This means that populateSubject has already been called on this subject.
+ * Called to update the subject on the worker side when new credentials are recieved. This means that populateSubject has already been called on this
+ * subject.
+ *
* @param subject the subject to optionally put credentials in.
* @param credentials the credentials to be used.
- */
+ */
public void updateSubject(Subject subject, Map<String, String> credentials);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ICredentialsRenewer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ICredentialsRenewer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ICredentialsRenewer.java
index 3eaf6c4..34358f4 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/ICredentialsRenewer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ICredentialsRenewer.java
@@ -26,16 +26,18 @@ import java.util.Map;
*/
public interface ICredentialsRenewer {
- /**
- * Called when initializing the service.
- * @param conf the storm cluster configuration.
- */
- public void prepare(Map conf);
+ /**
+ * Called when initializing the service.
+ *
+ * @param conf the storm cluster configuration.
+ */
+ public void prepare(Map conf);
/**
* Renew any credentials that need to be renewed. (Update the credentials if needed)
+ *
* @param credentials the credentials that may have something to renew.
* @param topologyConf topology configuration.
- */
+ */
public void renew(Map<String, String> credentials, Map topologyConf);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/IGroupMappingServiceProvider.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/IGroupMappingServiceProvider.java b/jstorm-core/src/main/java/backtype/storm/security/auth/IGroupMappingServiceProvider.java
index 5590b81..865e950 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/IGroupMappingServiceProvider.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/IGroupMappingServiceProvider.java
@@ -26,13 +26,14 @@ public interface IGroupMappingServiceProvider {
/**
* Invoked once immediately after construction
+ *
* @param storm_conf Storm configuration
*/
void prepare(Map storm_conf);
/**
- * Get all various group memberships of a given user.
- * Returns EMPTY list in case of non-existing user
+ * Get all various group memberships of a given user. Returns EMPTY list in case of non-existing user
+ *
* @param user User's name
* @return group memberships of user
* @throws IOException
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/IHttpCredentialsPlugin.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/IHttpCredentialsPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/IHttpCredentialsPlugin.java
index a012ce4..66dfcee 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/IHttpCredentialsPlugin.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/IHttpCredentialsPlugin.java
@@ -29,20 +29,22 @@ import backtype.storm.security.auth.ReqContext;
public interface IHttpCredentialsPlugin {
/**
* Invoked once immediately after construction
+ *
* @param storm_conf Storm configuration
*/
void prepare(Map storm_conf);
/**
* Gets the user name from the request.
+ *
* @param req the servlet request
* @return the authenticated user, or null if none is authenticated.
*/
String getUserName(HttpServletRequest req);
/**
- * Populates a given context with credentials information from an HTTP
- * request.
+ * Populates a given context with credentials information from an HTTP request.
+ *
* @param req the servlet request
* @return the context
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/IPrincipalToLocal.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/IPrincipalToLocal.java b/jstorm-core/src/main/java/backtype/storm/security/auth/IPrincipalToLocal.java
index fca3d37..32b4564 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/IPrincipalToLocal.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/IPrincipalToLocal.java
@@ -22,18 +22,19 @@ import java.util.Map;
import java.security.Principal;
/**
- * Storm can be configured to launch worker processed as a given user.
- * Some transports need to map the Principal to a local user name.
+ * Storm can be configured to launch worker processed as a given user. Some transports need to map the Principal to a local user name.
*/
public interface IPrincipalToLocal {
/**
* Invoked once immediately after construction
- * @param conf Storm configuration
+ *
+ * @param conf Storm configuration
*/
void prepare(Map storm_conf);
-
+
/**
* Convert a Principal to a local user name.
+ *
* @param principal the principal to convert
* @return The local user name.
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ITransportPlugin.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ITransportPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ITransportPlugin.java
index 5ba2557..c3c657f 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/ITransportPlugin.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ITransportPlugin.java
@@ -37,25 +37,28 @@ import backtype.storm.security.auth.ThriftConnectionType;
public interface ITransportPlugin {
/**
* Invoked once immediately after construction
+ *
* @param type the type of connection this will process.
- * @param storm_conf Storm configuration
+ * @param storm_conf Storm configuration
* @param login_conf login configuration
*/
void prepare(ThriftConnectionType type, Map storm_conf, Configuration login_conf);
-
+
/**
* Create a server associated with a given port, service handler, and purpose
+ *
* @param processor service handler
* @return server
*/
public TServer getServer(TProcessor processor) throws IOException, TTransportException;
/**
- * Connect to the specified server via framed transport
+ * Connect to the specified server via framed transport
+ *
* @param transport The underlying Thrift transport.
* @param serverHost server host
- * @param asUser the user as which the connection should be established, and all the subsequent actions should be executed.
- * Only applicable when using secure storm cluster. A null/blank value here will just indicate to use the logged in user.
+ * @param asUser the user as which the connection should be established, and all the subsequent actions should be executed. Only applicable when using
+ * secure storm cluster. A null/blank value here will just indicate to use the logged in user.
*/
public TTransport connect(TTransport transport, String serverHost, String asUser) throws IOException, TTransportException;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/KerberosPrincipalToLocal.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/KerberosPrincipalToLocal.java b/jstorm-core/src/main/java/backtype/storm/security/auth/KerberosPrincipalToLocal.java
index 35c7788..7ac6a6d 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/KerberosPrincipalToLocal.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/KerberosPrincipalToLocal.java
@@ -28,18 +28,21 @@ public class KerberosPrincipalToLocal implements IPrincipalToLocal {
/**
* Invoked once immediately after construction
- * @param conf Storm configuration
+ *
+ * @param conf Storm configuration
*/
- public void prepare(Map storm_conf) {}
-
+ public void prepare(Map storm_conf) {
+ }
+
/**
* Convert a Principal to a local user name.
+ *
* @param principal the principal to convert
* @return The local user name.
*/
public String toLocal(Principal principal) {
- //This technically does not conform with rfc1964, but should work so
- // long as you don't have any really odd names in your KDC.
- return principal == null ? null : principal.getName().split("[/@]")[0];
+ // This technically does not conform with rfc1964, but should work so
+ // long as you don't have any really odd names in your KDC.
+ return principal == null ? null : principal.getName().split("[/@]")[0];
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ReqContext.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ReqContext.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ReqContext.java
index a252f85..47f317c 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/ReqContext.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ReqContext.java
@@ -31,10 +31,7 @@ import java.security.Principal;
import javax.security.auth.Subject;
/**
- * context request context includes info about
- * (1) remote address,
- * (2) remote subject and primary principal
- * (3) request ID
+ * context request context includes info about (1) remote address, (2) remote subject and primary principal (3) request ID
*/
public class ReqContext {
private static final AtomicInteger uniqueId = new AtomicInteger(0);
@@ -46,39 +43,37 @@ public class ReqContext {
private static final Logger LOG = LoggerFactory.getLogger(ReqContext.class);
-
/**
* Get a request context associated with current thread
+ *
* @return
*/
public static ReqContext context() {
return ctxt.get();
}
- //each thread will have its own request context
- private static final ThreadLocal < ReqContext > ctxt =
- new ThreadLocal < ReqContext > () {
- @Override
+ // each thread will have its own request context
+ private static final ThreadLocal<ReqContext> ctxt = new ThreadLocal<ReqContext>() {
+ @Override
protected ReqContext initialValue() {
return new ReqContext(AccessController.getContext());
}
};
- //private constructor
+ // private constructor
@VisibleForTesting
public ReqContext(AccessControlContext acl_ctxt) {
_subject = Subject.getSubject(acl_ctxt);
_reqID = uniqueId.incrementAndGet();
}
- //private constructor
+ // private constructor
@VisibleForTesting
public ReqContext(Subject sub) {
_subject = sub;
_reqID = uniqueId.incrementAndGet();
}
-
/**
* client address
*/
@@ -108,15 +103,18 @@ public class ReqContext {
* The primary principal associated current subject
*/
public Principal principal() {
- if (_subject == null) return null;
+ if (_subject == null)
+ return null;
Set<Principal> princs = _subject.getPrincipals();
- if (princs.size()==0) return null;
+ if (princs.size() == 0)
+ return null;
return (Principal) (princs.toArray()[0]);
}
public void setRealPrincipal(Principal realPrincipal) {
this.realPrincipal = realPrincipal;
}
+
/**
* The real principal associated with the subject.
*/
@@ -126,12 +124,13 @@ public class ReqContext {
/**
* Returns true if this request is an impersonation request.
+ *
* @return
*/
public boolean isImpersonating() {
return this.realPrincipal != null;
}
-
+
/**
* request ID of this request
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/SaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/SaslTransportPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/SaslTransportPlugin.java
index 7208a17..9c8780d 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/SaslTransportPlugin.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/SaslTransportPlugin.java
@@ -73,11 +73,9 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
int numWorkerThreads = type.getNumThreads(storm_conf);
Integer queueSize = type.getQueueSize(storm_conf);
- TThreadPoolServer.Args server_args = new TThreadPoolServer.Args(serverTransport).
- processor(new TUGIWrapProcessor(processor)).
- minWorkerThreads(numWorkerThreads).
- maxWorkerThreads(numWorkerThreads).
- protocolFactory(new TBinaryProtocol.Factory(false, true));
+ TThreadPoolServer.Args server_args =
+ new TThreadPoolServer.Args(serverTransport).processor(new TUGIWrapProcessor(processor)).minWorkerThreads(numWorkerThreads)
+ .maxWorkerThreads(numWorkerThreads).protocolFactory(new TBinaryProtocol.Factory(false, true));
if (serverTransportFactory != null) {
server_args.transportFactory(serverTransportFactory);
@@ -86,26 +84,23 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
if (queueSize != null) {
workQueue = new ArrayBlockingQueue(queueSize);
}
- ThreadPoolExecutor executorService = new ExtendedThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
- 60, TimeUnit.SECONDS, workQueue);
+ ThreadPoolExecutor executorService = new ExtendedThreadPoolExecutor(numWorkerThreads, numWorkerThreads, 60, TimeUnit.SECONDS, workQueue);
server_args.executorService(executorService);
return new TThreadPoolServer(server_args);
}
/**
* All subclass must implement this method
+ *
* @return
* @throws IOException
*/
protected abstract TTransportFactory getServerTransportFactory() throws IOException;
-
- /**
- * Processor that pulls the SaslServer object out of the transport, and
- * assumes the remote user's UGI before calling through to the original
- * processor.
- *
- * This is used on the server side to set the UGI for each specific call.
+ /**
+ * Processor that pulls the SaslServer object out of the transport, and assumes the remote user's UGI before calling through to the original processor.
+ *
+ * This is used on the server side to set the UGI for each specific call.
*/
private class TUGIWrapProcessor implements TProcessor {
final TProcessor wrapped;
@@ -115,25 +110,25 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
}
public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
- //populating request context
+ // populating request context
ReqContext req_context = ReqContext.context();
TTransport trans = inProt.getTransport();
- //Sasl transport
- TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
- //remote address
- TSocket tsocket = (TSocket)saslTrans.getUnderlyingTransport();
+ // Sasl transport
+ TSaslServerTransport saslTrans = (TSaslServerTransport) trans;
+ // remote address
+ TSocket tsocket = (TSocket) saslTrans.getUnderlyingTransport();
Socket socket = tsocket.getSocket();
req_context.setRemoteAddress(socket.getInetAddress());
- //remote subject
+ // remote subject
SaslServer saslServer = saslTrans.getSaslServer();
String authId = saslServer.getAuthorizationID();
Subject remoteUser = new Subject();
remoteUser.getPrincipals().add(new User(authId));
req_context.setSubject(remoteUser);
- //invoke service handler
+ // invoke service handler
return wrapped.process(inProt, outProt);
}
}
@@ -142,11 +137,11 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
private final String name;
public User(String name) {
- this.name = name;
+ this.name = name;
}
- /**
- * Get the full name of the user.
+ /**
+ * Get the full name of the user.
*/
public String getName() {
return name;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ShellBasedGroupsMapping.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ShellBasedGroupsMapping.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ShellBasedGroupsMapping.java
index 62a4c7e..16f2fe4 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/ShellBasedGroupsMapping.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ShellBasedGroupsMapping.java
@@ -31,15 +31,14 @@ import backtype.storm.utils.ShellUtils.ExitCodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-public class ShellBasedGroupsMapping implements
- IGroupMappingServiceProvider {
+public class ShellBasedGroupsMapping implements IGroupMappingServiceProvider {
public static Logger LOG = LoggerFactory.getLogger(ShellBasedGroupsMapping.class);
public TimeCacheMap<String, Set<String>> cachedGroups;
/**
* Invoked once immediately after construction
+ *
* @param storm_conf Storm configuration
*/
@Override
@@ -50,24 +49,24 @@ public class ShellBasedGroupsMapping implements
/**
* Returns list of groups for a user
- *
+ *
* @param user get groups for this user
* @return list of groups for a given user
*/
@Override
public Set<String> getGroups(String user) throws IOException {
- if(cachedGroups.containsKey(user)) {
+ if (cachedGroups.containsKey(user)) {
return cachedGroups.get(user);
}
Set<String> groups = getUnixGroups(user);
- if(!groups.isEmpty())
- cachedGroups.put(user,groups);
+ if (!groups.isEmpty())
+ cachedGroups.put(user, groups);
return groups;
}
/**
- * Get the current user's group list from Unix by running the command 'groups'
- * NOTE. For non-existing user it will return EMPTY list
+ * Get the current user's group list from Unix by running the command 'groups' NOTE. For non-existing user it will return EMPTY list
+ *
* @param user user name
* @return the groups set that the <code>user</code> belongs to
* @throws IOException if encounter any error when running the command
@@ -82,8 +81,7 @@ public class ShellBasedGroupsMapping implements
return new HashSet<String>();
}
- StringTokenizer tokenizer =
- new StringTokenizer(result, ShellUtils.TOKEN_SEPARATOR_REGEX);
+ StringTokenizer tokenizer = new StringTokenizer(result, ShellUtils.TOKEN_SEPARATOR_REGEX);
Set<String> groups = new HashSet<String>();
while (tokenizer.hasMoreTokens()) {
groups.add(tokenizer.nextToken());
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/SimpleTransportPlugin.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/SimpleTransportPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/SimpleTransportPlugin.java
index 2abcdae..c7e816f 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/SimpleTransportPlugin.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/SimpleTransportPlugin.java
@@ -73,22 +73,21 @@ public class SimpleTransportPlugin implements ITransportPlugin {
int maxBufferSize = type.getMaxBufferSize(storm_conf);
Integer queueSize = type.getQueueSize(storm_conf);
- THsHaServer.Args server_args = new THsHaServer.Args(serverTransport).
- processor(new SimpleWrapProcessor(processor)).
- workerThreads(numWorkerThreads).
- protocolFactory(new TBinaryProtocol.Factory(false, true, maxBufferSize, -1));
+ THsHaServer.Args server_args =
+ new THsHaServer.Args(serverTransport).processor(new SimpleWrapProcessor(processor)).workerThreads(numWorkerThreads)
+ .protocolFactory(new TBinaryProtocol.Factory(false, true, maxBufferSize, -1));
if (queueSize != null) {
- server_args.executorService(new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
- 60, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize)));
+ server_args.executorService(new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads, 60, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize)));
}
- //construct THsHaServer
+ // construct THsHaServer
return new THsHaServer(server_args);
}
/**
- * Connect to the specified server via framed transport
+ * Connect to the specified server via framed transport
+ *
* @param transport The underlying Thrift transport.
* @param serverHost unused.
* @param asUser unused.
@@ -96,10 +95,10 @@ public class SimpleTransportPlugin implements ITransportPlugin {
@Override
public TTransport connect(TTransport transport, String serverHost, String asUser) throws TTransportException {
int maxBufferSize = type.getMaxBufferSize(storm_conf);
- //create a framed transport
+ // create a framed transport
TTransport conn = new TFramedTransport(transport, maxBufferSize);
- //connect
+ // connect
conn.open();
LOG.debug("Simple client transport has been established");
@@ -108,13 +107,13 @@ public class SimpleTransportPlugin implements ITransportPlugin {
/**
* @return the subject that will be used for all connections
- */
+ */
protected Subject getDefaultSubject() {
return null;
}
- /**
- * Processor that populate simple transport info into ReqContext, and then invoke a service handler
+ /**
+ * Processor that populate simple transport info into ReqContext, and then invoke a service handler
*/
private class SimpleWrapProcessor implements TProcessor {
final TProcessor wrapped;
@@ -124,7 +123,7 @@ public class SimpleTransportPlugin implements ITransportPlugin {
}
public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
- //populating request context
+ // populating request context
ReqContext req_context = ReqContext.context();
TTransport trans = inProt.getTransport();
@@ -133,31 +132,36 @@ public class SimpleTransportPlugin implements ITransportPlugin {
req_context.setRemoteAddress(InetAddress.getLocalHost());
} catch (UnknownHostException e) {
throw new RuntimeException(e);
- }
+ }
} else if (trans instanceof TSocket) {
- TSocket tsocket = (TSocket)trans;
- //remote address
+ TSocket tsocket = (TSocket) trans;
+ // remote address
Socket socket = tsocket.getSocket();
- req_context.setRemoteAddress(socket.getInetAddress());
- }
+ req_context.setRemoteAddress(socket.getInetAddress());
+ }
- //anonymous user
+ // anonymous user
Subject s = getDefaultSubject();
if (s == null) {
- final String user = (String)storm_conf.get("debug.simple.transport.user");
- if (user != null) {
- HashSet<Principal> principals = new HashSet<Principal>();
- principals.add(new Principal() {
- public String getName() { return user; }
- public String toString() { return user; }
- });
- s = new Subject(true, principals, new HashSet<Object>(), new HashSet<Object>());
- }
+ final String user = (String) storm_conf.get("debug.simple.transport.user");
+ if (user != null) {
+ HashSet<Principal> principals = new HashSet<Principal>();
+ principals.add(new Principal() {
+ public String getName() {
+ return user;
+ }
+
+ public String toString() {
+ return user;
+ }
+ });
+ s = new Subject(true, principals, new HashSet<Object>(), new HashSet<Object>());
+ }
}
req_context.setSubject(s);
- //invoke service handler
+ // invoke service handler
return wrapped.process(inProt, outProt);
}
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/SingleUserPrincipal.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/SingleUserPrincipal.java b/jstorm-core/src/main/java/backtype/storm/security/auth/SingleUserPrincipal.java
index 6af17fa..fd9e694 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/SingleUserPrincipal.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/SingleUserPrincipal.java
@@ -34,7 +34,7 @@ public class SingleUserPrincipal implements Principal {
@Override
public boolean equals(Object another) {
if (another instanceof SingleUserPrincipal) {
- return _userName.equals(((SingleUserPrincipal)another)._userName);
+ return _userName.equals(((SingleUserPrincipal) another)._userName);
}
return false;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/TBackoffConnect.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/TBackoffConnect.java b/jstorm-core/src/main/java/backtype/storm/security/auth/TBackoffConnect.java
index f547868..b699bc4 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/TBackoffConnect.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/TBackoffConnect.java
@@ -35,15 +35,13 @@ public class TBackoffConnect {
public TBackoffConnect(int retryTimes, int retryInterval, int retryIntervalCeiling) {
_retryTimes = retryTimes;
- waitGrabber = new StormBoundedExponentialBackoffRetry(retryInterval,
- retryIntervalCeiling,
- retryTimes);
+ waitGrabber = new StormBoundedExponentialBackoffRetry(retryInterval, retryIntervalCeiling, retryTimes);
}
public TTransport doConnectWithRetry(ITransportPlugin transportPlugin, TTransport underlyingTransport, String host, String asUser) throws IOException {
boolean connected = false;
TTransport transportResult = null;
- while(!connected) {
+ while (!connected) {
try {
transportResult = transportPlugin.connect(underlyingTransport, host, asUser);
connected = true;
@@ -55,13 +53,13 @@ public class TBackoffConnect {
}
private void retryNext(TTransportException ex) {
- if(!canRetry()) {
+ if (!canRetry()) {
throw new RuntimeException(ex);
}
try {
int sleeptime = waitGrabber.getSleepTimeMs(_completedRetries, 0);
- LOG.debug("Failed to connect. Retrying... (" + Integer.toString( _completedRetries) + ") in " + Integer.toString(sleeptime) + "ms");
+ LOG.debug("Failed to connect. Retrying... (" + Integer.toString(_completedRetries) + ") in " + Integer.toString(sleeptime) + "ms");
Thread.sleep(sleeptime);
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftClient.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftClient.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftClient.java
index 8d2136a..954b4f8 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftClient.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftClient.java
@@ -45,17 +45,17 @@ public class ThriftClient {
private String hostPort;
private String host;
private Integer port;
-
+
private Map<Object, Object> conf;
-
+
private Integer timeout;
private ThriftConnectionType type;
private String asUser;
-
+
public ThriftClient(Map conf, ThriftConnectionType type) throws Exception {
this(conf, type, null, null, null, null);
}
-
+
@SuppressWarnings("unchecked")
public ThriftClient(Map conf, ThriftConnectionType type, Integer timeout) throws Exception {
this(conf, type, null, null, timeout, null);
@@ -63,6 +63,7 @@ public class ThriftClient {
/**
* This is only for be compatible for Storm
+ *
* @param conf
* @param type
* @param host
@@ -71,45 +72,39 @@ public class ThriftClient {
this(conf, type, host, null, null, null);
}
- public ThriftClient(Map conf, ThriftConnectionType type, String host, Integer port, Integer timeout){
+ public ThriftClient(Map conf, ThriftConnectionType type, String host, Integer port, Integer timeout) {
this(conf, type, host, port, timeout, null);
}
public ThriftClient(Map conf, ThriftConnectionType type, String host, Integer port, Integer timeout, String asUser) {
- //create a socket with server
-
+ // create a socket with server
+
this.timeout = timeout;
this.conf = conf;
this.type = type;
this.asUser = asUser;
-
+
getMaster(conf, host, port);
reconnect();
}
-
-
-
+
public static String getMasterByZk(Map conf) throws Exception {
-
CuratorFramework zkobj = null;
String masterHost = null;
-
+
try {
String root = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT));
String zkMasterDir = root + Cluster.MASTER_SUBTREE;
-
- zkobj = Utils.newCurator(conf,
- (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS),
- conf.get(Config.STORM_ZOOKEEPER_PORT),
- zkMasterDir);
+
+ zkobj = Utils.newCurator(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT), zkMasterDir);
zkobj.start();
if (zkobj.checkExists().forPath("/") == null) {
throw new RuntimeException("No alive nimbus ");
}
-
+
masterHost = new String(zkobj.getData().forPath("/"));
-
+
LOG.info("masterHost:" + masterHost);
return masterHost;
} finally {
@@ -119,8 +114,8 @@ public class ThriftClient {
}
}
}
-
- public void getMaster(Map conf, String host, Integer port){
+
+ public void getMaster(Map conf, String host, Integer port) {
if (StringUtils.isBlank(host) == false) {
this.host = host;
if (port == null) {
@@ -128,7 +123,7 @@ public class ThriftClient {
}
this.port = port;
this.hostPort = host + ":" + port;
- }else {
+ } else {
try {
hostPort = ThriftClient.getMasterByZk(conf);
} catch (Exception e) {
@@ -142,7 +137,7 @@ public class ThriftClient {
this.host = host_port[0];
this.port = Integer.parseInt(host_port[1]);
}
-
+
// create a socket with server
if (this.host == null) {
throw new IllegalArgumentException("host is not set");
@@ -151,45 +146,43 @@ public class ThriftClient {
throw new IllegalArgumentException("invalid port: " + port);
}
}
-
+
public synchronized TTransport transport() {
return _transport;
}
-
+
public synchronized void reconnect() {
- close();
+ close();
try {
TSocket socket = new TSocket(host, port);
- if(timeout!=null) {
+ if (timeout != null) {
socket.setTimeout(timeout);
- }else {
- //@@@ Todo
+ } else {
+ // @@@ Todo
// set the socket default Timeout as xxxx
}
- //locate login configuration
+ // locate login configuration
Configuration login_conf = AuthUtils.GetConfiguration(conf);
- //construct a transport plugin
+ // construct a transport plugin
ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(type, conf, login_conf);
final TTransport underlyingTransport = socket;
- //TODO get this from type instead of hardcoding to Nimbus.
- //establish client-server transport via plugin
- //do retries if the connect fails
- TBackoffConnect connectionRetry
- = new TBackoffConnect(
- Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_TIMES)),
- Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL)),
- Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING)));
+ // TODO get this from type instead of hardcoding to Nimbus.
+ // establish client-server transport via plugin
+ // do retries if the connect fails
+ TBackoffConnect connectionRetry =
+ new TBackoffConnect(Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_TIMES)), Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL)),
+ Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING)));
_transport = connectionRetry.doConnectWithRetry(transportPlugin, underlyingTransport, host, asUser);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
_protocol = null;
if (_transport != null) {
- _protocol = new TBinaryProtocol(_transport);
+ _protocol = new TBinaryProtocol(_transport);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftConnectionType.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftConnectionType.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftConnectionType.java
index f9be7ae..e248df8 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftConnectionType.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftConnectionType.java
@@ -26,12 +26,9 @@ import java.util.Map;
* The purpose for which the Thrift server is created.
*/
public enum ThriftConnectionType {
- NIMBUS(Config.NIMBUS_THRIFT_TRANSPORT_PLUGIN, Config.NIMBUS_THRIFT_PORT, null,
- Config.NIMBUS_THRIFT_THREADS, Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE),
- DRPC(Config.DRPC_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_PORT, Config.DRPC_QUEUE_SIZE,
- Config.DRPC_WORKER_THREADS, Config.DRPC_MAX_BUFFER_SIZE),
- DRPC_INVOCATIONS(Config.DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_INVOCATIONS_PORT, null,
- Config.DRPC_INVOCATIONS_THREADS, Config.DRPC_MAX_BUFFER_SIZE);
+ NIMBUS(Config.NIMBUS_THRIFT_TRANSPORT_PLUGIN, Config.NIMBUS_THRIFT_PORT, null, Config.NIMBUS_THRIFT_THREADS, Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE), DRPC(
+ Config.DRPC_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_PORT, Config.DRPC_QUEUE_SIZE, Config.DRPC_WORKER_THREADS, Config.DRPC_MAX_BUFFER_SIZE), DRPC_INVOCATIONS(
+ Config.DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_INVOCATIONS_PORT, null, Config.DRPC_INVOCATIONS_THREADS, Config.DRPC_MAX_BUFFER_SIZE);
private final String _transConf;
private final String _portConf;
@@ -39,8 +36,7 @@ public enum ThriftConnectionType {
private final String _threadsConf;
private final String _buffConf;
- ThriftConnectionType(String transConf, String portConf, String qConf,
- String threadsConf, String buffConf) {
+ ThriftConnectionType(String transConf, String portConf, String qConf, String threadsConf, String buffConf) {
_transConf = transConf;
_portConf = portConf;
_qConf = qConf;
@@ -49,9 +45,9 @@ public enum ThriftConnectionType {
}
public String getTransportPlugin(Map conf) {
- String ret = (String)conf.get(_transConf);
+ String ret = (String) conf.get(_transConf);
if (ret == null) {
- ret = (String)conf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN);
+ ret = (String) conf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN);
}
return ret;
}
@@ -64,10 +60,10 @@ public enum ThriftConnectionType {
if (_qConf == null) {
return null;
}
- return (Integer)conf.get(_qConf);
+ return (Integer) conf.get(_qConf);
}
- public int getNumThreads(Map conf) {
+ public int getNumThreads(Map conf) {
return Utils.getInt(conf.get(_threadsConf));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftServer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftServer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftServer.java
index 64243ce..410f1ce 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftServer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftServer.java
@@ -28,19 +28,19 @@ import org.slf4j.LoggerFactory;
public class ThriftServer {
private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
- private Map _storm_conf; //storm configuration
+ private Map _storm_conf; // storm configuration
protected TProcessor _processor = null;
private final ThriftConnectionType _type;
private TServer _server = null;
private Configuration _login_conf;
-
+
public ThriftServer(Map storm_conf, TProcessor processor, ThriftConnectionType type) {
_storm_conf = storm_conf;
_processor = processor;
_type = type;
try {
- //retrieve authentication configuration
+ // retrieve authentication configuration
_login_conf = AuthUtils.GetConfiguration(_storm_conf);
} catch (Exception x) {
LOG.error(x.getMessage(), x);
@@ -54,27 +54,30 @@ public class ThriftServer {
/**
* Is ThriftServer listening to requests?
+ *
* @return
*/
public boolean isServing() {
- if (_server == null) return false;
+ if (_server == null)
+ return false;
return _server.isServing();
}
-
- public void serve() {
+
+ public void serve() {
try {
- //locate our thrift transport plugin
- ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(_type, _storm_conf, _login_conf);
+ // locate our thrift transport plugin
+ ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(_type, _storm_conf, _login_conf);
- //server
+ // server
_server = transportPlugin.getServer(_processor);
- //start accepting requests
+ // start accepting requests
_server.serve();
} catch (Exception ex) {
LOG.error("ThriftServer is being stopped due to: " + ex, ex);
- if (_server != null) _server.stop();
- Runtime.getRuntime().halt(1); //shutdown server process since we could not handle Thrift requests any more
+ if (_server != null)
+ _server.stop();
+ Runtime.getRuntime().halt(1); // shutdown server process since we could not handle Thrift requests any more
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCAuthorizerBase.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCAuthorizerBase.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCAuthorizerBase.java
index 8951edd..11c4a0f 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCAuthorizerBase.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCAuthorizerBase.java
@@ -22,9 +22,10 @@ public abstract class DRPCAuthorizerBase implements IAuthorizer {
abstract protected boolean permitClientRequest(ReqContext context, String operation, Map params);
abstract protected boolean permitInvocationRequest(ReqContext context, String operation, Map params);
-
+
/**
* Authorizes request from to the DRPC server.
+ *
* @param context the client request context
* @param operation the operation requested by the DRPC server
* @param params a Map with any key-value entries of use to the authorization implementation
@@ -33,14 +34,11 @@ public abstract class DRPCAuthorizerBase implements IAuthorizer {
public boolean permit(ReqContext context, String operation, Map params) {
if ("execute".equals(operation)) {
return permitClientRequest(context, operation, params);
- } else if ("failRequest".equals(operation) ||
- "fetchRequest".equals(operation) ||
- "result".equals(operation)) {
+ } else if ("failRequest".equals(operation) || "fetchRequest".equals(operation) || "result".equals(operation)) {
return permitInvocationRequest(context, operation, params);
}
// Deny unsupported operations.
- LOG.warn("Denying unsupported operation \""+operation+"\" from "+
- context.remoteAddress());
+ LOG.warn("Denying unsupported operation \"" + operation + "\" from " + context.remoteAddress());
return false;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
index 45eaea5..8aa7243 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
@@ -19,8 +19,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
- public static Logger LOG =
- LoggerFactory.getLogger(DRPCSimpleACLAuthorizer.class);
+ public static Logger LOG = LoggerFactory.getLogger(DRPCSimpleACLAuthorizer.class);
public static final String CLIENT_USERS_KEY = "client.users";
public static final String INVOCATION_USER_KEY = "invocation.user";
@@ -33,44 +32,35 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
protected class AclFunctionEntry {
final public Set<String> clientUsers;
final public String invocationUser;
- public AclFunctionEntry(Collection<String> clientUsers,
- String invocationUser) {
- this.clientUsers = (clientUsers != null) ?
- new HashSet<String>(clientUsers) : new HashSet<String>();
+
+ public AclFunctionEntry(Collection<String> clientUsers, String invocationUser) {
+ this.clientUsers = (clientUsers != null) ? new HashSet<String>(clientUsers) : new HashSet<String>();
this.invocationUser = invocationUser;
}
}
- private volatile Map<String,AclFunctionEntry> _acl = null;
+ private volatile Map<String, AclFunctionEntry> _acl = null;
private volatile long _lastUpdate = 0;
- protected Map<String,AclFunctionEntry> readAclFromConfig() {
- //Thread safety is mostly around _acl. If _acl needs to be updated it is changed atomically
- //More then one thread may be trying to update it at a time, but that is OK, because the
- //change is atomic
+ protected Map<String, AclFunctionEntry> readAclFromConfig() {
+ // Thread safety is mostly around _acl. If _acl needs to be updated it is changed atomically
+ // More then one thread may be trying to update it at a time, but that is OK, because the
+ // change is atomic
long now = System.currentTimeMillis();
if ((now - 5000) > _lastUpdate || _acl == null) {
- Map<String,AclFunctionEntry> acl = new HashMap<String,AclFunctionEntry>();
+ Map<String, AclFunctionEntry> acl = new HashMap<String, AclFunctionEntry>();
Map conf = Utils.findAndReadConfigFile(_aclFileName);
if (conf.containsKey(Config.DRPC_AUTHORIZER_ACL)) {
- Map<String,Map<String,?>> confAcl =
- (Map<String,Map<String,?>>)
- conf.get(Config.DRPC_AUTHORIZER_ACL);
+ Map<String, Map<String, ?>> confAcl = (Map<String, Map<String, ?>>) conf.get(Config.DRPC_AUTHORIZER_ACL);
for (String function : confAcl.keySet()) {
- Map<String,?> val = confAcl.get(function);
- Collection<String> clientUsers =
- val.containsKey(CLIENT_USERS_KEY) ?
- (Collection<String>) val.get(CLIENT_USERS_KEY) : null;
- String invocationUser =
- val.containsKey(INVOCATION_USER_KEY) ?
- (String) val.get(INVOCATION_USER_KEY) : null;
- acl.put(function,
- new AclFunctionEntry(clientUsers, invocationUser));
+ Map<String, ?> val = confAcl.get(function);
+ Collection<String> clientUsers = val.containsKey(CLIENT_USERS_KEY) ? (Collection<String>) val.get(CLIENT_USERS_KEY) : null;
+ String invocationUser = val.containsKey(INVOCATION_USER_KEY) ? (String) val.get(INVOCATION_USER_KEY) : null;
+ acl.put(function, new AclFunctionEntry(clientUsers, invocationUser));
}
} else if (!_permitWhenMissingFunctionEntry) {
- LOG.warn("Requiring explicit ACL entries, but none given. " +
- "Therefore, all operiations will be denied.");
+ LOG.warn("Requiring explicit ACL entries, but none given. " + "Therefore, all operiations will be denied.");
}
_acl = acl;
_lastUpdate = System.currentTimeMillis();
@@ -80,10 +70,8 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
@Override
public void prepare(Map conf) {
- Boolean isStrict =
- (Boolean) conf.get(Config.DRPC_AUTHORIZER_ACL_STRICT);
- _permitWhenMissingFunctionEntry =
- (isStrict != null && !isStrict) ? true : false;
+ Boolean isStrict = (Boolean) conf.get(Config.DRPC_AUTHORIZER_ACL_STRICT);
+ _permitWhenMissingFunctionEntry = (isStrict != null && !isStrict) ? true : false;
_aclFileName = (String) conf.get(Config.DRPC_AUTHORIZER_ACL_FILENAME);
_ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
}
@@ -105,11 +93,10 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
return null;
}
- protected boolean permitClientOrInvocationRequest(ReqContext context, Map params,
- String fieldName) {
- Map<String,AclFunctionEntry> acl = readAclFromConfig();
+ protected boolean permitClientOrInvocationRequest(ReqContext context, Map params, String fieldName) {
+ Map<String, AclFunctionEntry> acl = readAclFromConfig();
String function = (String) params.get(FUNCTION_KEY);
- if (function != null && ! function.isEmpty()) {
+ if (function != null && !function.isEmpty()) {
AclFunctionEntry entry = acl.get(function);
if (entry == null && _permitWhenMissingFunctionEntry) {
return true;
@@ -126,16 +113,11 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
String principal = getUserFromContext(context);
String user = getLocalUserFromContext(context);
if (value == null) {
- LOG.warn("Configuration for function '"+function+"' is "+
- "invalid: it should have both an invocation user "+
- "and a list of client users defined.");
- } else if (value instanceof Set &&
- (((Set<String>)value).contains(principal) ||
- ((Set<String>)value).contains(user))) {
+ LOG.warn("Configuration for function '" + function + "' is " + "invalid: it should have both an invocation user "
+ + "and a list of client users defined.");
+ } else if (value instanceof Set && (((Set<String>) value).contains(principal) || ((Set<String>) value).contains(user))) {
return true;
- } else if (value instanceof String &&
- (value.equals(principal) ||
- value.equals(user))) {
+ } else if (value instanceof String && (value.equals(principal) || value.equals(user))) {
return true;
}
}
@@ -144,14 +126,12 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
}
@Override
- protected boolean permitClientRequest(ReqContext context, String operation,
- Map params) {
+ protected boolean permitClientRequest(ReqContext context, String operation, Map params) {
return permitClientOrInvocationRequest(context, params, "clientUsers");
}
@Override
- protected boolean permitInvocationRequest(ReqContext context, String operation,
- Map params) {
+ protected boolean permitInvocationRequest(ReqContext context, String operation, Map params) {
return permitClientOrInvocationRequest(context, params, "invocationUser");
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DenyAuthorizer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DenyAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DenyAuthorizer.java
index 5e84b38..32f809a 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DenyAuthorizer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DenyAuthorizer.java
@@ -31,27 +31,27 @@ import org.slf4j.LoggerFactory;
*/
public class DenyAuthorizer implements IAuthorizer {
private static final Logger LOG = LoggerFactory.getLogger(DenyAuthorizer.class);
-
+
/**
* Invoked once immediately after construction
- * @param conf Storm configuration
+ *
+ * @param conf Storm configuration
*/
- public void prepare(Map conf) {
+ public void prepare(Map conf) {
}
/**
* permit() method is invoked for each incoming Thrift request
- * @param contrext request context
+ *
+ * @param contrext request context
* @param operation operation name
- * @param topology_storm configuration of targeted topology
+ * @param topology_storm configuration of targeted topology
* @return true if the request is authorized, false if reject
*/
public boolean permit(ReqContext context, String operation, Map topology_conf) {
- LOG.info("[req "+ context.requestID()+ "] Access "
- + " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString())
- + (context.principal() == null? "" : (" principal:"+ context.principal()))
- +" op:"+operation
- + (topology_conf == null? "" : (" topoology:"+topology_conf.get(Config.TOPOLOGY_NAME))));
+ LOG.info("[req " + context.requestID() + "] Access " + " from: " + (context.remoteAddress() == null ? "null" : context.remoteAddress().toString())
+ + (context.principal() == null ? "" : (" principal:" + context.principal())) + " op:" + operation
+ + (topology_conf == null ? "" : (" topoology:" + topology_conf.get(Config.TOPOLOGY_NAME))));
return false;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java
index d6431be..e1a037f 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java
@@ -10,7 +10,6 @@ import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
-
public class ImpersonationAuthorizer implements IAuthorizer {
private static final Logger LOG = LoggerFactory.getLogger(ImpersonationAuthorizer.class);
protected static final String WILD_CARD = "*";
@@ -49,16 +48,16 @@ public class ImpersonationAuthorizer implements IAuthorizer {
String userBeingImpersonated = _ptol.toLocal(context.principal());
InetAddress remoteAddress = context.remoteAddress();
- LOG.info("user = {}, principal = {} is attmepting to impersonate user = {} for operation = {} from host = {}",
- impersonatingUser, impersonatingPrincipal, userBeingImpersonated, operation, remoteAddress);
+ LOG.info("user = {}, principal = {} is attmepting to impersonate user = {} for operation = {} from host = {}", impersonatingUser,
+ impersonatingPrincipal, userBeingImpersonated, operation, remoteAddress);
/**
* no config is present for impersonating principal or user, do not permit impersonation.
*/
if (!userImpersonationACL.containsKey(impersonatingPrincipal) && !userImpersonationACL.containsKey(impersonatingUser)) {
- LOG.info("user = {}, principal = {} is trying to impersonate user {}, but config {} does not have entry for impersonating user or principal." +
- "Please see SECURITY.MD to learn how to configure users for impersonation."
- , impersonatingUser, impersonatingPrincipal, userBeingImpersonated, Config.NIMBUS_IMPERSONATION_ACL);
+ LOG.info("user = {}, principal = {} is trying to impersonate user {}, but config {} does not have entry for impersonating user or principal."
+ + "Please see SECURITY.MD to learn how to configure users for impersonation.", impersonatingUser, impersonatingPrincipal,
+ userBeingImpersonated, Config.NIMBUS_IMPERSONATION_ACL);
return false;
}
@@ -78,18 +77,17 @@ public class ImpersonationAuthorizer implements IAuthorizer {
authorizedGroups.addAll(userACL.authorizedGroups);
}
- LOG.debug("user = {}, principal = {} is allowed to impersonate groups = {} from hosts = {} ",
- impersonatingUser, impersonatingPrincipal, authorizedGroups, authorizedHosts);
+ LOG.debug("user = {}, principal = {} is allowed to impersonate groups = {} from hosts = {} ", impersonatingUser, impersonatingPrincipal,
+ authorizedGroups, authorizedHosts);
if (!isAllowedToImpersonateFromHost(authorizedHosts, remoteAddress)) {
- LOG.info("user = {}, principal = {} is not allowed to impersonate from host {} ",
- impersonatingUser, impersonatingPrincipal, remoteAddress);
+ LOG.info("user = {}, principal = {} is not allowed to impersonate from host {} ", impersonatingUser, impersonatingPrincipal, remoteAddress);
return false;
}
if (!isAllowedToImpersonateUser(authorizedGroups, userBeingImpersonated)) {
- LOG.info("user = {}, principal = {} is not allowed to impersonate any group that user {} is part of.",
- impersonatingUser, impersonatingPrincipal, userBeingImpersonated);
+ LOG.info("user = {}, principal = {} is not allowed to impersonate any group that user {} is part of.", impersonatingUser, impersonatingPrincipal,
+ userBeingImpersonated);
return false;
}
@@ -98,14 +96,12 @@ public class ImpersonationAuthorizer implements IAuthorizer {
}
private boolean isAllowedToImpersonateFromHost(Set<String> authorizedHosts, InetAddress remoteAddress) {
- return authorizedHosts.contains(WILD_CARD) ||
- authorizedHosts.contains(remoteAddress.getCanonicalHostName()) ||
- authorizedHosts.contains(remoteAddress.getHostName()) ||
- authorizedHosts.contains(remoteAddress.getHostAddress());
+ return authorizedHosts.contains(WILD_CARD) || authorizedHosts.contains(remoteAddress.getCanonicalHostName())
+ || authorizedHosts.contains(remoteAddress.getHostName()) || authorizedHosts.contains(remoteAddress.getHostAddress());
}
private boolean isAllowedToImpersonateUser(Set<String> authorizedGroups, String userBeingImpersonated) {
- if(authorizedGroups.contains(WILD_CARD)) {
+ if (authorizedGroups.contains(WILD_CARD)) {
return true;
}
@@ -131,9 +127,9 @@ public class ImpersonationAuthorizer implements IAuthorizer {
protected class ImpersonationACL {
public String impersonatingUser;
- //Groups this user is authorized to impersonate.
+ // Groups this user is authorized to impersonate.
public Set<String> authorizedGroups;
- //Hosts this user is authorized to impersonate from.
+ // Hosts this user is authorized to impersonate from.
public Set<String> authorizedHosts;
private ImpersonationACL(String impersonatingUser, Set<String> authorizedGroups, Set<String> authorizedHosts) {
@@ -144,11 +140,8 @@ public class ImpersonationAuthorizer implements IAuthorizer {
@Override
public String toString() {
- return "ImpersonationACL{" +
- "impersonatingUser='" + impersonatingUser + '\'' +
- ", authorizedGroups=" + authorizedGroups +
- ", authorizedHosts=" + authorizedHosts +
- '}';
+ return "ImpersonationACL{" + "impersonatingUser='" + impersonatingUser + '\'' + ", authorizedGroups=" + authorizedGroups + ", authorizedHosts="
+ + authorizedHosts + '}';
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/NoopAuthorizer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/NoopAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/NoopAuthorizer.java
index 9af44d3..1d88202 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/NoopAuthorizer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/NoopAuthorizer.java
@@ -34,24 +34,24 @@ public class NoopAuthorizer implements IAuthorizer {
/**
* Invoked once immediately after construction
- * @param conf Storm configuration
+ *
+ * @param conf Storm configuration
*/
- public void prepare(Map conf) {
+ public void prepare(Map conf) {
}
/**
* permit() method is invoked for each incoming Thrift request
- * @param context request context includes info about
+ *
+ * @param context request context includes info about
* @param operation operation name
- * @param topology_storm configuration of targeted topology
+ * @param topology_storm configuration of targeted topology
* @return true if the request is authorized, false if reject
*/
public boolean permit(ReqContext context, String operation, Map topology_conf) {
- LOG.info("[req "+ context.requestID()+ "] Access "
- + " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString())
- + (context.principal() == null? "" : (" principal:"+ context.principal()))
- +" op:"+operation
- + (topology_conf == null? "" : (" topoology:"+topology_conf.get(Config.TOPOLOGY_NAME))));
+ LOG.info("[req " + context.requestID() + "] Access " + " from: " + (context.remoteAddress() == null ? "null" : context.remoteAddress().toString())
+ + (context.principal() == null ? "" : (" principal:" + context.principal())) + " op:" + operation
+ + (topology_conf == null ? "" : (" topoology:" + topology_conf.get(Config.TOPOLOGY_NAME))));
return true;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
index e50a587..40d7a5d 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
@@ -36,15 +36,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * An authorization implementation that simply checks if a user is allowed to perform specific
- * operations.
+ * An authorization implementation that simply checks if a user is allowed to perform specific operations.
*/
public class SimpleACLAuthorizer implements IAuthorizer {
private static final Logger LOG = LoggerFactory.getLogger(SimpleACLAuthorizer.class);
protected Set<String> _userCommands = new HashSet<String>(Arrays.asList("submitTopology", "fileUpload", "getNimbusConf", "getClusterInfo"));
protected Set<String> _supervisorCommands = new HashSet<String>(Arrays.asList("fileDownload"));
- protected Set<String> _topoCommands = new HashSet<String>(Arrays.asList("killTopology","rebalance","activate","deactivate","getTopologyConf","getTopology","getUserTopology","getTopologyInfo","uploadNewCredentials"));
+ protected Set<String> _topoCommands = new HashSet<String>(Arrays.asList("killTopology", "rebalance", "activate", "deactivate", "getTopologyConf",
+ "getTopology", "getUserTopology", "getTopologyInfo", "uploadNewCredentials"));
protected Set<String> _admins;
protected Set<String> _supervisors;
@@ -52,8 +52,10 @@ public class SimpleACLAuthorizer implements IAuthorizer {
protected Set<String> _nimbusGroups;
protected IPrincipalToLocal _ptol;
protected IGroupMappingServiceProvider _groupMappingProvider;
+
/**
* Invoked once immediately after construction
+ *
* @param conf Storm configuration
*/
@Override
@@ -64,17 +66,17 @@ public class SimpleACLAuthorizer implements IAuthorizer {
_nimbusGroups = new HashSet<String>();
if (conf.containsKey(Config.NIMBUS_ADMINS)) {
- _admins.addAll((Collection<String>)conf.get(Config.NIMBUS_ADMINS));
+ _admins.addAll((Collection<String>) conf.get(Config.NIMBUS_ADMINS));
}
if (conf.containsKey(Config.NIMBUS_SUPERVISOR_USERS)) {
- _supervisors.addAll((Collection<String>)conf.get(Config.NIMBUS_SUPERVISOR_USERS));
+ _supervisors.addAll((Collection<String>) conf.get(Config.NIMBUS_SUPERVISOR_USERS));
}
if (conf.containsKey(Config.NIMBUS_USERS)) {
- _nimbusUsers.addAll((Collection<String>)conf.get(Config.NIMBUS_USERS));
+ _nimbusUsers.addAll((Collection<String>) conf.get(Config.NIMBUS_USERS));
}
if (conf.containsKey(Config.NIMBUS_GROUPS)) {
- _nimbusGroups.addAll((Collection<String>)conf.get(Config.NIMBUS_GROUPS));
+ _nimbusGroups.addAll((Collection<String>) conf.get(Config.NIMBUS_GROUPS));
}
_ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
@@ -83,6 +85,7 @@ public class SimpleACLAuthorizer implements IAuthorizer {
/**
* permit() method is invoked for each incoming Thrift request
+ *
* @param context request context includes info about
* @param operation operation name
* @param topology_conf configuration of targeted topology
@@ -90,10 +93,8 @@ public class SimpleACLAuthorizer implements IAuthorizer {
*/
@Override
public boolean permit(ReqContext context, String operation, Map topology_conf) {
- LOG.info("[req " + context.requestID() + "] Access "
- + " from: " + (context.remoteAddress() == null ? "null" : context.remoteAddress().toString())
- + (context.principal() == null ? "" : (" principal:" + context.principal()))
- + " op:" + operation
+ LOG.info("[req " + context.requestID() + "] Access " + " from: " + (context.remoteAddress() == null ? "null" : context.remoteAddress().toString())
+ + (context.principal() == null ? "" : (" principal:" + context.principal())) + " op:" + operation
+ (topology_conf == null ? "" : (" topoology:" + topology_conf.get(Config.TOPOLOGY_NAME))));
String principal = context.principal().getName();
@@ -103,8 +104,8 @@ public class SimpleACLAuthorizer implements IAuthorizer {
if (_groupMappingProvider != null) {
try {
userGroups = _groupMappingProvider.getGroups(user);
- } catch(IOException e) {
- LOG.warn("Error while trying to fetch user groups",e);
+ } catch (IOException e) {
+ LOG.warn("Error while trying to fetch user groups", e);
}
}
@@ -123,7 +124,7 @@ public class SimpleACLAuthorizer implements IAuthorizer {
if (_topoCommands.contains(operation)) {
Set topoUsers = new HashSet<String>();
if (topology_conf.containsKey(Config.TOPOLOGY_USERS)) {
- topoUsers.addAll((Collection<String>)topology_conf.get(Config.TOPOLOGY_USERS));
+ topoUsers.addAll((Collection<String>) topology_conf.get(Config.TOPOLOGY_USERS));
}
if (topoUsers.contains(principal) || topoUsers.contains(user)) {
@@ -132,18 +133,19 @@ public class SimpleACLAuthorizer implements IAuthorizer {
Set<String> topoGroups = new HashSet<String>();
if (topology_conf.containsKey(Config.TOPOLOGY_GROUPS) && topology_conf.get(Config.TOPOLOGY_GROUPS) != null) {
- topoGroups.addAll((Collection<String>)topology_conf.get(Config.TOPOLOGY_GROUPS));
+ topoGroups.addAll((Collection<String>) topology_conf.get(Config.TOPOLOGY_GROUPS));
}
- if (checkUserGroupAllowed(userGroups, topoGroups)) return true;
+ if (checkUserGroupAllowed(userGroups, topoGroups))
+ return true;
}
return false;
}
private Boolean checkUserGroupAllowed(Set<String> userGroups, Set<String> configuredGroups) {
- if(userGroups.size() > 0 && configuredGroups.size() > 0) {
+ if (userGroups.size() > 0 && configuredGroups.size() > 0) {
for (String tgroup : configuredGroups) {
- if(userGroups.contains(tgroup))
+ if (userGroups.contains(tgroup))
return true;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java
index 55109f9..dbbc945 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java
@@ -31,8 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * An authorization implementation that simply checks a whitelist of users that
- * are allowed to use the cluster.
+ * An authorization implementation that simply checks a whitelist of users that are allowed to use the cluster.
*/
public class SimpleWhitelistAuthorizer implements IAuthorizer {
private static final Logger LOG = LoggerFactory.getLogger(SimpleWhitelistAuthorizer.class);
@@ -41,30 +40,30 @@ public class SimpleWhitelistAuthorizer implements IAuthorizer {
/**
* Invoked once immediately after construction
- * @param conf Storm configuration
+ *
+ * @param conf Storm configuration
*/
@Override
public void prepare(Map conf) {
users = new HashSet<String>();
if (conf.containsKey(WHITELIST_USERS_CONF)) {
- users.addAll((Collection<String>)conf.get(WHITELIST_USERS_CONF));
+ users.addAll((Collection<String>) conf.get(WHITELIST_USERS_CONF));
}
}
/**
* permit() method is invoked for each incoming Thrift request
- * @param context request context includes info about
+ *
+ * @param context request context includes info about
* @param operation operation name
- * @param topology_storm configuration of targeted topology
+ * @param topology_storm configuration of targeted topology
* @return true if the request is authorized, false if reject
*/
@Override
public boolean permit(ReqContext context, String operation, Map topology_conf) {
- LOG.info("[req "+ context.requestID()+ "] Access "
- + " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString())
- + (context.principal() == null? "" : (" principal:"+ context.principal()))
- +" op:"+operation
- + (topology_conf == null? "" : (" topoology:"+topology_conf.get(Config.TOPOLOGY_NAME))));
+ LOG.info("[req " + context.requestID() + "] Access " + " from: " + (context.remoteAddress() == null ? "null" : context.remoteAddress().toString())
+ + (context.principal() == null ? "" : (" principal:" + context.principal())) + " op:" + operation
+ + (topology_conf == null ? "" : (" topoology:" + topology_conf.get(Config.TOPOLOGY_NAME))));
return context.principal() != null ? users.contains(context.principal().getName()) : false;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ClientCallbackHandler.java b/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ClientCallbackHandler.java
index 3caacaa..0e3f626 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ClientCallbackHandler.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ClientCallbackHandler.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
import backtype.storm.security.auth.AuthUtils;
/**
- * client side callback handler.
+ * client side callback handler.
*/
public class ClientCallbackHandler implements CallbackHandler {
private static final String USERNAME = "username";
@@ -51,28 +51,29 @@ public class ClientCallbackHandler implements CallbackHandler {
* @throws IOException
*/
public ClientCallbackHandler(Configuration configuration) throws IOException {
- if (configuration == null) return;
+ if (configuration == null)
+ return;
AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_CLIENT);
if (configurationEntries == null) {
- String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_CLIENT
- + "' entry in this configuration: Client cannot start.";
+ String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_CLIENT + "' entry in this configuration: Client cannot start.";
throw new IOException(errorMessage);
}
_password = "";
- for(AppConfigurationEntry entry: configurationEntries) {
+ for (AppConfigurationEntry entry : configurationEntries) {
if (entry.getOptions().get(USERNAME) != null) {
- _username = (String)entry.getOptions().get(USERNAME);
+ _username = (String) entry.getOptions().get(USERNAME);
}
if (entry.getOptions().get(PASSWORD) != null) {
- _password = (String)entry.getOptions().get(PASSWORD);
+ _password = (String) entry.getOptions().get(PASSWORD);
}
}
}
/**
* This method is invoked by SASL for authentication challenges
- * @param callbacks a collection of challenge callbacks
+ *
+ * @param callbacks a collection of challenge callbacks
*/
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
for (Callback c : callbacks) {
@@ -82,10 +83,10 @@ public class ClientCallbackHandler implements CallbackHandler {
nc.setName(_username);
} else if (c instanceof PasswordCallback) {
LOG.debug("password callback");
- PasswordCallback pc = (PasswordCallback)c;
+ PasswordCallback pc = (PasswordCallback) c;
if (_password != null) {
pc.setPassword(_password.toCharArray());
- }
+ }
} else if (c instanceof AuthorizeCallback) {
LOG.debug("authorization callback");
AuthorizeCallback ac = (AuthorizeCallback) c;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
index ad642d8..7b497c6 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
@@ -38,11 +38,11 @@ public class DigestSaslTransportPlugin extends SaslTransportPlugin {
public static final String DIGEST = "DIGEST-MD5";
private static final Logger LOG = LoggerFactory.getLogger(DigestSaslTransportPlugin.class);
- protected TTransportFactory getServerTransportFactory() throws IOException {
- //create an authentication callback handler
+ protected TTransportFactory getServerTransportFactory() throws IOException {
+ // create an authentication callback handler
CallbackHandler serer_callback_handler = new ServerCallbackHandler(login_conf);
- //create a transport factory that will invoke our auth callback for digest
+ // create a transport factory that will invoke our auth callback for digest
TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
factory.addServerDefinition(DIGEST, AuthUtils.SERVICE, "localhost", null, serer_callback_handler);
@@ -53,13 +53,8 @@ public class DigestSaslTransportPlugin extends SaslTransportPlugin {
@Override
public TTransport connect(TTransport transport, String serverHost, String asUser) throws TTransportException, IOException {
ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf);
- TSaslClientTransport wrapper_transport = new TSaslClientTransport(DIGEST,
- null,
- AuthUtils.SERVICE,
- serverHost,
- null,
- client_callback_handler,
- transport);
+ TSaslClientTransport wrapper_transport =
+ new TSaslClientTransport(DIGEST, null, AuthUtils.SERVICE, serverHost, null, client_callback_handler, transport);
wrapper_transport.open();
LOG.debug("SASL DIGEST-MD5 client transport has been established");