You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:11 UTC

[33/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/IAuthorizer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/IAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/IAuthorizer.java
index d592bb7..7ed498b 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/IAuthorizer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/IAuthorizer.java
@@ -20,27 +20,27 @@ package backtype.storm.security.auth;
 import java.util.Map;
 
 /**
- * Nimbus could be configured with an authorization plugin.
- * If not specified, all requests are authorized.
+ * Nimbus could be configured with an authorization plugin. If not specified, all requests are authorized.
  * 
- * You could specify the authorization plugin via storm parameter. For example:
- *  storm -c nimbus.authorization.class=backtype.storm.security.auth.NoopAuthorizer ...
- *  
- * You could also specify it via storm.yaml:
- *   nimbus.authorization.class: backtype.storm.security.auth.NoopAuthorizer
+ * You could specify the authorization plugin via storm parameter. For example: storm -c nimbus.authorization.class=backtype.storm.security.auth.NoopAuthorizer
+ * ...
+ * 
+ * You could also specify it via storm.yaml: nimbus.authorization.class: backtype.storm.security.auth.NoopAuthorizer
  */
 public interface IAuthorizer {
     /**
      * Invoked once immediately after construction
-     * @param conf Storm configuration 
+     * 
+     * @param conf Storm configuration
      */
     void prepare(Map storm_conf);
-    
+
     /**
      * permit() method is invoked for each incoming Thrift request.
-     * @param context request context includes info about 
+     * 
+     * @param context request context includes info about
      * @param operation operation name
-     * @param topology_storm configuration of targeted topology 
+     * @param topology_storm configuration of targeted topology
      * @return true if the request is authorized, false if reject
      */
     public boolean permit(ReqContext context, String operation, Map topology_conf);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/IAutoCredentials.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/IAutoCredentials.java b/jstorm-core/src/main/java/backtype/storm/security/auth/IAutoCredentials.java
index b3886da..16841d5 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/IAutoCredentials.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/IAutoCredentials.java
@@ -23,8 +23,7 @@ import java.util.Map;
 import javax.security.auth.Subject;
 
 /**
- * Provides a way to automatically push credentials to a topology and to
- * retreave them in the worker.
+ * Provides a way to automatically push credentials to a topology and to retreave them in the worker.
  */
 public interface IAutoCredentials {
 
@@ -32,24 +31,26 @@ public interface IAutoCredentials {
 
     /**
      * Called to populate the credentials on the client side.
+     * 
      * @param credentials the credentials to be populated.
      */
     public void populateCredentials(Map<String, String> credentials);
 
     /**
      * Called to initially populate the subject on the worker side with credentials passed in.
+     * 
      * @param subject the subject to optionally put credentials in.
      * @param credentials the credentials to be used.
-     */ 
+     */
     public void populateSubject(Subject subject, Map<String, String> credentials);
 
-
     /**
-     * Called to update the subject on the worker side when new credentials are recieved.
-     * This means that populateSubject has already been called on this subject.  
+     * Called to update the subject on the worker side when new credentials are recieved. This means that populateSubject has already been called on this
+     * subject.
+     * 
      * @param subject the subject to optionally put credentials in.
      * @param credentials the credentials to be used.
-     */ 
+     */
     public void updateSubject(Subject subject, Map<String, String> credentials);
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ICredentialsRenewer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ICredentialsRenewer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ICredentialsRenewer.java
index 3eaf6c4..34358f4 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/ICredentialsRenewer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ICredentialsRenewer.java
@@ -26,16 +26,18 @@ import java.util.Map;
  */
 public interface ICredentialsRenewer {
 
-   /**
-    * Called when initializing the service.
-    * @param conf the storm cluster configuration.
-    */ 
-   public void prepare(Map conf);
+    /**
+     * Called when initializing the service.
+     * 
+     * @param conf the storm cluster configuration.
+     */
+    public void prepare(Map conf);
 
     /**
      * Renew any credentials that need to be renewed. (Update the credentials if needed)
+     * 
      * @param credentials the credentials that may have something to renew.
      * @param topologyConf topology configuration.
-     */ 
+     */
     public void renew(Map<String, String> credentials, Map topologyConf);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/IGroupMappingServiceProvider.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/IGroupMappingServiceProvider.java b/jstorm-core/src/main/java/backtype/storm/security/auth/IGroupMappingServiceProvider.java
index 5590b81..865e950 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/IGroupMappingServiceProvider.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/IGroupMappingServiceProvider.java
@@ -26,13 +26,14 @@ public interface IGroupMappingServiceProvider {
 
     /**
      * Invoked once immediately after construction
+     * 
      * @param storm_conf Storm configuration
      */
     void prepare(Map storm_conf);
 
     /**
-     * Get all various group memberships of a given user.
-     * Returns EMPTY list in case of non-existing user
+     * Get all various group memberships of a given user. Returns EMPTY list in case of non-existing user
+     * 
      * @param user User's name
      * @return group memberships of user
      * @throws IOException

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/IHttpCredentialsPlugin.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/IHttpCredentialsPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/IHttpCredentialsPlugin.java
index a012ce4..66dfcee 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/IHttpCredentialsPlugin.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/IHttpCredentialsPlugin.java
@@ -29,20 +29,22 @@ import backtype.storm.security.auth.ReqContext;
 public interface IHttpCredentialsPlugin {
     /**
      * Invoked once immediately after construction
+     * 
      * @param storm_conf Storm configuration
      */
     void prepare(Map storm_conf);
 
     /**
      * Gets the user name from the request.
+     * 
      * @param req the servlet request
      * @return the authenticated user, or null if none is authenticated.
      */
     String getUserName(HttpServletRequest req);
 
     /**
-     * Populates a given context with credentials information from an HTTP
-     * request.
+     * Populates a given context with credentials information from an HTTP request.
+     * 
      * @param req the servlet request
      * @return the context
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/IPrincipalToLocal.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/IPrincipalToLocal.java b/jstorm-core/src/main/java/backtype/storm/security/auth/IPrincipalToLocal.java
index fca3d37..32b4564 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/IPrincipalToLocal.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/IPrincipalToLocal.java
@@ -22,18 +22,19 @@ import java.util.Map;
 import java.security.Principal;
 
 /**
- * Storm can be configured to launch worker processed as a given user.
- * Some transports need to map the Principal to a local user name.
+ * Storm can be configured to launch worker processed as a given user. Some transports need to map the Principal to a local user name.
  */
 public interface IPrincipalToLocal {
     /**
      * Invoked once immediately after construction
-     * @param conf Storm configuration 
+     * 
+     * @param conf Storm configuration
      */
     void prepare(Map storm_conf);
-    
+
     /**
      * Convert a Principal to a local user name.
+     * 
      * @param principal the principal to convert
      * @return The local user name.
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ITransportPlugin.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ITransportPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ITransportPlugin.java
index 5ba2557..c3c657f 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/ITransportPlugin.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ITransportPlugin.java
@@ -37,25 +37,28 @@ import backtype.storm.security.auth.ThriftConnectionType;
 public interface ITransportPlugin {
     /**
      * Invoked once immediately after construction
+     * 
      * @param type the type of connection this will process.
-     * @param storm_conf Storm configuration 
+     * @param storm_conf Storm configuration
      * @param login_conf login configuration
      */
     void prepare(ThriftConnectionType type, Map storm_conf, Configuration login_conf);
-    
+
     /**
      * Create a server associated with a given port, service handler, and purpose
+     * 
      * @param processor service handler
      * @return server
      */
     public TServer getServer(TProcessor processor) throws IOException, TTransportException;
 
     /**
-     * Connect to the specified server via framed transport 
+     * Connect to the specified server via framed transport
+     * 
      * @param transport The underlying Thrift transport.
      * @param serverHost server host
-     * @param asUser the user as which the connection should be established, and all the subsequent actions should be executed.
-     *               Only applicable when using secure storm cluster. A null/blank value here will just indicate to use the logged in user.
+     * @param asUser the user as which the connection should be established, and all the subsequent actions should be executed. Only applicable when using
+     *            secure storm cluster. A null/blank value here will just indicate to use the logged in user.
      */
     public TTransport connect(TTransport transport, String serverHost, String asUser) throws IOException, TTransportException;
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/KerberosPrincipalToLocal.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/KerberosPrincipalToLocal.java b/jstorm-core/src/main/java/backtype/storm/security/auth/KerberosPrincipalToLocal.java
index 35c7788..7ac6a6d 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/KerberosPrincipalToLocal.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/KerberosPrincipalToLocal.java
@@ -28,18 +28,21 @@ public class KerberosPrincipalToLocal implements IPrincipalToLocal {
 
     /**
      * Invoked once immediately after construction
-     * @param conf Storm configuration 
+     * 
+     * @param conf Storm configuration
      */
-    public void prepare(Map storm_conf) {}
-    
+    public void prepare(Map storm_conf) {
+    }
+
     /**
      * Convert a Principal to a local user name.
+     * 
      * @param principal the principal to convert
      * @return The local user name.
      */
     public String toLocal(Principal principal) {
-      //This technically does not conform with rfc1964, but should work so
-      // long as you don't have any really odd names in your KDC.
-      return principal == null ? null : principal.getName().split("[/@]")[0];
+        // This technically does not conform with rfc1964, but should work so
+        // long as you don't have any really odd names in your KDC.
+        return principal == null ? null : principal.getName().split("[/@]")[0];
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ReqContext.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ReqContext.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ReqContext.java
index a252f85..47f317c 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/ReqContext.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ReqContext.java
@@ -31,10 +31,7 @@ import java.security.Principal;
 import javax.security.auth.Subject;
 
 /**
- * context request context includes info about 
- *      	   (1) remote address, 
- *             (2) remote subject and primary principal
- *             (3) request ID 
+ * context request context includes info about (1) remote address, (2) remote subject and primary principal (3) request ID
  */
 public class ReqContext {
     private static final AtomicInteger uniqueId = new AtomicInteger(0);
@@ -46,39 +43,37 @@ public class ReqContext {
 
     private static final Logger LOG = LoggerFactory.getLogger(ReqContext.class);
 
-
     /**
      * Get a request context associated with current thread
+     * 
      * @return
      */
     public static ReqContext context() {
         return ctxt.get();
     }
 
-    //each thread will have its own request context
-    private static final ThreadLocal < ReqContext > ctxt = 
-            new ThreadLocal < ReqContext > () {
-        @Override 
+    // each thread will have its own request context
+    private static final ThreadLocal<ReqContext> ctxt = new ThreadLocal<ReqContext>() {
+        @Override
         protected ReqContext initialValue() {
             return new ReqContext(AccessController.getContext());
         }
     };
 
-    //private constructor
+    // private constructor
     @VisibleForTesting
     public ReqContext(AccessControlContext acl_ctxt) {
         _subject = Subject.getSubject(acl_ctxt);
         _reqID = uniqueId.incrementAndGet();
     }
 
-    //private constructor
+    // private constructor
     @VisibleForTesting
     public ReqContext(Subject sub) {
         _subject = sub;
         _reqID = uniqueId.incrementAndGet();
     }
 
-
     /**
      * client address
      */
@@ -108,15 +103,18 @@ public class ReqContext {
      * The primary principal associated current subject
      */
     public Principal principal() {
-        if (_subject == null) return null;
+        if (_subject == null)
+            return null;
         Set<Principal> princs = _subject.getPrincipals();
-        if (princs.size()==0) return null;
+        if (princs.size() == 0)
+            return null;
         return (Principal) (princs.toArray()[0]);
     }
 
     public void setRealPrincipal(Principal realPrincipal) {
         this.realPrincipal = realPrincipal;
     }
+
     /**
      * The real principal associated with the subject.
      */
@@ -126,12 +124,13 @@ public class ReqContext {
 
     /**
      * Returns true if this request is an impersonation request.
+     * 
      * @return
      */
     public boolean isImpersonating() {
         return this.realPrincipal != null;
     }
-    
+
     /**
      * request ID of this request
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/SaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/SaslTransportPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/SaslTransportPlugin.java
index 7208a17..9c8780d 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/SaslTransportPlugin.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/SaslTransportPlugin.java
@@ -73,11 +73,9 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
         int numWorkerThreads = type.getNumThreads(storm_conf);
         Integer queueSize = type.getQueueSize(storm_conf);
 
-        TThreadPoolServer.Args server_args = new TThreadPoolServer.Args(serverTransport).
-                processor(new TUGIWrapProcessor(processor)).
-                minWorkerThreads(numWorkerThreads).
-                maxWorkerThreads(numWorkerThreads).
-                protocolFactory(new TBinaryProtocol.Factory(false, true));
+        TThreadPoolServer.Args server_args =
+                new TThreadPoolServer.Args(serverTransport).processor(new TUGIWrapProcessor(processor)).minWorkerThreads(numWorkerThreads)
+                        .maxWorkerThreads(numWorkerThreads).protocolFactory(new TBinaryProtocol.Factory(false, true));
 
         if (serverTransportFactory != null) {
             server_args.transportFactory(serverTransportFactory);
@@ -86,26 +84,23 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
         if (queueSize != null) {
             workQueue = new ArrayBlockingQueue(queueSize);
         }
-        ThreadPoolExecutor executorService = new ExtendedThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
-            60, TimeUnit.SECONDS, workQueue);
+        ThreadPoolExecutor executorService = new ExtendedThreadPoolExecutor(numWorkerThreads, numWorkerThreads, 60, TimeUnit.SECONDS, workQueue);
         server_args.executorService(executorService);
         return new TThreadPoolServer(server_args);
     }
 
     /**
      * All subclass must implement this method
+     * 
      * @return
      * @throws IOException
      */
     protected abstract TTransportFactory getServerTransportFactory() throws IOException;
 
-
-    /**                                                                                                                                                                             
-     * Processor that pulls the SaslServer object out of the transport, and                                                                                                         
-     * assumes the remote user's UGI before calling through to the original                                                                                                         
-     * processor.                                                                                                                                                                   
-     *                                                                                                                                                                              
-     * This is used on the server side to set the UGI for each specific call.                                                                                                       
+    /**
+     * Processor that pulls the SaslServer object out of the transport, and assumes the remote user's UGI before calling through to the original processor.
+     * 
+     * This is used on the server side to set the UGI for each specific call.
      */
     private class TUGIWrapProcessor implements TProcessor {
         final TProcessor wrapped;
@@ -115,25 +110,25 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
         }
 
         public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
-            //populating request context 
+            // populating request context
             ReqContext req_context = ReqContext.context();
 
             TTransport trans = inProt.getTransport();
-            //Sasl transport
-            TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
-            //remote address
-            TSocket tsocket = (TSocket)saslTrans.getUnderlyingTransport();
+            // Sasl transport
+            TSaslServerTransport saslTrans = (TSaslServerTransport) trans;
+            // remote address
+            TSocket tsocket = (TSocket) saslTrans.getUnderlyingTransport();
             Socket socket = tsocket.getSocket();
             req_context.setRemoteAddress(socket.getInetAddress());
 
-            //remote subject 
+            // remote subject
             SaslServer saslServer = saslTrans.getSaslServer();
             String authId = saslServer.getAuthorizationID();
             Subject remoteUser = new Subject();
             remoteUser.getPrincipals().add(new User(authId));
             req_context.setSubject(remoteUser);
 
-            //invoke service handler
+            // invoke service handler
             return wrapped.process(inProt, outProt);
         }
     }
@@ -142,11 +137,11 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
         private final String name;
 
         public User(String name) {
-            this.name =  name;
+            this.name = name;
         }
 
-        /**                                                                                                                                                                                
-         * Get the full name of the user.                                                                                                                                                  
+        /**
+         * Get the full name of the user.
          */
         public String getName() {
             return name;

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ShellBasedGroupsMapping.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ShellBasedGroupsMapping.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ShellBasedGroupsMapping.java
index 62a4c7e..16f2fe4 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/ShellBasedGroupsMapping.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ShellBasedGroupsMapping.java
@@ -31,15 +31,14 @@ import backtype.storm.utils.ShellUtils.ExitCodeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-public class ShellBasedGroupsMapping implements
-                                             IGroupMappingServiceProvider {
+public class ShellBasedGroupsMapping implements IGroupMappingServiceProvider {
 
     public static Logger LOG = LoggerFactory.getLogger(ShellBasedGroupsMapping.class);
     public TimeCacheMap<String, Set<String>> cachedGroups;
 
     /**
      * Invoked once immediately after construction
+     * 
      * @param storm_conf Storm configuration
      */
     @Override
@@ -50,24 +49,24 @@ public class ShellBasedGroupsMapping implements
 
     /**
      * Returns list of groups for a user
-     *
+     * 
      * @param user get groups for this user
      * @return list of groups for a given user
      */
     @Override
     public Set<String> getGroups(String user) throws IOException {
-        if(cachedGroups.containsKey(user)) {
+        if (cachedGroups.containsKey(user)) {
             return cachedGroups.get(user);
         }
         Set<String> groups = getUnixGroups(user);
-        if(!groups.isEmpty())
-            cachedGroups.put(user,groups);
+        if (!groups.isEmpty())
+            cachedGroups.put(user, groups);
         return groups;
     }
 
     /**
-     * Get the current user's group list from Unix by running the command 'groups'
-     * NOTE. For non-existing user it will return EMPTY list
+     * Get the current user's group list from Unix by running the command 'groups' NOTE. For non-existing user it will return EMPTY list
+     * 
      * @param user user name
      * @return the groups set that the <code>user</code> belongs to
      * @throws IOException if encounter any error when running the command
@@ -82,8 +81,7 @@ public class ShellBasedGroupsMapping implements
             return new HashSet<String>();
         }
 
-        StringTokenizer tokenizer =
-            new StringTokenizer(result, ShellUtils.TOKEN_SEPARATOR_REGEX);
+        StringTokenizer tokenizer = new StringTokenizer(result, ShellUtils.TOKEN_SEPARATOR_REGEX);
         Set<String> groups = new HashSet<String>();
         while (tokenizer.hasMoreTokens()) {
             groups.add(tokenizer.nextToken());

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/SimpleTransportPlugin.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/SimpleTransportPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/SimpleTransportPlugin.java
index 2abcdae..c7e816f 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/SimpleTransportPlugin.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/SimpleTransportPlugin.java
@@ -73,22 +73,21 @@ public class SimpleTransportPlugin implements ITransportPlugin {
         int maxBufferSize = type.getMaxBufferSize(storm_conf);
         Integer queueSize = type.getQueueSize(storm_conf);
 
-        THsHaServer.Args server_args = new THsHaServer.Args(serverTransport).
-                processor(new SimpleWrapProcessor(processor)).
-                workerThreads(numWorkerThreads).
-                protocolFactory(new TBinaryProtocol.Factory(false, true, maxBufferSize, -1));
+        THsHaServer.Args server_args =
+                new THsHaServer.Args(serverTransport).processor(new SimpleWrapProcessor(processor)).workerThreads(numWorkerThreads)
+                        .protocolFactory(new TBinaryProtocol.Factory(false, true, maxBufferSize, -1));
 
         if (queueSize != null) {
-            server_args.executorService(new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads, 
-                                   60, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize)));
+            server_args.executorService(new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads, 60, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize)));
         }
 
-        //construct THsHaServer
+        // construct THsHaServer
         return new THsHaServer(server_args);
     }
 
     /**
-     * Connect to the specified server via framed transport 
+     * Connect to the specified server via framed transport
+     * 
      * @param transport The underlying Thrift transport.
      * @param serverHost unused.
      * @param asUser unused.
@@ -96,10 +95,10 @@ public class SimpleTransportPlugin implements ITransportPlugin {
     @Override
     public TTransport connect(TTransport transport, String serverHost, String asUser) throws TTransportException {
         int maxBufferSize = type.getMaxBufferSize(storm_conf);
-        //create a framed transport
+        // create a framed transport
         TTransport conn = new TFramedTransport(transport, maxBufferSize);
 
-        //connect
+        // connect
         conn.open();
         LOG.debug("Simple client transport has been established");
 
@@ -108,13 +107,13 @@ public class SimpleTransportPlugin implements ITransportPlugin {
 
     /**
      * @return the subject that will be used for all connections
-     */  
+     */
     protected Subject getDefaultSubject() {
         return null;
     }
 
-    /**                                                                                                                                                                             
-     * Processor that populate simple transport info into ReqContext, and then invoke a service handler                                                                              
+    /**
+     * Processor that populate simple transport info into ReqContext, and then invoke a service handler
      */
     private class SimpleWrapProcessor implements TProcessor {
         final TProcessor wrapped;
@@ -124,7 +123,7 @@ public class SimpleTransportPlugin implements ITransportPlugin {
         }
 
         public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
-            //populating request context 
+            // populating request context
             ReqContext req_context = ReqContext.context();
 
             TTransport trans = inProt.getTransport();
@@ -133,31 +132,36 @@ public class SimpleTransportPlugin implements ITransportPlugin {
                     req_context.setRemoteAddress(InetAddress.getLocalHost());
                 } catch (UnknownHostException e) {
                     throw new RuntimeException(e);
-                }                                
+                }
             } else if (trans instanceof TSocket) {
-                TSocket tsocket = (TSocket)trans;
-                //remote address
+                TSocket tsocket = (TSocket) trans;
+                // remote address
                 Socket socket = tsocket.getSocket();
-                req_context.setRemoteAddress(socket.getInetAddress());                
-            } 
+                req_context.setRemoteAddress(socket.getInetAddress());
+            }
 
-            //anonymous user
+            // anonymous user
             Subject s = getDefaultSubject();
             if (s == null) {
-              final String user = (String)storm_conf.get("debug.simple.transport.user");
-              if (user != null) {
-                HashSet<Principal> principals = new HashSet<Principal>();
-                principals.add(new Principal() {
-                  public String getName() { return user; }
-                  public String toString() { return user; }
-                });
-                s = new Subject(true, principals, new HashSet<Object>(), new HashSet<Object>());
-              }
+                final String user = (String) storm_conf.get("debug.simple.transport.user");
+                if (user != null) {
+                    HashSet<Principal> principals = new HashSet<Principal>();
+                    principals.add(new Principal() {
+                        public String getName() {
+                            return user;
+                        }
+
+                        public String toString() {
+                            return user;
+                        }
+                    });
+                    s = new Subject(true, principals, new HashSet<Object>(), new HashSet<Object>());
+                }
             }
             req_context.setSubject(s);
 
-            //invoke service handler
+            // invoke service handler
             return wrapped.process(inProt, outProt);
         }
-    } 
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/SingleUserPrincipal.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/SingleUserPrincipal.java b/jstorm-core/src/main/java/backtype/storm/security/auth/SingleUserPrincipal.java
index 6af17fa..fd9e694 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/SingleUserPrincipal.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/SingleUserPrincipal.java
@@ -34,7 +34,7 @@ public class SingleUserPrincipal implements Principal {
     @Override
     public boolean equals(Object another) {
         if (another instanceof SingleUserPrincipal) {
-            return _userName.equals(((SingleUserPrincipal)another)._userName);
+            return _userName.equals(((SingleUserPrincipal) another)._userName);
         }
         return false;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/TBackoffConnect.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/TBackoffConnect.java b/jstorm-core/src/main/java/backtype/storm/security/auth/TBackoffConnect.java
index f547868..b699bc4 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/TBackoffConnect.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/TBackoffConnect.java
@@ -35,15 +35,13 @@ public class TBackoffConnect {
     public TBackoffConnect(int retryTimes, int retryInterval, int retryIntervalCeiling) {
 
         _retryTimes = retryTimes;
-        waitGrabber = new StormBoundedExponentialBackoffRetry(retryInterval,
-                                                              retryIntervalCeiling,
-                                                              retryTimes);
+        waitGrabber = new StormBoundedExponentialBackoffRetry(retryInterval, retryIntervalCeiling, retryTimes);
     }
 
     public TTransport doConnectWithRetry(ITransportPlugin transportPlugin, TTransport underlyingTransport, String host, String asUser) throws IOException {
         boolean connected = false;
         TTransport transportResult = null;
-        while(!connected) {
+        while (!connected) {
             try {
                 transportResult = transportPlugin.connect(underlyingTransport, host, asUser);
                 connected = true;
@@ -55,13 +53,13 @@ public class TBackoffConnect {
     }
 
     private void retryNext(TTransportException ex) {
-        if(!canRetry()) {
+        if (!canRetry()) {
             throw new RuntimeException(ex);
         }
         try {
             int sleeptime = waitGrabber.getSleepTimeMs(_completedRetries, 0);
 
-            LOG.debug("Failed to connect. Retrying... (" + Integer.toString( _completedRetries) + ") in " + Integer.toString(sleeptime) + "ms");
+            LOG.debug("Failed to connect. Retrying... (" + Integer.toString(_completedRetries) + ") in " + Integer.toString(sleeptime) + "ms");
 
             Thread.sleep(sleeptime);
         } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftClient.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftClient.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftClient.java
index 8d2136a..954b4f8 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftClient.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftClient.java
@@ -45,17 +45,17 @@ public class ThriftClient {
     private String hostPort;
     private String host;
     private Integer port;
-    
+
     private Map<Object, Object> conf;
-    
+
     private Integer timeout;
     private ThriftConnectionType type;
     private String asUser;
-    
+
     public ThriftClient(Map conf, ThriftConnectionType type) throws Exception {
         this(conf, type, null, null, null, null);
     }
-    
+
     @SuppressWarnings("unchecked")
     public ThriftClient(Map conf, ThriftConnectionType type, Integer timeout) throws Exception {
         this(conf, type, null, null, timeout, null);
@@ -63,6 +63,7 @@ public class ThriftClient {
 
     /**
      * This is only for be compatible for Storm
+     * 
      * @param conf
      * @param type
      * @param host
@@ -71,45 +72,39 @@ public class ThriftClient {
         this(conf, type, host, null, null, null);
     }
 
-    public ThriftClient(Map conf, ThriftConnectionType type, String host, Integer port, Integer timeout){
+    public ThriftClient(Map conf, ThriftConnectionType type, String host, Integer port, Integer timeout) {
         this(conf, type, host, port, timeout, null);
     }
 
     public ThriftClient(Map conf, ThriftConnectionType type, String host, Integer port, Integer timeout, String asUser) {
-        //create a socket with server
-        
+        // create a socket with server
+
         this.timeout = timeout;
         this.conf = conf;
         this.type = type;
         this.asUser = asUser;
-        
+
         getMaster(conf, host, port);
         reconnect();
     }
-    
-    
-    
+
     public static String getMasterByZk(Map conf) throws Exception {
 
-        
         CuratorFramework zkobj = null;
         String masterHost = null;
-        
+
         try {
             String root = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT));
             String zkMasterDir = root + Cluster.MASTER_SUBTREE;
-            
-            zkobj = Utils.newCurator(conf, 
-                            (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), 
-                            conf.get(Config.STORM_ZOOKEEPER_PORT), 
-                            zkMasterDir);
+
+            zkobj = Utils.newCurator(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT), zkMasterDir);
             zkobj.start();
             if (zkobj.checkExists().forPath("/") == null) {
                 throw new RuntimeException("No alive nimbus ");
             }
-            
+
             masterHost = new String(zkobj.getData().forPath("/"));
-            
+
             LOG.info("masterHost:" + masterHost);
             return masterHost;
         } finally {
@@ -119,8 +114,8 @@ public class ThriftClient {
             }
         }
     }
-    
-    public void getMaster(Map conf, String host, Integer port){
+
+    public void getMaster(Map conf, String host, Integer port) {
         if (StringUtils.isBlank(host) == false) {
             this.host = host;
             if (port == null) {
@@ -128,7 +123,7 @@ public class ThriftClient {
             }
             this.port = port;
             this.hostPort = host + ":" + port;
-        }else {
+        } else {
             try {
                 hostPort = ThriftClient.getMasterByZk(conf);
             } catch (Exception e) {
@@ -142,7 +137,7 @@ public class ThriftClient {
             this.host = host_port[0];
             this.port = Integer.parseInt(host_port[1]);
         }
-        
+
         // create a socket with server
         if (this.host == null) {
             throw new IllegalArgumentException("host is not set");
@@ -151,45 +146,43 @@ public class ThriftClient {
             throw new IllegalArgumentException("invalid port: " + port);
         }
     }
-    
+
     public synchronized TTransport transport() {
         return _transport;
     }
-    
+
     public synchronized void reconnect() {
-        close();    
+        close();
         try {
             TSocket socket = new TSocket(host, port);
-            if(timeout!=null) {
+            if (timeout != null) {
                 socket.setTimeout(timeout);
-            }else {
-                //@@@ Todo
+            } else {
+                // @@@ Todo
                 // set the socket default Timeout as xxxx
             }
 
-            //locate login configuration 
+            // locate login configuration
             Configuration login_conf = AuthUtils.GetConfiguration(conf);
 
-            //construct a transport plugin
+            // construct a transport plugin
             ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(type, conf, login_conf);
 
             final TTransport underlyingTransport = socket;
 
-            //TODO get this from type instead of hardcoding to Nimbus.
-            //establish client-server transport via plugin
-            //do retries if the connect fails
-            TBackoffConnect connectionRetry 
-                = new TBackoffConnect(
-                                      Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_TIMES)),
-                                      Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL)),
-                                      Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING)));
+            // TODO get this from type instead of hardcoding to Nimbus.
+            // establish client-server transport via plugin
+            // do retries if the connect fails
+            TBackoffConnect connectionRetry =
+                    new TBackoffConnect(Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_TIMES)), Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL)),
+                            Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING)));
             _transport = connectionRetry.doConnectWithRetry(transportPlugin, underlyingTransport, host, asUser);
         } catch (IOException ex) {
             throw new RuntimeException(ex);
         }
         _protocol = null;
         if (_transport != null) {
-            _protocol = new  TBinaryProtocol(_transport);
+            _protocol = new TBinaryProtocol(_transport);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftConnectionType.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftConnectionType.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftConnectionType.java
index f9be7ae..e248df8 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftConnectionType.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftConnectionType.java
@@ -26,12 +26,9 @@ import java.util.Map;
  * The purpose for which the Thrift server is created.
  */
 public enum ThriftConnectionType {
-    NIMBUS(Config.NIMBUS_THRIFT_TRANSPORT_PLUGIN, Config.NIMBUS_THRIFT_PORT, null,
-         Config.NIMBUS_THRIFT_THREADS, Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE),
-    DRPC(Config.DRPC_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_PORT, Config.DRPC_QUEUE_SIZE,
-         Config.DRPC_WORKER_THREADS, Config.DRPC_MAX_BUFFER_SIZE),
-    DRPC_INVOCATIONS(Config.DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_INVOCATIONS_PORT, null,
-         Config.DRPC_INVOCATIONS_THREADS, Config.DRPC_MAX_BUFFER_SIZE);
+    NIMBUS(Config.NIMBUS_THRIFT_TRANSPORT_PLUGIN, Config.NIMBUS_THRIFT_PORT, null, Config.NIMBUS_THRIFT_THREADS, Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE), DRPC(
+            Config.DRPC_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_PORT, Config.DRPC_QUEUE_SIZE, Config.DRPC_WORKER_THREADS, Config.DRPC_MAX_BUFFER_SIZE), DRPC_INVOCATIONS(
+            Config.DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_INVOCATIONS_PORT, null, Config.DRPC_INVOCATIONS_THREADS, Config.DRPC_MAX_BUFFER_SIZE);
 
     private final String _transConf;
     private final String _portConf;
@@ -39,8 +36,7 @@ public enum ThriftConnectionType {
     private final String _threadsConf;
     private final String _buffConf;
 
-    ThriftConnectionType(String transConf, String portConf, String qConf,
-                         String threadsConf, String buffConf) {
+    ThriftConnectionType(String transConf, String portConf, String qConf, String threadsConf, String buffConf) {
         _transConf = transConf;
         _portConf = portConf;
         _qConf = qConf;
@@ -49,9 +45,9 @@ public enum ThriftConnectionType {
     }
 
     public String getTransportPlugin(Map conf) {
-        String ret = (String)conf.get(_transConf);
+        String ret = (String) conf.get(_transConf);
         if (ret == null) {
-            ret = (String)conf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN);
+            ret = (String) conf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN);
         }
         return ret;
     }
@@ -64,10 +60,10 @@ public enum ThriftConnectionType {
         if (_qConf == null) {
             return null;
         }
-        return (Integer)conf.get(_qConf);
+        return (Integer) conf.get(_qConf);
     }
 
-    public int getNumThreads(Map conf) { 
+    public int getNumThreads(Map conf) {
         return Utils.getInt(conf.get(_threadsConf));
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftServer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftServer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftServer.java
index 64243ce..410f1ce 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftServer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/ThriftServer.java
@@ -28,19 +28,19 @@ import org.slf4j.LoggerFactory;
 
 public class ThriftServer {
     private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
-    private Map _storm_conf; //storm configuration
+    private Map _storm_conf; // storm configuration
     protected TProcessor _processor = null;
     private final ThriftConnectionType _type;
     private TServer _server = null;
     private Configuration _login_conf;
-    
+
     public ThriftServer(Map storm_conf, TProcessor processor, ThriftConnectionType type) {
         _storm_conf = storm_conf;
         _processor = processor;
         _type = type;
 
         try {
-            //retrieve authentication configuration 
+            // retrieve authentication configuration
             _login_conf = AuthUtils.GetConfiguration(_storm_conf);
         } catch (Exception x) {
             LOG.error(x.getMessage(), x);
@@ -54,27 +54,30 @@ public class ThriftServer {
 
     /**
      * Is ThriftServer listening to requests?
+     * 
      * @return
      */
     public boolean isServing() {
-        if (_server == null) return false;
+        if (_server == null)
+            return false;
         return _server.isServing();
     }
-    
-    public void serve()  {
+
+    public void serve() {
         try {
-            //locate our thrift transport plugin
-            ITransportPlugin  transportPlugin = AuthUtils.GetTransportPlugin(_type, _storm_conf, _login_conf);
+            // locate our thrift transport plugin
+            ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(_type, _storm_conf, _login_conf);
 
-            //server
+            // server
             _server = transportPlugin.getServer(_processor);
 
-            //start accepting requests
+            // start accepting requests
             _server.serve();
         } catch (Exception ex) {
             LOG.error("ThriftServer is being stopped due to: " + ex, ex);
-            if (_server != null) _server.stop();
-            Runtime.getRuntime().halt(1); //shutdown server process since we could not handle Thrift requests any more
+            if (_server != null)
+                _server.stop();
+            Runtime.getRuntime().halt(1); // shutdown server process since we could not handle Thrift requests any more
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCAuthorizerBase.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCAuthorizerBase.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCAuthorizerBase.java
index 8951edd..11c4a0f 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCAuthorizerBase.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCAuthorizerBase.java
@@ -22,9 +22,10 @@ public abstract class DRPCAuthorizerBase implements IAuthorizer {
     abstract protected boolean permitClientRequest(ReqContext context, String operation, Map params);
 
     abstract protected boolean permitInvocationRequest(ReqContext context, String operation, Map params);
-    
+
     /**
      * Authorizes request from to the DRPC server.
+     * 
      * @param context the client request context
      * @param operation the operation requested by the DRPC server
      * @param params a Map with any key-value entries of use to the authorization implementation
@@ -33,14 +34,11 @@ public abstract class DRPCAuthorizerBase implements IAuthorizer {
     public boolean permit(ReqContext context, String operation, Map params) {
         if ("execute".equals(operation)) {
             return permitClientRequest(context, operation, params);
-        } else if ("failRequest".equals(operation) || 
-                "fetchRequest".equals(operation) || 
-                "result".equals(operation)) {
+        } else if ("failRequest".equals(operation) || "fetchRequest".equals(operation) || "result".equals(operation)) {
             return permitInvocationRequest(context, operation, params);
         }
         // Deny unsupported operations.
-        LOG.warn("Denying unsupported operation \""+operation+"\" from "+
-                context.remoteAddress());
+        LOG.warn("Denying unsupported operation \"" + operation + "\" from " + context.remoteAddress());
         return false;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
index 45eaea5..8aa7243 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
@@ -19,8 +19,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
-    public static Logger LOG =
-        LoggerFactory.getLogger(DRPCSimpleACLAuthorizer.class);
+    public static Logger LOG = LoggerFactory.getLogger(DRPCSimpleACLAuthorizer.class);
 
     public static final String CLIENT_USERS_KEY = "client.users";
     public static final String INVOCATION_USER_KEY = "invocation.user";
@@ -33,44 +32,35 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
     protected class AclFunctionEntry {
         final public Set<String> clientUsers;
         final public String invocationUser;
-        public AclFunctionEntry(Collection<String> clientUsers,
-                String invocationUser) {
-            this.clientUsers = (clientUsers != null) ?
-                new HashSet<String>(clientUsers) : new HashSet<String>();
+
+        public AclFunctionEntry(Collection<String> clientUsers, String invocationUser) {
+            this.clientUsers = (clientUsers != null) ? new HashSet<String>(clientUsers) : new HashSet<String>();
             this.invocationUser = invocationUser;
         }
     }
 
-    private volatile Map<String,AclFunctionEntry> _acl = null;
+    private volatile Map<String, AclFunctionEntry> _acl = null;
     private volatile long _lastUpdate = 0;
 
-    protected Map<String,AclFunctionEntry> readAclFromConfig() {
-        //Thread safety is mostly around _acl.  If _acl needs to be updated it is changed atomically
-        //More then one thread may be trying to update it at a time, but that is OK, because the
-        //change is atomic
+    protected Map<String, AclFunctionEntry> readAclFromConfig() {
+        // Thread safety is mostly around _acl. If _acl needs to be updated it is changed atomically
+        // More then one thread may be trying to update it at a time, but that is OK, because the
+        // change is atomic
         long now = System.currentTimeMillis();
         if ((now - 5000) > _lastUpdate || _acl == null) {
-            Map<String,AclFunctionEntry> acl = new HashMap<String,AclFunctionEntry>();
+            Map<String, AclFunctionEntry> acl = new HashMap<String, AclFunctionEntry>();
             Map conf = Utils.findAndReadConfigFile(_aclFileName);
             if (conf.containsKey(Config.DRPC_AUTHORIZER_ACL)) {
-                Map<String,Map<String,?>> confAcl =
-                    (Map<String,Map<String,?>>)
-                    conf.get(Config.DRPC_AUTHORIZER_ACL);
+                Map<String, Map<String, ?>> confAcl = (Map<String, Map<String, ?>>) conf.get(Config.DRPC_AUTHORIZER_ACL);
 
                 for (String function : confAcl.keySet()) {
-                    Map<String,?> val = confAcl.get(function);
-                    Collection<String> clientUsers =
-                        val.containsKey(CLIENT_USERS_KEY) ?
-                        (Collection<String>) val.get(CLIENT_USERS_KEY) : null;
-                    String invocationUser =
-                        val.containsKey(INVOCATION_USER_KEY) ?
-                        (String) val.get(INVOCATION_USER_KEY) : null;
-                    acl.put(function,
-                            new AclFunctionEntry(clientUsers, invocationUser));
+                    Map<String, ?> val = confAcl.get(function);
+                    Collection<String> clientUsers = val.containsKey(CLIENT_USERS_KEY) ? (Collection<String>) val.get(CLIENT_USERS_KEY) : null;
+                    String invocationUser = val.containsKey(INVOCATION_USER_KEY) ? (String) val.get(INVOCATION_USER_KEY) : null;
+                    acl.put(function, new AclFunctionEntry(clientUsers, invocationUser));
                 }
             } else if (!_permitWhenMissingFunctionEntry) {
-                LOG.warn("Requiring explicit ACL entries, but none given. " +
-                        "Therefore, all operiations will be denied.");
+                LOG.warn("Requiring explicit ACL entries, but none given. " + "Therefore, all operiations will be denied.");
             }
             _acl = acl;
             _lastUpdate = System.currentTimeMillis();
@@ -80,10 +70,8 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
 
     @Override
     public void prepare(Map conf) {
-        Boolean isStrict = 
-                (Boolean) conf.get(Config.DRPC_AUTHORIZER_ACL_STRICT);
-        _permitWhenMissingFunctionEntry = 
-                (isStrict != null && !isStrict) ? true : false;
+        Boolean isStrict = (Boolean) conf.get(Config.DRPC_AUTHORIZER_ACL_STRICT);
+        _permitWhenMissingFunctionEntry = (isStrict != null && !isStrict) ? true : false;
         _aclFileName = (String) conf.get(Config.DRPC_AUTHORIZER_ACL_FILENAME);
         _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
     }
@@ -105,11 +93,10 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
         return null;
     }
 
-    protected boolean permitClientOrInvocationRequest(ReqContext context, Map params,
-            String fieldName) {
-        Map<String,AclFunctionEntry> acl = readAclFromConfig();
+    protected boolean permitClientOrInvocationRequest(ReqContext context, Map params, String fieldName) {
+        Map<String, AclFunctionEntry> acl = readAclFromConfig();
         String function = (String) params.get(FUNCTION_KEY);
-        if (function != null && ! function.isEmpty()) {
+        if (function != null && !function.isEmpty()) {
             AclFunctionEntry entry = acl.get(function);
             if (entry == null && _permitWhenMissingFunctionEntry) {
                 return true;
@@ -126,16 +113,11 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
                 String principal = getUserFromContext(context);
                 String user = getLocalUserFromContext(context);
                 if (value == null) {
-                    LOG.warn("Configuration for function '"+function+"' is "+
-                            "invalid: it should have both an invocation user "+
-                            "and a list of client users defined.");
-                } else if (value instanceof Set && 
-                        (((Set<String>)value).contains(principal) ||
-                        ((Set<String>)value).contains(user))) {
+                    LOG.warn("Configuration for function '" + function + "' is " + "invalid: it should have both an invocation user "
+                            + "and a list of client users defined.");
+                } else if (value instanceof Set && (((Set<String>) value).contains(principal) || ((Set<String>) value).contains(user))) {
                     return true;
-                } else if (value instanceof String && 
-                        (value.equals(principal) ||
-                         value.equals(user))) {
+                } else if (value instanceof String && (value.equals(principal) || value.equals(user))) {
                     return true;
                 }
             }
@@ -144,14 +126,12 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
     }
 
     @Override
-    protected boolean permitClientRequest(ReqContext context, String operation,
-            Map params) {
+    protected boolean permitClientRequest(ReqContext context, String operation, Map params) {
         return permitClientOrInvocationRequest(context, params, "clientUsers");
     }
 
     @Override
-    protected boolean permitInvocationRequest(ReqContext context, String operation,
-            Map params) {
+    protected boolean permitInvocationRequest(ReqContext context, String operation, Map params) {
         return permitClientOrInvocationRequest(context, params, "invocationUser");
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DenyAuthorizer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DenyAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DenyAuthorizer.java
index 5e84b38..32f809a 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DenyAuthorizer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/DenyAuthorizer.java
@@ -31,27 +31,27 @@ import org.slf4j.LoggerFactory;
  */
 public class DenyAuthorizer implements IAuthorizer {
     private static final Logger LOG = LoggerFactory.getLogger(DenyAuthorizer.class);
-    
+
     /**
      * Invoked once immediately after construction
-     * @param conf Storm configuration 
+     * 
+     * @param conf Storm configuration
      */
-    public void prepare(Map conf) {        
+    public void prepare(Map conf) {
     }
 
     /**
      * permit() method is invoked for each incoming Thrift request
-     * @param contrext request context 
+     * 
+     * @param contrext request context
      * @param operation operation name
-     * @param topology_storm configuration of targeted topology 
+     * @param topology_storm configuration of targeted topology
      * @return true if the request is authorized, false if reject
      */
     public boolean permit(ReqContext context, String operation, Map topology_conf) {
-        LOG.info("[req "+ context.requestID()+ "] Access "
-                + " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString())
-                + (context.principal() == null? "" : (" principal:"+ context.principal()))
-                +" op:"+operation
-                + (topology_conf == null? "" : (" topoology:"+topology_conf.get(Config.TOPOLOGY_NAME))));
+        LOG.info("[req " + context.requestID() + "] Access " + " from: " + (context.remoteAddress() == null ? "null" : context.remoteAddress().toString())
+                + (context.principal() == null ? "" : (" principal:" + context.principal())) + " op:" + operation
+                + (topology_conf == null ? "" : (" topoology:" + topology_conf.get(Config.TOPOLOGY_NAME))));
         return false;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java
index d6431be..e1a037f 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java
@@ -10,7 +10,6 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 
-
 public class ImpersonationAuthorizer implements IAuthorizer {
     private static final Logger LOG = LoggerFactory.getLogger(ImpersonationAuthorizer.class);
     protected static final String WILD_CARD = "*";
@@ -49,16 +48,16 @@ public class ImpersonationAuthorizer implements IAuthorizer {
         String userBeingImpersonated = _ptol.toLocal(context.principal());
         InetAddress remoteAddress = context.remoteAddress();
 
-        LOG.info("user = {}, principal = {} is attmepting to impersonate user = {} for operation = {} from host = {}",
-                impersonatingUser, impersonatingPrincipal, userBeingImpersonated, operation, remoteAddress);
+        LOG.info("user = {}, principal = {} is attmepting to impersonate user = {} for operation = {} from host = {}", impersonatingUser,
+                impersonatingPrincipal, userBeingImpersonated, operation, remoteAddress);
 
         /**
          * no config is present for impersonating principal or user, do not permit impersonation.
          */
         if (!userImpersonationACL.containsKey(impersonatingPrincipal) && !userImpersonationACL.containsKey(impersonatingUser)) {
-            LOG.info("user = {}, principal = {} is trying to impersonate user {}, but config {} does not have entry for impersonating user or principal." +
-                    "Please see SECURITY.MD to learn how to configure users for impersonation."
-                    , impersonatingUser, impersonatingPrincipal, userBeingImpersonated, Config.NIMBUS_IMPERSONATION_ACL);
+            LOG.info("user = {}, principal = {} is trying to impersonate user {}, but config {} does not have entry for impersonating user or principal."
+                    + "Please see SECURITY.MD to learn how to configure users for impersonation.", impersonatingUser, impersonatingPrincipal,
+                    userBeingImpersonated, Config.NIMBUS_IMPERSONATION_ACL);
             return false;
         }
 
@@ -78,18 +77,17 @@ public class ImpersonationAuthorizer implements IAuthorizer {
             authorizedGroups.addAll(userACL.authorizedGroups);
         }
 
-        LOG.debug("user = {}, principal = {} is allowed to impersonate groups = {} from hosts = {} ",
-                impersonatingUser, impersonatingPrincipal, authorizedGroups, authorizedHosts);
+        LOG.debug("user = {}, principal = {} is allowed to impersonate groups = {} from hosts = {} ", impersonatingUser, impersonatingPrincipal,
+                authorizedGroups, authorizedHosts);
 
         if (!isAllowedToImpersonateFromHost(authorizedHosts, remoteAddress)) {
-            LOG.info("user = {}, principal = {} is not allowed to impersonate from host {} ",
-                    impersonatingUser, impersonatingPrincipal, remoteAddress);
+            LOG.info("user = {}, principal = {} is not allowed to impersonate from host {} ", impersonatingUser, impersonatingPrincipal, remoteAddress);
             return false;
         }
 
         if (!isAllowedToImpersonateUser(authorizedGroups, userBeingImpersonated)) {
-            LOG.info("user = {}, principal = {} is not allowed to impersonate any group that user {} is part of.",
-                    impersonatingUser, impersonatingPrincipal, userBeingImpersonated);
+            LOG.info("user = {}, principal = {} is not allowed to impersonate any group that user {} is part of.", impersonatingUser, impersonatingPrincipal,
+                    userBeingImpersonated);
             return false;
         }
 
@@ -98,14 +96,12 @@ public class ImpersonationAuthorizer implements IAuthorizer {
     }
 
     private boolean isAllowedToImpersonateFromHost(Set<String> authorizedHosts, InetAddress remoteAddress) {
-        return authorizedHosts.contains(WILD_CARD) ||
-                authorizedHosts.contains(remoteAddress.getCanonicalHostName()) ||
-                authorizedHosts.contains(remoteAddress.getHostName()) ||
-                authorizedHosts.contains(remoteAddress.getHostAddress());
+        return authorizedHosts.contains(WILD_CARD) || authorizedHosts.contains(remoteAddress.getCanonicalHostName())
+                || authorizedHosts.contains(remoteAddress.getHostName()) || authorizedHosts.contains(remoteAddress.getHostAddress());
     }
 
     private boolean isAllowedToImpersonateUser(Set<String> authorizedGroups, String userBeingImpersonated) {
-        if(authorizedGroups.contains(WILD_CARD)) {
+        if (authorizedGroups.contains(WILD_CARD)) {
             return true;
         }
 
@@ -131,9 +127,9 @@ public class ImpersonationAuthorizer implements IAuthorizer {
 
     protected class ImpersonationACL {
         public String impersonatingUser;
-        //Groups this user is authorized to impersonate.
+        // Groups this user is authorized to impersonate.
         public Set<String> authorizedGroups;
-        //Hosts this user is authorized to impersonate from.
+        // Hosts this user is authorized to impersonate from.
         public Set<String> authorizedHosts;
 
         private ImpersonationACL(String impersonatingUser, Set<String> authorizedGroups, Set<String> authorizedHosts) {
@@ -144,11 +140,8 @@ public class ImpersonationAuthorizer implements IAuthorizer {
 
         @Override
         public String toString() {
-            return "ImpersonationACL{" +
-                    "impersonatingUser='" + impersonatingUser + '\'' +
-                    ", authorizedGroups=" + authorizedGroups +
-                    ", authorizedHosts=" + authorizedHosts +
-                    '}';
+            return "ImpersonationACL{" + "impersonatingUser='" + impersonatingUser + '\'' + ", authorizedGroups=" + authorizedGroups + ", authorizedHosts="
+                    + authorizedHosts + '}';
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/NoopAuthorizer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/NoopAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/NoopAuthorizer.java
index 9af44d3..1d88202 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/NoopAuthorizer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/NoopAuthorizer.java
@@ -34,24 +34,24 @@ public class NoopAuthorizer implements IAuthorizer {
 
     /**
      * Invoked once immediately after construction
-     * @param conf Storm configuration 
+     * 
+     * @param conf Storm configuration
      */
-    public void prepare(Map conf) {        
+    public void prepare(Map conf) {
     }
 
     /**
      * permit() method is invoked for each incoming Thrift request
-     * @param context request context includes info about 
+     * 
+     * @param context request context includes info about
      * @param operation operation name
-     * @param topology_storm configuration of targeted topology 
+     * @param topology_storm configuration of targeted topology
      * @return true if the request is authorized, false if reject
      */
     public boolean permit(ReqContext context, String operation, Map topology_conf) {
-        LOG.info("[req "+ context.requestID()+ "] Access "
-                + " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString())
-                + (context.principal() == null? "" : (" principal:"+ context.principal()))
-                +" op:"+operation
-                + (topology_conf == null? "" : (" topoology:"+topology_conf.get(Config.TOPOLOGY_NAME))));
+        LOG.info("[req " + context.requestID() + "] Access " + " from: " + (context.remoteAddress() == null ? "null" : context.remoteAddress().toString())
+                + (context.principal() == null ? "" : (" principal:" + context.principal())) + " op:" + operation
+                + (topology_conf == null ? "" : (" topoology:" + topology_conf.get(Config.TOPOLOGY_NAME))));
         return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
index e50a587..40d7a5d 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
@@ -36,15 +36,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * An authorization implementation that simply checks if a user is allowed to perform specific
- * operations.
+ * An authorization implementation that simply checks if a user is allowed to perform specific operations.
  */
 public class SimpleACLAuthorizer implements IAuthorizer {
     private static final Logger LOG = LoggerFactory.getLogger(SimpleACLAuthorizer.class);
 
     protected Set<String> _userCommands = new HashSet<String>(Arrays.asList("submitTopology", "fileUpload", "getNimbusConf", "getClusterInfo"));
     protected Set<String> _supervisorCommands = new HashSet<String>(Arrays.asList("fileDownload"));
-    protected Set<String> _topoCommands = new HashSet<String>(Arrays.asList("killTopology","rebalance","activate","deactivate","getTopologyConf","getTopology","getUserTopology","getTopologyInfo","uploadNewCredentials"));
+    protected Set<String> _topoCommands = new HashSet<String>(Arrays.asList("killTopology", "rebalance", "activate", "deactivate", "getTopologyConf",
+            "getTopology", "getUserTopology", "getTopologyInfo", "uploadNewCredentials"));
 
     protected Set<String> _admins;
     protected Set<String> _supervisors;
@@ -52,8 +52,10 @@ public class SimpleACLAuthorizer implements IAuthorizer {
     protected Set<String> _nimbusGroups;
     protected IPrincipalToLocal _ptol;
     protected IGroupMappingServiceProvider _groupMappingProvider;
+
     /**
      * Invoked once immediately after construction
+     * 
      * @param conf Storm configuration
      */
     @Override
@@ -64,17 +66,17 @@ public class SimpleACLAuthorizer implements IAuthorizer {
         _nimbusGroups = new HashSet<String>();
 
         if (conf.containsKey(Config.NIMBUS_ADMINS)) {
-            _admins.addAll((Collection<String>)conf.get(Config.NIMBUS_ADMINS));
+            _admins.addAll((Collection<String>) conf.get(Config.NIMBUS_ADMINS));
         }
         if (conf.containsKey(Config.NIMBUS_SUPERVISOR_USERS)) {
-            _supervisors.addAll((Collection<String>)conf.get(Config.NIMBUS_SUPERVISOR_USERS));
+            _supervisors.addAll((Collection<String>) conf.get(Config.NIMBUS_SUPERVISOR_USERS));
         }
         if (conf.containsKey(Config.NIMBUS_USERS)) {
-            _nimbusUsers.addAll((Collection<String>)conf.get(Config.NIMBUS_USERS));
+            _nimbusUsers.addAll((Collection<String>) conf.get(Config.NIMBUS_USERS));
         }
 
         if (conf.containsKey(Config.NIMBUS_GROUPS)) {
-            _nimbusGroups.addAll((Collection<String>)conf.get(Config.NIMBUS_GROUPS));
+            _nimbusGroups.addAll((Collection<String>) conf.get(Config.NIMBUS_GROUPS));
         }
 
         _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
@@ -83,6 +85,7 @@ public class SimpleACLAuthorizer implements IAuthorizer {
 
     /**
      * permit() method is invoked for each incoming Thrift request
+     * 
      * @param context request context includes info about
      * @param operation operation name
      * @param topology_conf configuration of targeted topology
@@ -90,10 +93,8 @@ public class SimpleACLAuthorizer implements IAuthorizer {
      */
     @Override
     public boolean permit(ReqContext context, String operation, Map topology_conf) {
-        LOG.info("[req " + context.requestID() + "] Access "
-                + " from: " + (context.remoteAddress() == null ? "null" : context.remoteAddress().toString())
-                + (context.principal() == null ? "" : (" principal:" + context.principal()))
-                + " op:" + operation
+        LOG.info("[req " + context.requestID() + "] Access " + " from: " + (context.remoteAddress() == null ? "null" : context.remoteAddress().toString())
+                + (context.principal() == null ? "" : (" principal:" + context.principal())) + " op:" + operation
                 + (topology_conf == null ? "" : (" topoology:" + topology_conf.get(Config.TOPOLOGY_NAME))));
 
         String principal = context.principal().getName();
@@ -103,8 +104,8 @@ public class SimpleACLAuthorizer implements IAuthorizer {
         if (_groupMappingProvider != null) {
             try {
                 userGroups = _groupMappingProvider.getGroups(user);
-            } catch(IOException e) {
-                LOG.warn("Error while trying to fetch user groups",e);
+            } catch (IOException e) {
+                LOG.warn("Error while trying to fetch user groups", e);
             }
         }
 
@@ -123,7 +124,7 @@ public class SimpleACLAuthorizer implements IAuthorizer {
         if (_topoCommands.contains(operation)) {
             Set topoUsers = new HashSet<String>();
             if (topology_conf.containsKey(Config.TOPOLOGY_USERS)) {
-                topoUsers.addAll((Collection<String>)topology_conf.get(Config.TOPOLOGY_USERS));
+                topoUsers.addAll((Collection<String>) topology_conf.get(Config.TOPOLOGY_USERS));
             }
 
             if (topoUsers.contains(principal) || topoUsers.contains(user)) {
@@ -132,18 +133,19 @@ public class SimpleACLAuthorizer implements IAuthorizer {
 
             Set<String> topoGroups = new HashSet<String>();
             if (topology_conf.containsKey(Config.TOPOLOGY_GROUPS) && topology_conf.get(Config.TOPOLOGY_GROUPS) != null) {
-                topoGroups.addAll((Collection<String>)topology_conf.get(Config.TOPOLOGY_GROUPS));
+                topoGroups.addAll((Collection<String>) topology_conf.get(Config.TOPOLOGY_GROUPS));
             }
 
-            if (checkUserGroupAllowed(userGroups, topoGroups)) return true;
+            if (checkUserGroupAllowed(userGroups, topoGroups))
+                return true;
         }
         return false;
     }
 
     private Boolean checkUserGroupAllowed(Set<String> userGroups, Set<String> configuredGroups) {
-        if(userGroups.size() > 0 && configuredGroups.size() > 0) {
+        if (userGroups.size() > 0 && configuredGroups.size() > 0) {
             for (String tgroup : configuredGroups) {
-                if(userGroups.contains(tgroup))
+                if (userGroups.contains(tgroup))
                     return true;
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java
index 55109f9..dbbc945 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java
@@ -31,8 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * An authorization implementation that simply checks a whitelist of users that
- * are allowed to use the cluster.
+ * An authorization implementation that simply checks a whitelist of users that are allowed to use the cluster.
  */
 public class SimpleWhitelistAuthorizer implements IAuthorizer {
     private static final Logger LOG = LoggerFactory.getLogger(SimpleWhitelistAuthorizer.class);
@@ -41,30 +40,30 @@ public class SimpleWhitelistAuthorizer implements IAuthorizer {
 
     /**
      * Invoked once immediately after construction
-     * @param conf Storm configuration 
+     * 
+     * @param conf Storm configuration
      */
     @Override
     public void prepare(Map conf) {
         users = new HashSet<String>();
         if (conf.containsKey(WHITELIST_USERS_CONF)) {
-            users.addAll((Collection<String>)conf.get(WHITELIST_USERS_CONF));
+            users.addAll((Collection<String>) conf.get(WHITELIST_USERS_CONF));
         }
     }
 
     /**
      * permit() method is invoked for each incoming Thrift request
-     * @param context request context includes info about 
+     * 
+     * @param context request context includes info about
      * @param operation operation name
-     * @param topology_storm configuration of targeted topology 
+     * @param topology_storm configuration of targeted topology
      * @return true if the request is authorized, false if reject
      */
     @Override
     public boolean permit(ReqContext context, String operation, Map topology_conf) {
-        LOG.info("[req "+ context.requestID()+ "] Access "
-                 + " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString())
-                 + (context.principal() == null? "" : (" principal:"+ context.principal()))
-                 +" op:"+operation
-                 + (topology_conf == null? "" : (" topoology:"+topology_conf.get(Config.TOPOLOGY_NAME))));
+        LOG.info("[req " + context.requestID() + "] Access " + " from: " + (context.remoteAddress() == null ? "null" : context.remoteAddress().toString())
+                + (context.principal() == null ? "" : (" principal:" + context.principal())) + " op:" + operation
+                + (topology_conf == null ? "" : (" topoology:" + topology_conf.get(Config.TOPOLOGY_NAME))));
         return context.principal() != null ? users.contains(context.principal().getName()) : false;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ClientCallbackHandler.java b/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ClientCallbackHandler.java
index 3caacaa..0e3f626 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ClientCallbackHandler.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ClientCallbackHandler.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
 import backtype.storm.security.auth.AuthUtils;
 
 /**
- *  client side callback handler.
+ * client side callback handler.
  */
 public class ClientCallbackHandler implements CallbackHandler {
     private static final String USERNAME = "username";
@@ -51,28 +51,29 @@ public class ClientCallbackHandler implements CallbackHandler {
      * @throws IOException
      */
     public ClientCallbackHandler(Configuration configuration) throws IOException {
-        if (configuration == null) return;
+        if (configuration == null)
+            return;
         AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_CLIENT);
         if (configurationEntries == null) {
-            String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_CLIENT
-                    + "' entry in this configuration: Client cannot start.";
+            String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_CLIENT + "' entry in this configuration: Client cannot start.";
             throw new IOException(errorMessage);
         }
 
         _password = "";
-        for(AppConfigurationEntry entry: configurationEntries) {
+        for (AppConfigurationEntry entry : configurationEntries) {
             if (entry.getOptions().get(USERNAME) != null) {
-                _username = (String)entry.getOptions().get(USERNAME);
+                _username = (String) entry.getOptions().get(USERNAME);
             }
             if (entry.getOptions().get(PASSWORD) != null) {
-                _password = (String)entry.getOptions().get(PASSWORD);
+                _password = (String) entry.getOptions().get(PASSWORD);
             }
         }
     }
 
     /**
      * This method is invoked by SASL for authentication challenges
-     * @param callbacks a collection of challenge callbacks 
+     * 
+     * @param callbacks a collection of challenge callbacks
      */
     public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
         for (Callback c : callbacks) {
@@ -82,10 +83,10 @@ public class ClientCallbackHandler implements CallbackHandler {
                 nc.setName(_username);
             } else if (c instanceof PasswordCallback) {
                 LOG.debug("password callback");
-                PasswordCallback pc = (PasswordCallback)c;
+                PasswordCallback pc = (PasswordCallback) c;
                 if (_password != null) {
                     pc.setPassword(_password.toCharArray());
-                } 
+                }
             } else if (c instanceof AuthorizeCallback) {
                 LOG.debug("authorization callback");
                 AuthorizeCallback ac = (AuthorizeCallback) c;

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
index ad642d8..7b497c6 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
@@ -38,11 +38,11 @@ public class DigestSaslTransportPlugin extends SaslTransportPlugin {
     public static final String DIGEST = "DIGEST-MD5";
     private static final Logger LOG = LoggerFactory.getLogger(DigestSaslTransportPlugin.class);
 
-    protected TTransportFactory getServerTransportFactory() throws IOException {        
-        //create an authentication callback handler
+    protected TTransportFactory getServerTransportFactory() throws IOException {
+        // create an authentication callback handler
         CallbackHandler serer_callback_handler = new ServerCallbackHandler(login_conf);
 
-        //create a transport factory that will invoke our auth callback for digest
+        // create a transport factory that will invoke our auth callback for digest
         TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
         factory.addServerDefinition(DIGEST, AuthUtils.SERVICE, "localhost", null, serer_callback_handler);
 
@@ -53,13 +53,8 @@ public class DigestSaslTransportPlugin extends SaslTransportPlugin {
     @Override
     public TTransport connect(TTransport transport, String serverHost, String asUser) throws TTransportException, IOException {
         ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf);
-        TSaslClientTransport wrapper_transport = new TSaslClientTransport(DIGEST,
-                null,
-                AuthUtils.SERVICE, 
-                serverHost,
-                null,
-                client_callback_handler, 
-                transport);
+        TSaslClientTransport wrapper_transport =
+                new TSaslClientTransport(DIGEST, null, AuthUtils.SERVICE, serverHost, null, client_callback_handler, transport);
 
         wrapper_transport.open();
         LOG.debug("SASL DIGEST-MD5 client transport has been established");