You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/29 00:19:42 UTC

[1/4] git commit: STORM-347. (Security) authentication should allow for groups not just users.

Repository: incubator-storm
Updated Branches:
  refs/heads/security 642ed7431 -> ff8336b70


STORM-347. (Security) authentication should allow for groups not just users.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/f3112fa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/f3112fa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/f3112fa7

Branch: refs/heads/security
Commit: f3112fa76e98038da83170513a476499dde0bb41
Parents: 65aee65
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Thu Jun 26 15:05:21 2014 -0700
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Thu Jun 26 15:05:21 2014 -0700

----------------------------------------------------------------------
 conf/defaults.yaml                              |   3 +-
 storm-core/src/jvm/backtype/storm/Config.java   |   6 +
 .../backtype/storm/security/auth/AuthUtils.java |  49 +-
 .../auth/IGroupMappingServiceProvider.java      |  54 ++
 .../auth/ShellBasedUnixGroupsMapping.java       |  88 ++++
 .../auth/authorizer/SimpleACLAuthorizer.java    |  29 +-
 .../jvm/backtype/storm/utils/ShellUtils.java    | 498 +++++++++++++++++++
 7 files changed, 702 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f3112fa7/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 7f17054..83b7b4d 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -38,6 +38,7 @@ storm.cluster.mode: "distributed" # can be distributed or local
 storm.local.mode.zmq: false
 storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
 storm.principal.tolocal: "backtype.storm.security.auth.DefaultPrincipalToLocal"
+storm.group.mapping.service: "backtype.storm.security.auth.ShellBasedUnixGroupsMapping"
 storm.messaging.transport: "backtype.storm.messaging.netty.Context"
 storm.nimbus.retry.times: 5
 storm.nimbus.retry.interval.millis: 2000
@@ -128,7 +129,7 @@ worker.childopts: "-Xmx768m"
 worker.gc.childopts: ""
 worker.heartbeat.frequency.secs: 1
 
-# control how many worker receiver threads we need per worker 
+# control how many worker receiver threads we need per worker
 topology.worker.receiver.thread.count: 1
 
 task.heartbeat.frequency.secs: 3

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f3112fa7/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 3b3f7e5..fee5f6e 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -158,6 +158,12 @@ public class Config extends HashMap<String, Object> {
     public static final Object STORM_PRINCIPAL_TO_LOCAL_PLUGIN_SCHEMA = String.class;
 
     /**
+     * The plugin that will provide the user-to-groups mapping service
+     */
+    public static final String STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN = "storm.group.mapping.service";
+    public static final Object STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN_SCHEMA = String.class;
+
+    /**
      * The default transport plug-in for Thrift client/server communication
      */
     public static final String STORM_THRIFT_TRANSPORT_PLUGIN = "storm.thrift.transport";

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f3112fa7/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java b/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
index b5118c7..7cfc927 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
@@ -37,21 +37,21 @@ import java.util.concurrent.ExecutorService;
 
 public class AuthUtils {
     private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
-    public static final String LOGIN_CONTEXT_SERVER = "StormServer"; 
-    public static final String LOGIN_CONTEXT_CLIENT = "StormClient"; 
+    public static final String LOGIN_CONTEXT_SERVER = "StormServer";
+    public static final String LOGIN_CONTEXT_CLIENT = "StormClient";
     public static final String SERVICE = "storm_thrift_server";
 
     /**
-     * Construct a JAAS configuration object per storm configuration file 
-     * @param storm_conf Storm configuration 
+     * Construct a JAAS configuration object per storm configuration file
+     * @param storm_conf Storm configuration
      * @return JAAS configuration object
      */
     public static Configuration GetConfiguration(Map storm_conf) {
         Configuration login_conf = null;
 
-        //find login file configuration from Storm configuration  
+        //find login file configuration from Storm configuration
         String loginConfigurationFile = (String)storm_conf.get("java.security.auth.login.config");
-        if ((loginConfigurationFile != null) && (loginConfigurationFile.length()>0)) { 
+        if ((loginConfigurationFile != null) && (loginConfigurationFile.length()>0)) {
             File config_file = new File(loginConfigurationFile);
             if (! config_file.canRead()) {
                 throw new RuntimeException("File " + loginConfigurationFile +
@@ -64,7 +64,7 @@ public class AuthUtils {
                 throw new RuntimeException(ex);
             }
         }
-        
+
         return login_conf;
     }
 
@@ -87,10 +87,28 @@ public class AuthUtils {
     }
 
     /**
+     * Construct a group mapping service provider plugin
+     * @param storm_conf storm configuration
+     * @return the plugin
+     */
+    public static IGroupMappingServiceProvider GetGroupMappingServiceProviderPlugin(Map storm_conf) {
+        IGroupMappingServiceProvider gmsp = null;
+        try {
+            String gmsp_klassName = (String) storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN);
+            Class klass = Class.forName(gmsp_klassName);
+            gmsp = (IGroupMappingServiceProvider)klass.newInstance();
+            gmsp.prepare(storm_conf);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+        return gmsp;
+    }
+
+    /**
      * Get all of the configured Credential Renwer Plugins.
      * @param storm_conf the storm configuration to use.
      * @return the configured credential renewers.
-     */ 
+     */
     public static Collection<ICredentialsRenewer> GetCredentialRenewers(Map conf) {
         try {
             Set<ICredentialsRenewer> ret = new HashSet<ICredentialsRenewer>();
@@ -112,7 +130,7 @@ public class AuthUtils {
      * Get all of the configured AutoCredential Plugins.
      * @param storm_conf the storm configuration to use.
      * @return the configured auto credentials.
-     */ 
+     */
     public static Collection<IAutoCredentials> GetAutoCredentials(Map storm_conf) {
         try {
             Set<IAutoCredentials> autos = new HashSet<IAutoCredentials>();
@@ -137,7 +155,7 @@ public class AuthUtils {
      * @param autos the IAutoCredentials to call to populate the subject.
      * @param credentials the credentials to pull from
      * @return the populated subject.
-     */ 
+     */
     public static Subject populateSubject(Subject subject, Collection<IAutoCredentials> autos, Map<String,String> credentials) {
         try {
             if (subject == null) {
@@ -154,10 +172,10 @@ public class AuthUtils {
 
     /**
      * Update a subject from credentials using the IAutoCredentials.
-     * @param subject the subject to update 
+     * @param subject the subject to update
      * @param autos the IAutoCredentials to call to update the subject.
      * @param credentials the credentials to pull from
-     */ 
+     */
     public static void updateSubject(Subject subject, Collection<IAutoCredentials> autos, Map<String,String> credentials) {
         if (subject == null) {
             throw new RuntimeException("The subject cannot be null when updating a subject with credentials");
@@ -186,7 +204,7 @@ public class AuthUtils {
             transportPlugin.prepare(type, storm_conf, login_conf);
         } catch(Exception e) {
             throw new RuntimeException(e);
-        } 
+        }
         return transportPlugin;
     }
 
@@ -233,11 +251,10 @@ public class AuthUtils {
         }
 
         for(AppConfigurationEntry entry: configurationEntries) {
-            Object val = entry.getOptions().get(key); 
-            if (val != null) 
+            Object val = entry.getOptions().get(key);
+            if (val != null)
                 return (String)val;
         }
         return null;
     }
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f3112fa7/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java b/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
new file mode 100644
index 0000000..0b49dec
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.security.auth;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.Map;
+
+public interface IGroupMappingServiceProvider {
+
+    /**
+     * Invoked once immediately after construction
+     * @param storm_conf Storm configuration
+     */
+    void prepare(Map storm_conf);
+
+    /**
+     * Get all group memberships of a given user.
+     * Returns an EMPTY set in case of a non-existing user
+     * @param user User's name
+     * @return group memberships of user
+     * @throws IOException
+     */
+    public Set<String> getGroups(String user) throws IOException;
+
+    /**
+     * Refresh the cache of groups and user mapping
+     * @throws IOException
+     */
+    public void cacheGroupsRefresh() throws IOException;
+    /**
+     * Caches the group user information
+     * @param groups list of groups to add to cache
+     * @throws IOException
+     */
+    public void cacheGroupsAdd(Set<String> groups) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f3112fa7/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java b/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
new file mode 100644
index 0000000..b8c8323
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.security.auth;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.StringTokenizer;
+import backtype.storm.utils.ShellUtils;
+import backtype.storm.utils.ShellUtils.ExitCodeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ShellBasedUnixGroupsMapping implements
+                                             IGroupMappingServiceProvider {
+
+    public static Logger LOG = LoggerFactory.getLogger(ShellBasedUnixGroupsMapping.class);
+
+    /**
+     * Invoked once immediately after construction
+     * @param storm_conf Storm configuration
+     */
+    public void prepare(Map storm_conf) {}
+
+    /**
+     * Returns list of groups for a user
+     *
+     * @param user get groups for this user
+     * @return list of groups for a given user
+     */
+    @Override
+    public Set<String> getGroups(String user) throws IOException {
+        return getUnixGroups(user);
+    }
+
+    @Override
+    public void cacheGroupsRefresh() throws IOException {
+    }
+
+    @Override
+    public void cacheGroupsAdd(Set<String> groups) throws IOException {
+    }
+
+    /**
+     * Get the given user's group list from Unix by running the 'id' command
+     * NOTE. For non-existing user it will return EMPTY list
+     * @param user user name
+     * @return the groups set that the <code>user</code> belongs to
+     * @throws IOException if encounter any error when running the command
+     */
+    private static Set<String> getUnixGroups(final String user) throws IOException {
+        String result = "";
+        try {
+            result = ShellUtils.execCommand(ShellUtils.getGroupsForUserCommand(user));
+        } catch (ExitCodeException e) {
+            // if we didn't get the group - just return empty list;
+            LOG.warn("got exception trying to get groups for user " + user, e);
+            return new HashSet<String>();
+        }
+
+        StringTokenizer tokenizer =
+            new StringTokenizer(result, ShellUtils.TOKEN_SEPARATOR_REGEX);
+        Set<String> groups = new HashSet<String>();
+        while (tokenizer.hasMoreTokens()) {
+            groups.add(tokenizer.nextToken());
+        }
+        return groups;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f3112fa7/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
index ef13750..0b867e8 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
@@ -23,12 +23,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.HashSet;
 import java.util.Collection;
+import java.io.IOException;
 
 import backtype.storm.Config;
 import backtype.storm.security.auth.IAuthorizer;
 import backtype.storm.security.auth.ReqContext;
 import backtype.storm.security.auth.AuthUtils;
 import backtype.storm.security.auth.IPrincipalToLocal;
+import backtype.storm.security.auth.IGroupMappingServiceProvider;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,10 +49,10 @@ public class SimpleACLAuthorizer implements IAuthorizer {
     protected Set<String> _admins;
     protected Set<String> _supervisors;
     protected IPrincipalToLocal _ptol;
-
+    protected IGroupMappingServiceProvider _groups;
     /**
      * Invoked once immediately after construction
-     * @param conf Storm configuration 
+     * @param conf Storm configuration
      */
     @Override
     public void prepare(Map conf) {
@@ -64,27 +66,26 @@ public class SimpleACLAuthorizer implements IAuthorizer {
             _supervisors.addAll((Collection<String>)conf.get(Config.NIMBUS_SUPERVISOR_USERS));
         }
         _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
+        _groups = AuthUtils.GetGroupMappingServiceProviderPlugin(conf);
     }
 
     /**
      * permit() method is invoked for each incoming Thrift request
-     * @param context request context includes info about 
+     * @param context request context includes info about
      * @param operation operation name
-     * @param topology_storm configuration of targeted topology 
+     * @param topology_storm configuration of targeted topology
      * @return true if the request is authorized, false if reject
      */
     @Override
     public boolean permit(ReqContext context, String operation, Map topology_conf) {
-
         LOG.info("[req "+ context.requestID()+ "] Access "
                  + " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString())
                  + (context.principal() == null? "" : (" principal:"+ context.principal()))
                  +" op:"+operation
                  + (topology_conf == null? "" : (" topoology:"+topology_conf.get(Config.TOPOLOGY_NAME))));
-       
+
         String principal = context.principal().getName();
         String user = _ptol.toLocal(context.principal());
-
         if (_admins.contains(principal) || _admins.contains(user)) {
             return true;
         }
@@ -106,8 +107,20 @@ public class SimpleACLAuthorizer implements IAuthorizer {
             if (topoUsers.contains(principal) || topoUsers.contains(user)) {
                 return true;
             }
+            if(_groups != null) {
+                try {
+                    String topologySubmitterUser = (String) topology_conf.get(Config.TOPOLOGY_SUBMITTER_USER);
+                    Set<String> userGroups = _groups.getGroups(user);
+                    Set<String> topoUserGroups = _groups.getGroups(topologySubmitterUser);
+                    for (String tgroup : topoUserGroups) {
+                        if(userGroups.contains(tgroup))
+                            return true;
+                    }
+                } catch(IOException e) {
+                    LOG.warn("Error while trying to fetch user groups",e);
+                }
+            }
         }
-         
         return false;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f3112fa7/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java b/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java
new file mode 100644
index 0000000..1065ff9
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.utils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+abstract public class ShellUtils {
+    public static Logger LOG = LoggerFactory.getLogger(ShellUtils.class);
+
+    // OSType detection
+    public enum OSType {
+        OS_TYPE_LINUX,
+        OS_TYPE_WIN,
+        OS_TYPE_SOLARIS,
+        OS_TYPE_MAC,
+        OS_TYPE_FREEBSD,
+        OS_TYPE_OTHER
+    }
+
+    public static final OSType osType = getOSType();
+
+    static private OSType getOSType() {
+        String osName = System.getProperty("os.name");
+        if (osName.startsWith("Windows")) {
+            return OSType.OS_TYPE_WIN;
+        } else if (osName.contains("SunOS") || osName.contains("Solaris")) {
+            return OSType.OS_TYPE_SOLARIS;
+        } else if (osName.contains("Mac")) {
+            return OSType.OS_TYPE_MAC;
+        } else if (osName.contains("FreeBSD")) {
+            return OSType.OS_TYPE_FREEBSD;
+        } else if (osName.startsWith("Linux")) {
+            return OSType.OS_TYPE_LINUX;
+        } else {
+            // Some other form of Unix
+            return OSType.OS_TYPE_OTHER;
+        }
+    }
+
+    // Helper static vars for each platform
+    public static final boolean WINDOWS = (osType == OSType.OS_TYPE_WIN);
+    public static final boolean SOLARIS = (osType == OSType.OS_TYPE_SOLARIS);
+    public static final boolean MAC     = (osType == OSType.OS_TYPE_MAC);
+    public static final boolean FREEBSD = (osType == OSType.OS_TYPE_FREEBSD);
+    public static final boolean LINUX   = (osType == OSType.OS_TYPE_LINUX);
+    public static final boolean OTHER   = (osType == OSType.OS_TYPE_OTHER);
+
+
+    /** Token separator regex used to parse Shell tool outputs */
+    public static final String TOKEN_SEPARATOR_REGEX
+        = WINDOWS ? "[|\n\r]" : "[ \t\n\r\f]";
+
+    private long    interval;   // refresh interval in msec
+    private long    lastTime;   // last time the command was performed
+    final private boolean redirectErrorStream; // merge stdout and stderr
+    private Map<String, String> environment; // env for the command execution
+    private File dir;
+    private Process process; // sub process used to execute the command
+    private int exitCode;
+    /** Time after which the executing script would be timed out */
+    protected long timeOutInterval = 0L;
+    /** Whether or not the script timed out */
+    private AtomicBoolean timedOut;
+
+    /** Whether or not the script finished executing */
+    private volatile AtomicBoolean completed;
+
+    public ShellUtils() {
+        this(0L);
+    }
+
+    public ShellUtils(long interval) {
+        this(interval, false);
+    }
+
+    /**
+     * @param interval the minimum duration to wait before re-executing the
+     *        command.
+     */
+    public ShellUtils(long interval, boolean redirectErrorStream) {
+        this.interval = interval;
+        this.lastTime = (interval<0) ? 0 : -interval;
+        this.redirectErrorStream = redirectErrorStream;
+    }
+
+    /** set the environment for the command
+     * @param env Mapping of environment variables
+     */
+    protected void setEnvironment(Map<String, String> env) {
+        this.environment = env;
+    }
+
+    /** set the working directory
+     * @param dir The directory where the command would be executed
+     */
+    protected void setWorkingDirectory(File dir) {
+        this.dir = dir;
+    }
+
+    /** a Unix command to get the current user's groups list */
+    public static String[] getGroupsCommand() {
+        return (WINDOWS)? new String[]{"cmd", "/c", "groups"}
+        : new String[]{"bash", "-c", "groups"};
+    }
+
+    /**
+     * a Unix command to get a given user's groups list: it prints the user's
+     * primary group first ('id -gn') and then all groups ('id -Gn'), so the
+     * primary group is included twice. The user name is handed to bash as the
+     * positional parameter $1, so it is never parsed as shell syntax.
+     */
+    public static String[] getGroupsForUserCommand(final String user) {
+        //'groups username' command return is non-consistent across different unixes
+        return new String [] {"bash", "-c", "id -gn \"$1\" && id -Gn \"$1\"",
+                              "bash", user};
+    }
+
+
+    /** check to see if a command needs to be executed and execute if needed */
+    protected void run() throws IOException {
+        if (lastTime + interval > System.currentTimeMillis())
+            return;
+        exitCode = 0; // reset for next run
+        runCommand();
+    }
+
+    /** Run a command */
+    private void runCommand() throws IOException {
+        ProcessBuilder builder = new ProcessBuilder(getExecString());
+        Timer timeOutTimer = null;
+        ShellTimeoutTimerTask timeoutTimerTask = null;
+        timedOut = new AtomicBoolean(false);
+        completed = new AtomicBoolean(false);
+
+        if (environment != null) {
+            builder.environment().putAll(this.environment);
+        }
+        if (dir != null) {
+            builder.directory(this.dir);
+        }
+
+        builder.redirectErrorStream(redirectErrorStream);
+        process = builder.start();
+
+        if (timeOutInterval > 0) {
+            timeOutTimer = new Timer("Shell command timeout");
+            timeoutTimerTask = new ShellTimeoutTimerTask(this);
+            //One time scheduling.
+            timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
+        }
+        final BufferedReader errReader =
+            new BufferedReader(new InputStreamReader(process
+                                                     .getErrorStream()));
+        BufferedReader inReader =
+            new BufferedReader(new InputStreamReader(process
+                                                     .getInputStream()));
+        final StringBuffer errMsg = new StringBuffer();
+
+        // read error and input streams as this would free up the buffers
+        // free the error stream buffer
+        Thread errThread = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        String line = errReader.readLine();
+                        while((line != null) && !isInterrupted()) {
+                            errMsg.append(line);
+                            errMsg.append(System.getProperty("line.separator"));
+                            line = errReader.readLine();
+                        }
+                    } catch(IOException ioe) {
+                        LOG.warn("Error reading the error stream", ioe);
+                    }
+                }
+            };
+        try {
+            errThread.start();
+        } catch (IllegalStateException ise) { }
+        try {
+            parseExecResult(inReader); // parse the output
+            // clear the input stream buffer
+            String line = inReader.readLine();
+            while(line != null) {
+                line = inReader.readLine();
+            }
+            // wait for the process to finish and check the exit code
+            exitCode  = process.waitFor();
+            // make sure that the error thread exits
+            joinThread(errThread);
+            completed.set(true);
+            //the timeout thread handling
+            //taken care in finally block
+            if (exitCode != 0) {
+                throw new ExitCodeException(exitCode, errMsg.toString());
+            }
+        } catch (InterruptedException ie) {
+            throw new IOException(ie.toString());
+        } finally {
+            if (timeOutTimer != null) {
+                timeOutTimer.cancel();
+            }
+            // close the input stream
+            try {
+                // JDK 7 tries to automatically drain the input streams for us
+                // when the process exits, but since close is not synchronized,
+                // it creates a race if we close the stream first and the same
+                // fd is recycled.  the stream draining thread will attempt to
+                // drain that fd!!  it may block, OOM, or cause bizarre behavior
+                // see: https://bugs.openjdk.java.net/browse/JDK-8024521
+                //      issue is fixed in build 7u60
+                InputStream stdout = process.getInputStream();
+                synchronized (stdout) {
+                    inReader.close();
+                }
+            } catch (IOException ioe) {
+                LOG.warn("Error while closing the input stream", ioe);
+            }
+            if (!completed.get()) {
+                errThread.interrupt();
+                joinThread(errThread);
+            }
+            try {
+                InputStream stderr = process.getErrorStream();
+                synchronized (stderr) {
+                    errReader.close();
+                }
+            } catch (IOException ioe) {
+                LOG.warn("Error while closing the error stream", ioe);
+            }
+            process.destroy();
+            lastTime = System.currentTimeMillis();
+        }
+    }
+
+    private static void joinThread(Thread t) {
+        while (t.isAlive()) {
+            try {
+                t.join();
+            } catch (InterruptedException ie) {
+                if (LOG.isWarnEnabled()) {
+                    LOG.warn("Interrupted while joining on: " + t, ie);
+                }
+                t.interrupt(); // propagate interrupt
+            }
+        }
+    }
+
+    /** return an array containing the command name & its parameters */
+    protected abstract String[] getExecString();
+
+    /** Parse the execution result */
+    protected abstract void parseExecResult(BufferedReader lines)
+        throws IOException;
+
+    /** get the current sub-process executing the given command
+     * @return process executing the command
+     */
+    public Process getProcess() {
+        return process;
+    }
+
+    /**
+     * This is an IOException with exit code added.
+     */
+    public static class ExitCodeException extends IOException {
+        int exitCode;
+
+        public ExitCodeException(int exitCode, String message) {
+            super(message);
+            this.exitCode = exitCode;
+        }
+
+        public int getExitCode() {
+            return exitCode;
+        }
+    }
+
+    /**
+     * A simple shell command executor.
+     *
+     * <code>ShellCommandExecutor</code> should be used in cases where the output
+     * of the command needs no explicit parsing and where the command, working
+     * directory and the environment remains unchanged. The output of the command
+     * is stored as-is and is expected to be small.
+     */
+    public static class ShellCommandExecutor extends ShellUtils {
+
+        private String[] command;
+        private StringBuffer output;
+
+
+        public ShellCommandExecutor(String[] execString) {
+            this(execString, null);
+        }
+
+        public ShellCommandExecutor(String[] execString, File dir) {
+            this(execString, dir, null);
+        }
+
+        public ShellCommandExecutor(String[] execString, File dir,
+                                    Map<String, String> env) {
+            this(execString, dir, env , 0L);
+        }
+
+        /**
+         * Create a new instance of the ShellCommandExecutor to execute a command.
+         *
+         * @param execString The command to execute with arguments
+         * @param dir If not-null, specifies the directory which should be set
+         *            as the current working directory for the command.
+         *            If null, the current working directory is not modified.
+         * @param env If not-null, environment of the command will include the
+         *            key-value pairs specified in the map. If null, the current
+         *            environment is not modified.
+         * @param timeout Specifies the time in milliseconds, after which the
+         *                command will be killed and the status marked as timedout.
+         *                If 0, the command will not be timed out.
+         */
+        public ShellCommandExecutor(String[] execString, File dir,
+                                    Map<String, String> env, long timeout) {
+            command = execString.clone();
+            if (dir != null) {
+                setWorkingDirectory(dir);
+            }
+            if (env != null) {
+                setEnvironment(env);
+            }
+            timeOutInterval = timeout;
+        }
+
+
+        /** Execute the shell command. */
+        public void execute() throws IOException {
+            this.run();
+        }
+
+        @Override
+        public String[] getExecString() {
+            return command;
+        }
+
+        @Override
+        protected void parseExecResult(BufferedReader lines) throws IOException {
+            output = new StringBuffer();
+            char[] buf = new char[512];
+            int nRead;
+            while ( (nRead = lines.read(buf, 0, buf.length)) > 0 ) {
+                output.append(buf, 0, nRead);
+            }
+        }
+
+        /** Get the output of the shell command.*/
+        public String getOutput() {
+            return (output == null) ? "" : output.toString();
+        }
+
+        /**
+         * Returns the commands of this instance.
+         * Arguments containing spaces are wrapped in double quotes; other
+         * arguments are presented raw.
+         *
+         * @return a string representation of the object.
+         */
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            String[] args = getExecString();
+            for (String s : args) {
+                if (s.indexOf(' ') >= 0) {
+                    builder.append('"').append(s).append('"');
+                } else {
+                    builder.append(s);
+                }
+                builder.append(' ');
+            }
+            return builder.toString();
+        }
+    }
+
+    /**
+     * To check if the passed script to shell command executor timed out or
+     * not.
+     *
+     * @return if the script timed out.
+     */
+    public boolean isTimedOut() {
+        return timedOut.get();
+    }
+
+    /**
+     * Set if the command has timed out.
+     *
+     */
+    private void setTimedOut() {
+        this.timedOut.set(true);
+    }
+
+
+    /**
+     * Static method to execute a shell command.
+     * Covers most of the simple cases without requiring the user to subclass
+     * <code>ShellUtils</code>.
+     * @param cmd shell command to execute.
+     * @return the output of the executed command.
+     */
+    public static String execCommand(String ... cmd) throws IOException {
+        return execCommand(null, cmd, 0L);
+    }
+
+    /**
+     * Static method to execute a shell command.
+     * Covers most of the simple cases without requiring the user to subclass
+     * <code>ShellUtils</code>.
+     * @param env the map of environment key=value
+     * @param cmd shell command to execute.
+     * @param timeout time in milliseconds after which script should be marked timeout
+     * @return the output of the executed command.
+     */
+
+    public static String execCommand(Map<String, String> env, String[] cmd,
+                                     long timeout) throws IOException {
+        ShellCommandExecutor exec = new ShellCommandExecutor(cmd, null, env,
+                                                             timeout);
+        exec.execute();
+        return exec.getOutput();
+    }
+
+    /**
+     * Static method to execute a shell command.
+     * Covers most of the simple cases without requiring the user to subclass
+     * <code>ShellUtils</code>.
+     * @param env the map of environment key=value
+     * @param cmd shell command to execute.
+     * @return the output of the executed command.
+     */
+    public static String execCommand(Map<String,String> env, String ... cmd)
+        throws IOException {
+        return execCommand(env, cmd, 0L);
+    }
+
+    /**
+     * Timer which is used to timeout scripts spawned off by shell.
+     */
+    private static class ShellTimeoutTimerTask extends TimerTask {
+
+        private ShellUtils shell;
+
+        public ShellTimeoutTimerTask(ShellUtils shell) {
+            this.shell = shell;
+        }
+
+        @Override
+        public void run() {
+            Process p = shell.getProcess();
+            try {
+                p.exitValue();
+            } catch (Exception e) {
+                //Process has not terminated.
+                //So check if it has completed
+                //if not just destroy it.
+                if (p != null && !shell.completed.get()) {
+                    shell.setTimedOut();
+                    p.destroy();
+                }
+            }
+        }
+    }
+
+}


[3/4] git commit: STORM-347. (Security) authentication should allow for groups not just users. Added Unit test for ShellBaasedGroupsMapping.

Posted by bo...@apache.org.
STORM-347. (Security) authentication should allow for groups not just
users. Added Unit test for ShellBaasedGroupsMapping.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/fb882ca1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/fb882ca1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/fb882ca1

Branch: refs/heads/security
Commit: fb882ca18f124c7f6b65b7edeb2c0a652e254c3a
Parents: 9e13a35
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Mon Jul 28 12:51:13 2014 -0700
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Mon Jul 28 12:51:13 2014 -0700

----------------------------------------------------------------------
 conf/defaults.yaml                              |  2 +-
 .../security/auth/ShellBasedGroupsMapping.java  | 94 ++++++++++++++++++++
 .../auth/ShellBasedUnixGroupsMapping.java       | 94 --------------------
 .../backtype/storm/security/auth/auth_test.clj  | 11 ++-
 4 files changed, 105 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fb882ca1/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index b0ebb4d..d4283a4 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -38,7 +38,7 @@ storm.cluster.mode: "distributed" # can be distributed or local
 storm.local.mode.zmq: false
 storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
 storm.principal.tolocal: "backtype.storm.security.auth.DefaultPrincipalToLocal"
-storm.group.mapping.service: "backtype.storm.security.auth.ShellBasedUnixGroupsMapping"
+storm.group.mapping.service: "backtype.storm.security.auth.ShellBasedGroupsMapping"
 storm.messaging.transport: "backtype.storm.messaging.netty.Context"
 storm.nimbus.retry.times: 5
 storm.nimbus.retry.interval.millis: 2000

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fb882ca1/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedGroupsMapping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedGroupsMapping.java b/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedGroupsMapping.java
new file mode 100644
index 0000000..62a4c7e
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedGroupsMapping.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.security.auth;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.StringTokenizer;
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+import backtype.storm.utils.ShellUtils;
+import backtype.storm.utils.TimeCacheMap;
+import backtype.storm.utils.ShellUtils.ExitCodeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ShellBasedGroupsMapping implements
+                                             IGroupMappingServiceProvider {
+
+    public static Logger LOG = LoggerFactory.getLogger(ShellBasedGroupsMapping.class);
+    public TimeCacheMap<String, Set<String>> cachedGroups;
+
+    /**
+     * Invoked once immediately after construction
+     * @param storm_conf Storm configuration
+     */
+    @Override
+    public void prepare(Map storm_conf) {
+        int timeout = Utils.getInt(storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS));
+        cachedGroups = new TimeCacheMap<String, Set<String>>(timeout);
+    }
+
+    /**
+     * Returns list of groups for a user
+     *
+     * @param user get groups for this user
+     * @return list of groups for a given user
+     */
+    @Override
+    public Set<String> getGroups(String user) throws IOException {
+        if(cachedGroups.containsKey(user)) {
+            return cachedGroups.get(user);
+        }
+        Set<String> groups = getUnixGroups(user);
+        if(!groups.isEmpty())
+            cachedGroups.put(user,groups);
+        return groups;
+    }
+
+    /**
+     * Get the given user's group list from Unix by running the command 'groups'
+     * NOTE: For a non-existing user it will return an EMPTY set
+     * @param user user name
+     * @return the groups set that the <code>user</code> belongs to
+     * @throws IOException if any error is encountered when running the command
+     */
+    private static Set<String> getUnixGroups(final String user) throws IOException {
+        String result = "";
+        try {
+            result = ShellUtils.execCommand(ShellUtils.getGroupsForUserCommand(user));
+        } catch (ExitCodeException e) {
+            // if we didn't get the group - just return empty list;
+            LOG.warn("got exception trying to get groups for user " + user, e);
+            return new HashSet<String>();
+        }
+
+        StringTokenizer tokenizer =
+            new StringTokenizer(result, ShellUtils.TOKEN_SEPARATOR_REGEX);
+        Set<String> groups = new HashSet<String>();
+        while (tokenizer.hasMoreTokens()) {
+            groups.add(tokenizer.nextToken());
+        }
+        return groups;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fb882ca1/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java b/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
deleted file mode 100644
index 438b938..0000000
--- a/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm.security.auth;
-
-import java.io.IOException;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.StringTokenizer;
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-import backtype.storm.utils.ShellUtils;
-import backtype.storm.utils.TimeCacheMap;
-import backtype.storm.utils.ShellUtils.ExitCodeException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class ShellBasedUnixGroupsMapping implements
-                                             IGroupMappingServiceProvider {
-
-    public static Logger LOG = LoggerFactory.getLogger(ShellBasedUnixGroupsMapping.class);
-    public TimeCacheMap<String, Set<String>> cachedGroups;
-
-    /**
-     * Invoked once immediately after construction
-     * @param storm_conf Storm configuration
-     */
-    @Override
-    public void prepare(Map storm_conf) {
-        int timeout = Utils.getInt(storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS));
-        cachedGroups = new TimeCacheMap<String, Set<String>>(timeout);
-    }
-
-    /**
-     * Returns list of groups for a user
-     *
-     * @param user get groups for this user
-     * @return list of groups for a given user
-     */
-    @Override
-    public Set<String> getGroups(String user) throws IOException {
-        if(cachedGroups.containsKey(user)) {
-            return cachedGroups.get(user);
-        }
-        Set<String> groups = getUnixGroups(user);
-        if(!groups.isEmpty())
-            cachedGroups.put(user,groups);
-        return groups;
-    }
-
-    /**
-     * Get the current user's group list from Unix by running the command 'groups'
-     * NOTE. For non-existing user it will return EMPTY list
-     * @param user user name
-     * @return the groups set that the <code>user</code> belongs to
-     * @throws IOException if encounter any error when running the command
-     */
-    private static Set<String> getUnixGroups(final String user) throws IOException {
-        String result = "";
-        try {
-            result = ShellUtils.execCommand(ShellUtils.getGroupsForUserCommand(user));
-        } catch (ExitCodeException e) {
-            // if we didn't get the group - just return empty list;
-            LOG.warn("got exception trying to get groups for user " + user, e);
-            return new HashSet<String>();
-        }
-
-        StringTokenizer tokenizer =
-            new StringTokenizer(result, ShellUtils.TOKEN_SEPARATOR_REGEX);
-        Set<String> groups = new HashSet<String>();
-        while (tokenizer.hasMoreTokens()) {
-            groups.add(tokenizer.nextToken());
-        }
-        return groups;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fb882ca1/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj b/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
index 6fdd485..12411e7 100644
--- a/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
+++ b/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
@@ -26,7 +26,7 @@
   (:import [backtype.storm.generated AuthorizationException])
   (:import [backtype.storm.utils NimbusClient])
   (:import [backtype.storm.security.auth.authorizer SimpleWhitelistAuthorizer SimpleACLAuthorizer])
-  (:import [backtype.storm.security.auth AuthUtils ThriftServer ThriftClient 
+  (:import [backtype.storm.security.auth AuthUtils ThriftServer ThriftClient ShellBasedGroupsMapping 
             ReqContext SimpleTransportPlugin KerberosPrincipalToLocal ThriftConnectionType])
   (:use [backtype.storm bootstrap util])
   (:use [backtype.storm.daemon common])
@@ -278,6 +278,15 @@
   (is (= false (.permit authorizer (ReqContext. supervisor-user) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
 ))
 
+(deftest shell-based-groups-mapping-test
+  (let [cluster-conf (merge (read-storm-config))
+        groups (ShellBasedGroupsMapping. )
+        user-name (System/getProperty "user.name")]
+    (.prepare groups cluster-conf) ; builds the group cache from the cluster config
+    (is (<= 0 (.size (.getGroups groups user-name)))) ; lookup for the running user must succeed
+    (is (= 0 (.size (.getGroups groups "userDoesNotExist")))) ; unknown user yields an empty set
+    (is (<= 0 (.size (.getGroups groups nil)))))) ; nil user must not throw
+
 (deftest simple-acl-same-user-auth-test
   (let [cluster-conf (merge (read-storm-config)
                        {NIMBUS-ADMINS ["admin"]


[4/4] git commit: Merge branch 'STORM-347' of https://github.com/harshach/incubator-storm into STORM-347

Posted by bo...@apache.org.
Merge branch 'STORM-347' of https://github.com/harshach/incubator-storm into STORM-347

STORM-347: Authentication should allow for groups not just users.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/ff8336b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/ff8336b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/ff8336b7

Branch: refs/heads/security
Commit: ff8336b70d20bccb29f0f5acd8c7f7e4323dca94
Parents: 642ed74 fb882ca
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Mon Jul 28 16:59:42 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Mon Jul 28 16:59:42 2014 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |   6 +-
 storm-core/src/jvm/backtype/storm/Config.java   | 176 ++++---
 .../backtype/storm/security/auth/AuthUtils.java |  49 +-
 .../auth/IGroupMappingServiceProvider.java      |  42 ++
 .../security/auth/ShellBasedGroupsMapping.java  |  94 ++++
 .../auth/authorizer/SimpleACLAuthorizer.java    |  29 +-
 .../jvm/backtype/storm/utils/ShellUtils.java    | 498 +++++++++++++++++++
 .../backtype/storm/security/auth/auth_test.clj  |  11 +-
 8 files changed, 797 insertions(+), 108 deletions(-)
----------------------------------------------------------------------



[2/4] git commit: STORM-347. (Security) authentication should allow for groups not just users. Added Bobby's suggested changes.

Posted by bo...@apache.org.
STORM-347. (Security) authentication should allow for groups not just
users. Added Bobby's suggested changes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/9e13a356
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/9e13a356
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/9e13a356

Branch: refs/heads/security
Commit: 9e13a356268e1e8bf76ac29c42a441a0f49108be
Parents: f3112fa
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Mon Jul 21 15:13:35 2014 -0700
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Mon Jul 21 15:13:35 2014 -0700

----------------------------------------------------------------------
 conf/defaults.yaml                              |   3 +
 storm-core/src/jvm/backtype/storm/Config.java   | 170 ++++++++++---------
 .../auth/IGroupMappingServiceProvider.java      |  12 --
 .../auth/ShellBasedUnixGroupsMapping.java       |  26 +--
 4 files changed, 107 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9e13a356/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 83b7b4d..b0ebb4d 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -154,6 +154,9 @@ storm.messaging.netty.transfer.batch.size: 262144
 # We check with this interval that whether the Netty channel is writable and try to write pending messages if it is.
 storm.messaging.netty.flush.check.interval.ms: 10
 
+# default number of seconds group mapping service will cache user groups
+storm.group.mapping.service.cache.duration.secs: 120
+
 ### topology.* configs are for specific executing storms
 topology.enable.message.timeouts: true
 topology.debug: false

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9e13a356/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index fee5f6e..ea54313 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -28,11 +28,11 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Topology configs are specified as a plain old map. This class provides a 
- * convenient way to create a topology config map by providing setter methods for 
- * all the configs that can be set. It also makes it easier to do things like add 
+ * Topology configs are specified as a plain old map. This class provides a
+ * convenient way to create a topology config map by providing setter methods for
+ * all the configs that can be set. It also makes it easier to do things like add
  * serializations.
- * 
+ *
  * <p>This class also provides constants for all the configurations possible on
  * a Storm cluster and Storm topology. Each constant is paired with a schema
  * that defines the validity criterion of the corresponding field. Default
@@ -40,7 +40,7 @@ import java.util.Map;
  *
  * <p>Note that you may put other configurations in any of the configs. Storm
  * will ignore anything it doesn't recognize, but your topologies are free to make
- * use of them by reading them in the prepare method of Bolts or the open method of 
+ * use of them by reading them in the prepare method of Bolts or the open method of
  * Spouts.</p>
  */
 public class Config extends HashMap<String, Object> {
@@ -60,39 +60,39 @@ public class Config extends HashMap<String, Object> {
     /**
      * Netty based messaging: The buffer size for send/recv buffer
      */
-    public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size"; 
+    public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size";
     public static final Object STORM_MESSAGING_NETTY_BUFFER_SIZE_SCHEMA = Number.class;
 
     /**
      * Netty based messaging: The max # of retries that a peer will perform when a remote is not accessible
      */
-    public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries"; 
+    public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries";
     public static final Object STORM_MESSAGING_NETTY_MAX_RETRIES_SCHEMA = Number.class;
 
     /**
-     * Netty based messaging: The min # of milliseconds that a peer will wait. 
+     * Netty based messaging: The min # of milliseconds that a peer will wait.
      */
-    public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms"; 
+    public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms";
     public static final Object STORM_MESSAGING_NETTY_MIN_SLEEP_MS_SCHEMA = Number.class;
 
     /**
-     * Netty based messaging: The max # of milliseconds that a peer will wait. 
+     * Netty based messaging: The max # of milliseconds that a peer will wait.
      */
-    public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms"; 
+    public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms";
     public static final Object STORM_MESSAGING_NETTY_MAX_SLEEP_MS_SCHEMA = Number.class;
 
     /**
      * Netty based messaging: The # of worker threads for the server.
      */
-    public static final String STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS = "storm.messaging.netty.server_worker_threads"; 
+    public static final String STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS = "storm.messaging.netty.server_worker_threads";
     public static final Object STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS_SCHEMA = Number.class;
 
     /**
      * Netty based messaging: The # of worker threads for the client.
      */
-    public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS = "storm.messaging.netty.client_worker_threads"; 
+    public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS = "storm.messaging.netty.client_worker_threads";
     public static final Object STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS_SCHEMA = Number.class;
-    
+
     /**
      * If the Netty messaging layer is busy, the Netty client will try to batch message as more as possible up to the size of STORM_NETTY_MESSAGE_BATCH_SIZE bytes
      */
@@ -104,8 +104,8 @@ public class Config extends HashMap<String, Object> {
      */
     public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";
     public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = Number.class;
-    
-    
+
+
     /**
      * A list of hosts of ZooKeeper servers used to manage the cluster.
      */
@@ -128,7 +128,7 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * A global task scheduler used to assign topologies's tasks to supervisors' wokers.
-     * 
+     *
      * If this is not set, a default system scheduler will be used.
      */
     public static final String STORM_SCHEDULER = "storm.scheduler";
@@ -141,9 +141,9 @@ public class Config extends HashMap<String, Object> {
     public static final Object STORM_CLUSTER_MODE_SCHEMA = String.class;
 
     /**
-     * The hostname the supervisors/workers should report to nimbus. If unset, Storm will 
+     * The hostname the supervisors/workers should report to nimbus. If unset, Storm will
      * get the hostname to report by calling <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
-     * 
+     *
      * You should set this config when you dont have a DNS which supervisors/workers
      * can utilize to find each other based on hostname got from calls to
      * <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
@@ -164,13 +164,19 @@ public class Config extends HashMap<String, Object> {
     public static final Object STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN_SCHEMA = String.class;
 
     /**
+     * Max number of seconds the group mapping service will cache user groups
+     */
+    public static final String STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS = "storm.group.mapping.service.cache.duration.secs";
+    public static final Object STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS_SCHEMA = Number.class;
+
+    /**
      * The default transport plug-in for Thrift client/server communication
      */
     public static final String STORM_THRIFT_TRANSPORT_PLUGIN = "storm.thrift.transport";
     public static final Object STORM_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
 
     /**
-     * The serializer class for ListDelegate (tuple payload). 
+     * The serializer class for ListDelegate (tuple payload).
      * The default serializer will be ListDelegateSerializer
      */
     public static final String TOPOLOGY_TUPLE_SERIALIZER = "topology.tuple.serializer";
@@ -184,9 +190,9 @@ public class Config extends HashMap<String, Object> {
     public static final Object TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE_SCHEMA = Boolean.class;
 
     /**
-     * Whether or not to use ZeroMQ for messaging in local mode. If this is set 
-     * to false, then Storm will use a pure-Java messaging system. The purpose 
-     * of this flag is to make it easy to run Storm in local mode by eliminating 
+     * Whether or not to use ZeroMQ for messaging in local mode. If this is set
+     * to false, then Storm will use a pure-Java messaging system. The purpose
+     * of this flag is to make it easy to run Storm in local mode by eliminating
      * the need for native dependencies, which can be difficult to install.
      *
      * Defaults to false.
@@ -271,7 +277,7 @@ public class Config extends HashMap<String, Object> {
      */
     public static final String STORM_NIMBUS_RETRY_TIMES="storm.nimbus.retry.times";
     public static final Object STORM_NIMBUS_RETRY_TIMES_SCHEMA = Number.class;
-    
+
     /**
      * The starting interval between exponential backoff retries of a Nimbus operation.
      */
@@ -311,15 +317,15 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * A list of users that are cluster admins and can run any command.  To use this set
-     * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer  
+     * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
     public static final String NIMBUS_ADMINS = "nimbus.admins";
     public static final Object NIMBUS_ADMINS_SCHEMA = ConfigValidation.StringsValidator;
 
     /**
-     * A list of users that run the supervisors and should be authorized to interact with 
+     * A list of users that run the supervisors and should be authorized to interact with
      * nimbus as a supervisor would.  To use this set
-     * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer  
+     * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
     public static final String NIMBUS_SUPERVISOR_USERS = "nimbus.supervisor.users";
     public static final Object NIMBUS_SUPERVISOR_USERS_SCHEMA = ConfigValidation.StringsValidator;
@@ -392,7 +398,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object NIMBUS_TASK_LAUNCH_SECS_SCHEMA = Number.class;
 
     /**
-     * Whether or not nimbus should reassign tasks if it detects that a task goes down. 
+     * Whether or not nimbus should reassign tasks if it detects that a task goes down.
      * Defaults to true, and it's not recommended to change this value.
      */
     public static final String NIMBUS_REASSIGN = "nimbus.reassign";
@@ -462,7 +468,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object LOGVIEWER_CLEANUP_AGE_MINS_SCHEMA = ConfigValidation.PositiveIntegerValidator;
 
     /**
-     * A list of users allowed to view logs via the Log Viewer 
+     * A list of users allowed to view logs via the Log Viewer
      */
     public static final String LOGS_USERS = "logs.users";
     public static final Object LOGS_USERS_SCHEMA = ConfigValidation.StringsValidator;
@@ -594,7 +600,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object DRPC_AUTHORIZER_ACL_STRICT_SCHEMA = Boolean.class;
 
     /**
-     * DRPC thrift server worker threads 
+     * DRPC thrift server worker threads
      */
     public static final String DRPC_WORKER_THREADS = "drpc.worker.threads";
     public static final Object DRPC_WORKER_THREADS_SCHEMA = Number.class;
@@ -606,7 +612,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object DRPC_MAX_BUFFER_SIZE_SCHEMA = Number.class;
 
     /**
-     * DRPC thrift server queue size 
+     * DRPC thrift server queue size
      */
     public static final String DRPC_QUEUE_SIZE = "drpc.queue.size";
     public static final Object DRPC_QUEUE_SIZE_SCHEMA = Number.class;
@@ -618,13 +624,13 @@ public class Config extends HashMap<String, Object> {
     public static final Object DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
 
     /**
-     * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back. 
+     * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
      */
     public static final String DRPC_INVOCATIONS_PORT = "drpc.invocations.port";
     public static final Object DRPC_INVOCATIONS_PORT_SCHEMA = Number.class;
 
     /**
-     * DRPC invocations thrift server worker threads 
+     * DRPC invocations thrift server worker threads
      */
     public static final String DRPC_INVOCATIONS_THREADS = "drpc.invocations.threads";
     public static final Object DRPC_INVOCATIONS_THREADS_SCHEMA = Number.class;
@@ -667,7 +673,7 @@ public class Config extends HashMap<String, Object> {
      */
     public static final String SUPERVISOR_SLOTS_PORTS = "supervisor.slots.ports";
     public static final Object SUPERVISOR_SLOTS_PORTS_SCHEMA = ConfigValidation.NumbersValidator;
-    
+
     /**
      * A number representing the maximum number of workers any single topology can acquire.
      */
@@ -739,15 +745,15 @@ public class Config extends HashMap<String, Object> {
     public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = Number.class;
 
     /**
-     * Should the supervior try to run the worker as the lauching user or not.  Defaults to false. 
+     * Should the supervisor try to run the worker as the launching user or not.  Defaults to false.
      */
     public static final String SUPERVISOR_RUN_WORKER_AS_USER = "supervisor.run.worker.as.user";
     public static final Object SUPERVISOR_RUN_WORKER_AS_USER_SCHEMA = Boolean.class;
 
     /**
-     * Full path to the worker-laucher executable that will be used to lauch workers when 
+     * Full path to the worker-launcher executable that will be used to launch workers when
      * SUPERVISOR_RUN_WORKER_AS_USER is set to true.
-     */ 
+     */
     public static final String SUPERVISOR_WORKER_LAUNCHER = "supervisor.worker.launcher";
     public static final Object SUPERVISOR_WORKER_LAUNCHER_SCHEMA = String.class;
 
@@ -772,7 +778,7 @@ public class Config extends HashMap<String, Object> {
      */
     public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
     public static final Object WORKER_RECEIVER_THREAD_COUNT_SCHEMA = Number.class;
-    
+
     /**
      * How often this worker should heartbeat to the supervisor.
      */
@@ -806,7 +812,7 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * A list of users that are allowed to interact with the topology.  To use this set
-     * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer  
+     * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
     public static final String TOPOLOGY_USERS = "topology.users";
     public static final Object TOPOLOGY_USERS_SCHEMA = ConfigValidation.StringsValidator;
@@ -843,8 +849,8 @@ public class Config extends HashMap<String, Object> {
     /**
      * How many instances to create for a spout/bolt. A task runs on a thread with zero or more
      * other tasks for the same spout/bolt. The number of tasks for a spout/bolt is always
-     * the same throughout the lifetime of a topology, but the number of executors (threads) for 
-     * a spout/bolt can change over time. This allows a topology to scale to more or less resources 
+     * the same throughout the lifetime of a topology, but the number of executors (threads) for
+     * a spout/bolt can change over time. This allows a topology to scale to more or less resources
      * without redeploying the topology or violating the constraints of Storm (such as a fields grouping
      * guaranteeing that the same value goes to the same task).
      */
@@ -883,8 +889,8 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * A list of classes that customize storm's kryo instance during start-up.
-     * Each listed class name must implement IKryoDecorator. During start-up the 
-     * listed class is instantiated with 0 arguments, then its 'decorate' method 
+     * Each listed class name must implement IKryoDecorator. During start-up the
+     * listed class is instantiated with 0 arguments, then its 'decorate' method
      * is called with storm's kryo instance as the only argument.
      */
     public static final String TOPOLOGY_KRYO_DECORATORS = "topology.kryo.decorators";
@@ -914,7 +920,7 @@ public class Config extends HashMap<String, Object> {
 
     /*
      * A list of classes implementing IMetricsConsumer (See storm.yaml.example for exact config format).
-     * Each listed class will be routed all the metrics data generated by the storm metrics API. 
+     * Each listed class will be routed all the metrics data generated by the storm metrics API.
      * Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and it's parallelism is configurable.
      */
     public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register";
@@ -930,24 +936,24 @@ public class Config extends HashMap<String, Object> {
 
 
     /**
-     * The maximum number of tuples that can be pending on a spout task at any given time. 
-     * This config applies to individual tasks, not to spouts or topologies as a whole. 
-     * 
+     * The maximum number of tuples that can be pending on a spout task at any given time.
+     * This config applies to individual tasks, not to spouts or topologies as a whole.
+     *
      * A pending tuple is one that has been emitted from a spout but has not been acked or failed yet.
-     * Note that this config parameter has no effect for unreliable spouts that don't tag 
+     * Note that this config parameter has no effect for unreliable spouts that don't tag
      * their tuples with a message id.
      */
-    public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending"; 
+    public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
     public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = Number.class;
 
     /**
      * A class that implements a strategy for what to do when a spout needs to wait. Waiting is
      * triggered in one of two conditions:
-     * 
+     *
      * 1. nextTuple emits no tuples
      * 2. The spout has hit maxSpoutPending and can't emit any more tuples
      */
-    public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy"; 
+    public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy";
     public static final Object TOPOLOGY_SPOUT_WAIT_STRATEGY_SCHEMA = String.class;
 
     /**
@@ -970,7 +976,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object TOPOLOGY_STATS_SAMPLE_RATE_SCHEMA = Number.class;
 
     /**
-     * The time period that builtin metrics data in bucketed into. 
+     * The time period that builtin metrics data is bucketed into.
      */
     public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs";
     public static final Object TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS_SCHEMA = Number.class;
@@ -1003,7 +1009,7 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * A list of task hooks that are automatically added to every spout and bolt in the topology. An example
-     * of when you'd do this is to add a hook that integrates with your internal 
+     * of when you'd do this is to add a hook that integrates with your internal
      * monitoring system. These hooks are instantiated using the zero-arg constructor.
      */
     public static final String TOPOLOGY_AUTO_TASK_HOOKS="topology.auto.task.hooks";
@@ -1017,7 +1023,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE_SCHEMA = ConfigValidation.PowerOf2Validator;
 
     /**
-     * The maximum number of messages to batch from the thread receiving off the network to the 
+     * The maximum number of messages to batch from the thread receiving off the network to the
      * executor queues. Must be a power of 2.
      */
     public static final String TOPOLOGY_RECEIVER_BUFFER_SIZE="topology.receiver.buffer.size";
@@ -1051,14 +1057,14 @@ public class Config extends HashMap<String, Object> {
     public static final Object TOPOLOGY_DISRUPTOR_WAIT_STRATEGY_SCHEMA = String.class;
 
    /**
-    * The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed 
+    * The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed
     * via the TopologyContext.
     */
     public static final String TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE="topology.worker.shared.thread.pool.size";
     public static final Object TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE_SCHEMA = Number.class;
 
     /**
-     * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example, 
+     * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example,
      * an interval of 10 seconds with topology.max.error.report.per.interval set to 5 will only allow 5 errors to be
      * reported to Zookeeper per task for every 10 second interval of time.
      */
@@ -1081,7 +1087,7 @@ public class Config extends HashMap<String, Object> {
     /**
      * Name of the topology. This config is automatically set by Storm when the topology is submitted.
      */
-    public final static String TOPOLOGY_NAME="topology.name";  
+    public final static String TOPOLOGY_NAME="topology.name";
     public static final Object TOPOLOGY_NAME_SCHEMA = String.class;
 
     /**
@@ -1095,19 +1101,19 @@ public class Config extends HashMap<String, Object> {
      */
     public static final String TOPOLOGY_SUBMITTER_USER = "topology.submitter.user";
     public static final Object TOPOLOGY_SUBMITTER_USER_SCHEMA = String.class;
-    
+
     /**
      * Array of components that scheduler should try to place on separate hosts.
      */
     public static final String TOPOLOGY_SPREAD_COMPONENTS = "topology.spread.components";
     public static final Object TOPOLOGY_SPREAD_COMPONENTS_SCHEMA = ConfigValidation.StringsValidator;
-   
+
     /**
      * A list of IAutoCredentials that the topology should load and use.
      */
     public static final String TOPOLOGY_AUTO_CREDENTIALS = "topology.auto-credentials";
     public static final Object TOPOLOGY_AUTO_CREDENTIALS_SCHEMA = ConfigValidation.StringsValidator;
- 
+
     /**
      * Max pending tuples in one ShellBolt
      */
@@ -1157,9 +1163,9 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * This value is passed to spawned JVMs (e.g., Nimbus, Supervisor, and Workers)
-     * for the java.library.path value. java.library.path tells the JVM where 
+     * for the java.library.path value. java.library.path tells the JVM where
      * to look for native libraries. It is necessary to set this config correctly since
-     * Storm uses the ZeroMQ and JZMQ native libs. 
+     * Storm uses the ZeroMQ and JZMQ native libs.
      */
     public static final String JAVA_LIBRARY_PATH = "java.library.path";
     public static final Object JAVA_LIBRARY_PATH_SCHEMA = String.class;
@@ -1178,7 +1184,7 @@ public class Config extends HashMap<String, Object> {
      */
     public static final String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines";
     public static final Object ISOLATION_SCHEDULER_MACHINES_SCHEMA = ConfigValidation.MapOfStringToNumberValidator;
-    
+
     /**
      * A map from the user name to the number of machines that should that user is allowed to use. Set storm.scheduler
      * to backtype.storm.scheduler.multitenant.MultitenantScheduler
@@ -1192,15 +1198,15 @@ public class Config extends HashMap<String, Object> {
      */
     public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
     public static final Object TOPOLOGY_ISOLATED_MACHINES_SCHEMA = Number.class;
-    
+
     public static void setDebug(Map conf, boolean isOn) {
         conf.put(Config.TOPOLOGY_DEBUG, isOn);
-    } 
+    }
 
     public void setDebug(boolean isOn) {
         setDebug(this, isOn);
     }
-    
+
     public static void setNumWorkers(Map conf, int workers) {
         conf.put(Config.TOPOLOGY_WORKERS, workers);
     }
@@ -1216,7 +1222,7 @@ public class Config extends HashMap<String, Object> {
     public void setNumAckers(int numExecutors) {
         setNumAckers(this, numExecutors);
     }
-    
+
     public static void setMessageTimeoutSecs(Map conf, int secs) {
         conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, secs);
     }
@@ -1224,7 +1230,7 @@ public class Config extends HashMap<String, Object> {
     public void setMessageTimeoutSecs(int secs) {
         setMessageTimeoutSecs(this, secs);
     }
-    
+
     public static void registerSerialization(Map conf, Class klass) {
         getRegisteredSerializations(conf).add(klass.getName());
     }
@@ -1232,17 +1238,17 @@ public class Config extends HashMap<String, Object> {
     public void registerSerialization(Class klass) {
         registerSerialization(this, klass);
     }
-    
+
     public static void registerSerialization(Map conf, Class klass, Class<? extends Serializer> serializerClass) {
         Map<String, String> register = new HashMap<String, String>();
         register.put(klass.getName(), serializerClass.getName());
-        getRegisteredSerializations(conf).add(register);        
+        getRegisteredSerializations(conf).add(register);
     }
 
     public void registerSerialization(Class klass, Class<? extends Serializer> serializerClass) {
         registerSerialization(this, klass, serializerClass);
     }
-    
+
     public void registerMetricsConsumer(Class klass, Object argument, long parallelismHint) {
         HashMap m = new HashMap();
         m.put("class", klass.getCanonicalName());
@@ -1270,7 +1276,7 @@ public class Config extends HashMap<String, Object> {
     public void registerDecorator(Class<? extends IKryoDecorator> klass) {
         registerDecorator(this, klass);
     }
-    
+
     public static void setKryoFactory(Map conf, Class<? extends IKryoFactory> klass) {
         conf.put(Config.TOPOLOGY_KRYO_FACTORY, klass.getName());
     }
@@ -1286,7 +1292,7 @@ public class Config extends HashMap<String, Object> {
     public void setSkipMissingKryoRegistrations(boolean skip) {
        setSkipMissingKryoRegistrations(this, skip);
     }
-    
+
     public static void setMaxTaskParallelism(Map conf, int max) {
         conf.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, max);
     }
@@ -1294,7 +1300,7 @@ public class Config extends HashMap<String, Object> {
     public void setMaxTaskParallelism(int max) {
         setMaxTaskParallelism(this, max);
     }
-    
+
     public static void setMaxSpoutPending(Map conf, int max) {
         conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, max);
     }
@@ -1302,23 +1308,23 @@ public class Config extends HashMap<String, Object> {
     public void setMaxSpoutPending(int max) {
         setMaxSpoutPending(this, max);
     }
-    
+
     public static void setStatsSampleRate(Map conf, double rate) {
         conf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, rate);
-    }    
+    }
 
     public void setStatsSampleRate(double rate) {
         setStatsSampleRate(this, rate);
-    }    
+    }
 
     public static void setFallBackOnJavaSerialization(Map conf, boolean fallback) {
         conf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, fallback);
-    }    
+    }
 
     public void setFallBackOnJavaSerialization(boolean fallback) {
         setFallBackOnJavaSerialization(this, fallback);
-    }    
-    
+    }
+
     private static List getRegisteredSerializations(Map conf) {
         List ret;
         if(!conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
@@ -1329,13 +1335,13 @@ public class Config extends HashMap<String, Object> {
         conf.put(Config.TOPOLOGY_KRYO_REGISTER, ret);
         return ret;
     }
-    
+
     private static List getRegisteredDecorators(Map conf) {
         List ret;
         if(!conf.containsKey(Config.TOPOLOGY_KRYO_DECORATORS)) {
             ret = new ArrayList();
         } else {
-            ret = new ArrayList((List) conf.get(Config.TOPOLOGY_KRYO_DECORATORS));            
+            ret = new ArrayList((List) conf.get(Config.TOPOLOGY_KRYO_DECORATORS));
         }
         conf.put(Config.TOPOLOGY_KRYO_DECORATORS, ret);
         return ret;

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9e13a356/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java b/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
index 0b49dec..5590b81 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
@@ -39,16 +39,4 @@ public interface IGroupMappingServiceProvider {
      */
     public Set<String> getGroups(String user) throws IOException;
 
-    /**
-     * Refresh the cache of groups and user mapping
-     * @throws IOException
-     */
-    public void cacheGroupsRefresh() throws IOException;
-    /**
-     * Caches the group user information
-     * @param groups list of groups to add to cache
-     * @throws IOException
-     */
-    public void cacheGroupsAdd(Set<String> groups) throws IOException;
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9e13a356/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java b/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
index b8c8323..438b938 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
@@ -23,7 +23,10 @@ import java.util.Set;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.StringTokenizer;
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
 import backtype.storm.utils.ShellUtils;
+import backtype.storm.utils.TimeCacheMap;
 import backtype.storm.utils.ShellUtils.ExitCodeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,12 +36,17 @@ public class ShellBasedUnixGroupsMapping implements
                                              IGroupMappingServiceProvider {
 
     public static Logger LOG = LoggerFactory.getLogger(ShellBasedUnixGroupsMapping.class);
+    public TimeCacheMap<String, Set<String>> cachedGroups;
 
     /**
      * Invoked once immediately after construction
      * @param storm_conf Storm configuration
      */
-    public void prepare(Map storm_conf) {}
+    @Override
+    public void prepare(Map storm_conf) {
+        int timeout = Utils.getInt(storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS));
+        cachedGroups = new TimeCacheMap<String, Set<String>>(timeout);
+    }
 
     /**
      * Returns list of groups for a user
@@ -48,15 +56,13 @@ public class ShellBasedUnixGroupsMapping implements
      */
     @Override
     public Set<String> getGroups(String user) throws IOException {
-        return getUnixGroups(user);
-    }
-
-    @Override
-    public void cacheGroupsRefresh() throws IOException {
-    }
-
-    @Override
-    public void cacheGroupsAdd(Set<String> groups) throws IOException {
+        Set<String> groups = cachedGroups.get(user); // single lookup: an entry could expire between containsKey() and get()
+        if(groups != null)
+            return groups;
+        groups = getUnixGroups(user); // cache miss (or expired entry): ask the OS
+        if(!groups.isEmpty())
+            cachedGroups.put(user,groups);
+        return groups;
     }
 
     /**