You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/29 00:19:43 UTC

[2/4] git commit: STORM-347. (Security) authentication should allow for groups not just users. Added Bobby's suggested changes.

STORM-347. (Security) authentication should allow for groups not just
users. Added Bobby's suggested changes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/9e13a356
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/9e13a356
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/9e13a356

Branch: refs/heads/security
Commit: 9e13a356268e1e8bf76ac29c42a441a0f49108be
Parents: f3112fa
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Mon Jul 21 15:13:35 2014 -0700
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Mon Jul 21 15:13:35 2014 -0700

----------------------------------------------------------------------
 conf/defaults.yaml                              |   3 +
 storm-core/src/jvm/backtype/storm/Config.java   | 170 ++++++++++---------
 .../auth/IGroupMappingServiceProvider.java      |  12 --
 .../auth/ShellBasedUnixGroupsMapping.java       |  26 +--
 4 files changed, 107 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9e13a356/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 83b7b4d..b0ebb4d 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -154,6 +154,9 @@ storm.messaging.netty.transfer.batch.size: 262144
 # We check with this interval that whether the Netty channel is writable and try to write pending messages if it is.
 storm.messaging.netty.flush.check.interval.ms: 10
 
+# Default number of seconds the group mapping service will cache a user's groups
+storm.group.mapping.service.cache.duration.secs: 120
+
 ### topology.* configs are for specific executing storms
 topology.enable.message.timeouts: true
 topology.debug: false

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9e13a356/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index fee5f6e..ea54313 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -28,11 +28,11 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Topology configs are specified as a plain old map. This class provides a 
- * convenient way to create a topology config map by providing setter methods for 
- * all the configs that can be set. It also makes it easier to do things like add 
+ * Topology configs are specified as a plain old map. This class provides a
+ * convenient way to create a topology config map by providing setter methods for
+ * all the configs that can be set. It also makes it easier to do things like add
  * serializations.
- * 
+ *
  * <p>This class also provides constants for all the configurations possible on
  * a Storm cluster and Storm topology. Each constant is paired with a schema
  * that defines the validity criterion of the corresponding field. Default
@@ -40,7 +40,7 @@ import java.util.Map;
  *
  * <p>Note that you may put other configurations in any of the configs. Storm
  * will ignore anything it doesn't recognize, but your topologies are free to make
- * use of them by reading them in the prepare method of Bolts or the open method of 
+ * use of them by reading them in the prepare method of Bolts or the open method of
  * Spouts.</p>
  */
 public class Config extends HashMap<String, Object> {
@@ -60,39 +60,39 @@ public class Config extends HashMap<String, Object> {
     /**
      * Netty based messaging: The buffer size for send/recv buffer
      */
-    public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size"; 
+    public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size";
     public static final Object STORM_MESSAGING_NETTY_BUFFER_SIZE_SCHEMA = Number.class;
 
     /**
      * Netty based messaging: The max # of retries that a peer will perform when a remote is not accessible
      */
-    public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries"; 
+    public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries";
     public static final Object STORM_MESSAGING_NETTY_MAX_RETRIES_SCHEMA = Number.class;
 
     /**
-     * Netty based messaging: The min # of milliseconds that a peer will wait. 
+     * Netty based messaging: The min # of milliseconds that a peer will wait.
      */
-    public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms"; 
+    public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms";
     public static final Object STORM_MESSAGING_NETTY_MIN_SLEEP_MS_SCHEMA = Number.class;
 
     /**
-     * Netty based messaging: The max # of milliseconds that a peer will wait. 
+     * Netty based messaging: The max # of milliseconds that a peer will wait.
      */
-    public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms"; 
+    public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms";
     public static final Object STORM_MESSAGING_NETTY_MAX_SLEEP_MS_SCHEMA = Number.class;
 
     /**
      * Netty based messaging: The # of worker threads for the server.
      */
-    public static final String STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS = "storm.messaging.netty.server_worker_threads"; 
+    public static final String STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS = "storm.messaging.netty.server_worker_threads";
     public static final Object STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS_SCHEMA = Number.class;
 
     /**
      * Netty based messaging: The # of worker threads for the client.
      */
-    public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS = "storm.messaging.netty.client_worker_threads"; 
+    public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS = "storm.messaging.netty.client_worker_threads";
     public static final Object STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS_SCHEMA = Number.class;
-    
+
     /**
      * If the Netty messaging layer is busy, the Netty client will try to batch message as more as possible up to the size of STORM_NETTY_MESSAGE_BATCH_SIZE bytes
      */
@@ -104,8 +104,8 @@ public class Config extends HashMap<String, Object> {
      */
     public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";
     public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = Number.class;
-    
-    
+
+
     /**
      * A list of hosts of ZooKeeper servers used to manage the cluster.
      */
@@ -128,7 +128,7 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * A global task scheduler used to assign topologies's tasks to supervisors' wokers.
-     * 
+     *
      * If this is not set, a default system scheduler will be used.
      */
     public static final String STORM_SCHEDULER = "storm.scheduler";
@@ -141,9 +141,9 @@ public class Config extends HashMap<String, Object> {
     public static final Object STORM_CLUSTER_MODE_SCHEMA = String.class;
 
     /**
-     * The hostname the supervisors/workers should report to nimbus. If unset, Storm will 
+     * The hostname the supervisors/workers should report to nimbus. If unset, Storm will
      * get the hostname to report by calling <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
-     * 
+     *
      * You should set this config when you dont have a DNS which supervisors/workers
      * can utilize to find each other based on hostname got from calls to
      * <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
@@ -164,13 +164,19 @@ public class Config extends HashMap<String, Object> {
     public static final Object STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN_SCHEMA = String.class;
 
     /**
+     * Maximum number of seconds the group mapping service will cache a user's groups.
+     */
+    public static final String STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS = "storm.group.mapping.service.cache.duration.secs";
+    public static final Object STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS_SCHEMA = Number.class;
+
+    /**
      * The default transport plug-in for Thrift client/server communication
      */
     public static final String STORM_THRIFT_TRANSPORT_PLUGIN = "storm.thrift.transport";
     public static final Object STORM_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
 
     /**
-     * The serializer class for ListDelegate (tuple payload). 
+     * The serializer class for ListDelegate (tuple payload).
      * The default serializer will be ListDelegateSerializer
      */
     public static final String TOPOLOGY_TUPLE_SERIALIZER = "topology.tuple.serializer";
@@ -184,9 +190,9 @@ public class Config extends HashMap<String, Object> {
     public static final Object TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE_SCHEMA = Boolean.class;
 
     /**
-     * Whether or not to use ZeroMQ for messaging in local mode. If this is set 
-     * to false, then Storm will use a pure-Java messaging system. The purpose 
-     * of this flag is to make it easy to run Storm in local mode by eliminating 
+     * Whether or not to use ZeroMQ for messaging in local mode. If this is set
+     * to false, then Storm will use a pure-Java messaging system. The purpose
+     * of this flag is to make it easy to run Storm in local mode by eliminating
      * the need for native dependencies, which can be difficult to install.
      *
      * Defaults to false.
@@ -271,7 +277,7 @@ public class Config extends HashMap<String, Object> {
      */
     public static final String STORM_NIMBUS_RETRY_TIMES="storm.nimbus.retry.times";
     public static final Object STORM_NIMBUS_RETRY_TIMES_SCHEMA = Number.class;
-    
+
     /**
      * The starting interval between exponential backoff retries of a Nimbus operation.
      */
@@ -311,15 +317,15 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * A list of users that are cluster admins and can run any command.  To use this set
-     * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer  
+     * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
     public static final String NIMBUS_ADMINS = "nimbus.admins";
     public static final Object NIMBUS_ADMINS_SCHEMA = ConfigValidation.StringsValidator;
 
     /**
-     * A list of users that run the supervisors and should be authorized to interact with 
+     * A list of users that run the supervisors and should be authorized to interact with
      * nimbus as a supervisor would.  To use this set
-     * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer  
+     * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
     public static final String NIMBUS_SUPERVISOR_USERS = "nimbus.supervisor.users";
     public static final Object NIMBUS_SUPERVISOR_USERS_SCHEMA = ConfigValidation.StringsValidator;
@@ -392,7 +398,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object NIMBUS_TASK_LAUNCH_SECS_SCHEMA = Number.class;
 
     /**
-     * Whether or not nimbus should reassign tasks if it detects that a task goes down. 
+     * Whether or not nimbus should reassign tasks if it detects that a task goes down.
      * Defaults to true, and it's not recommended to change this value.
      */
     public static final String NIMBUS_REASSIGN = "nimbus.reassign";
@@ -462,7 +468,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object LOGVIEWER_CLEANUP_AGE_MINS_SCHEMA = ConfigValidation.PositiveIntegerValidator;
 
     /**
-     * A list of users allowed to view logs via the Log Viewer 
+     * A list of users allowed to view logs via the Log Viewer
      */
     public static final String LOGS_USERS = "logs.users";
     public static final Object LOGS_USERS_SCHEMA = ConfigValidation.StringsValidator;
@@ -594,7 +600,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object DRPC_AUTHORIZER_ACL_STRICT_SCHEMA = Boolean.class;
 
     /**
-     * DRPC thrift server worker threads 
+     * DRPC thrift server worker threads
      */
     public static final String DRPC_WORKER_THREADS = "drpc.worker.threads";
     public static final Object DRPC_WORKER_THREADS_SCHEMA = Number.class;
@@ -606,7 +612,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object DRPC_MAX_BUFFER_SIZE_SCHEMA = Number.class;
 
     /**
-     * DRPC thrift server queue size 
+     * DRPC thrift server queue size
      */
     public static final String DRPC_QUEUE_SIZE = "drpc.queue.size";
     public static final Object DRPC_QUEUE_SIZE_SCHEMA = Number.class;
@@ -618,13 +624,13 @@ public class Config extends HashMap<String, Object> {
     public static final Object DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
 
     /**
-     * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back. 
+     * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
      */
     public static final String DRPC_INVOCATIONS_PORT = "drpc.invocations.port";
     public static final Object DRPC_INVOCATIONS_PORT_SCHEMA = Number.class;
 
     /**
-     * DRPC invocations thrift server worker threads 
+     * DRPC invocations thrift server worker threads
      */
     public static final String DRPC_INVOCATIONS_THREADS = "drpc.invocations.threads";
     public static final Object DRPC_INVOCATIONS_THREADS_SCHEMA = Number.class;
@@ -667,7 +673,7 @@ public class Config extends HashMap<String, Object> {
      */
     public static final String SUPERVISOR_SLOTS_PORTS = "supervisor.slots.ports";
     public static final Object SUPERVISOR_SLOTS_PORTS_SCHEMA = ConfigValidation.NumbersValidator;
-    
+
     /**
      * A number representing the maximum number of workers any single topology can acquire.
      */
@@ -739,15 +745,15 @@ public class Config extends HashMap<String, Object> {
     public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = Number.class;
 
     /**
-     * Should the supervior try to run the worker as the lauching user or not.  Defaults to false. 
+     * Should the supervisor try to run the worker as the launching user or not.  Defaults to false.
      */
     public static final String SUPERVISOR_RUN_WORKER_AS_USER = "supervisor.run.worker.as.user";
     public static final Object SUPERVISOR_RUN_WORKER_AS_USER_SCHEMA = Boolean.class;
 
     /**
-     * Full path to the worker-laucher executable that will be used to lauch workers when 
+     * Full path to the worker-launcher executable that will be used to launch workers when
      * SUPERVISOR_RUN_WORKER_AS_USER is set to true.
-     */ 
+     */
     public static final String SUPERVISOR_WORKER_LAUNCHER = "supervisor.worker.launcher";
     public static final Object SUPERVISOR_WORKER_LAUNCHER_SCHEMA = String.class;
 
@@ -772,7 +778,7 @@ public class Config extends HashMap<String, Object> {
      */
     public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
     public static final Object WORKER_RECEIVER_THREAD_COUNT_SCHEMA = Number.class;
-    
+
     /**
      * How often this worker should heartbeat to the supervisor.
      */
@@ -806,7 +812,7 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * A list of users that are allowed to interact with the topology.  To use this set
-     * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer  
+     * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
     public static final String TOPOLOGY_USERS = "topology.users";
     public static final Object TOPOLOGY_USERS_SCHEMA = ConfigValidation.StringsValidator;
@@ -843,8 +849,8 @@ public class Config extends HashMap<String, Object> {
     /**
      * How many instances to create for a spout/bolt. A task runs on a thread with zero or more
      * other tasks for the same spout/bolt. The number of tasks for a spout/bolt is always
-     * the same throughout the lifetime of a topology, but the number of executors (threads) for 
-     * a spout/bolt can change over time. This allows a topology to scale to more or less resources 
+     * the same throughout the lifetime of a topology, but the number of executors (threads) for
+     * a spout/bolt can change over time. This allows a topology to scale to more or less resources
      * without redeploying the topology or violating the constraints of Storm (such as a fields grouping
      * guaranteeing that the same value goes to the same task).
      */
@@ -883,8 +889,8 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * A list of classes that customize storm's kryo instance during start-up.
-     * Each listed class name must implement IKryoDecorator. During start-up the 
-     * listed class is instantiated with 0 arguments, then its 'decorate' method 
+     * Each listed class name must implement IKryoDecorator. During start-up the
+     * listed class is instantiated with 0 arguments, then its 'decorate' method
      * is called with storm's kryo instance as the only argument.
      */
     public static final String TOPOLOGY_KRYO_DECORATORS = "topology.kryo.decorators";
@@ -914,7 +920,7 @@ public class Config extends HashMap<String, Object> {
 
     /*
      * A list of classes implementing IMetricsConsumer (See storm.yaml.example for exact config format).
-     * Each listed class will be routed all the metrics data generated by the storm metrics API. 
+     * Each listed class will be routed all the metrics data generated by the storm metrics API.
      * Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and it's parallelism is configurable.
      */
     public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register";
@@ -930,24 +936,24 @@ public class Config extends HashMap<String, Object> {
 
 
     /**
-     * The maximum number of tuples that can be pending on a spout task at any given time. 
-     * This config applies to individual tasks, not to spouts or topologies as a whole. 
-     * 
+     * The maximum number of tuples that can be pending on a spout task at any given time.
+     * This config applies to individual tasks, not to spouts or topologies as a whole.
+     *
      * A pending tuple is one that has been emitted from a spout but has not been acked or failed yet.
-     * Note that this config parameter has no effect for unreliable spouts that don't tag 
+     * Note that this config parameter has no effect for unreliable spouts that don't tag
      * their tuples with a message id.
      */
-    public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending"; 
+    public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
     public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = Number.class;
 
     /**
      * A class that implements a strategy for what to do when a spout needs to wait. Waiting is
      * triggered in one of two conditions:
-     * 
+     *
      * 1. nextTuple emits no tuples
      * 2. The spout has hit maxSpoutPending and can't emit any more tuples
      */
-    public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy"; 
+    public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy";
     public static final Object TOPOLOGY_SPOUT_WAIT_STRATEGY_SCHEMA = String.class;
 
     /**
@@ -970,7 +976,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object TOPOLOGY_STATS_SAMPLE_RATE_SCHEMA = Number.class;
 
     /**
-     * The time period that builtin metrics data in bucketed into. 
+     * The time period that builtin metrics data is bucketed into.
      */
     public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs";
     public static final Object TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS_SCHEMA = Number.class;
@@ -1003,7 +1009,7 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * A list of task hooks that are automatically added to every spout and bolt in the topology. An example
-     * of when you'd do this is to add a hook that integrates with your internal 
+     * of when you'd do this is to add a hook that integrates with your internal
      * monitoring system. These hooks are instantiated using the zero-arg constructor.
      */
     public static final String TOPOLOGY_AUTO_TASK_HOOKS="topology.auto.task.hooks";
@@ -1017,7 +1023,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE_SCHEMA = ConfigValidation.PowerOf2Validator;
 
     /**
-     * The maximum number of messages to batch from the thread receiving off the network to the 
+     * The maximum number of messages to batch from the thread receiving off the network to the
      * executor queues. Must be a power of 2.
      */
     public static final String TOPOLOGY_RECEIVER_BUFFER_SIZE="topology.receiver.buffer.size";
@@ -1051,14 +1057,14 @@ public class Config extends HashMap<String, Object> {
     public static final Object TOPOLOGY_DISRUPTOR_WAIT_STRATEGY_SCHEMA = String.class;
 
    /**
-    * The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed 
+    * The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed
     * via the TopologyContext.
     */
     public static final String TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE="topology.worker.shared.thread.pool.size";
     public static final Object TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE_SCHEMA = Number.class;
 
     /**
-     * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example, 
+     * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example,
      * an interval of 10 seconds with topology.max.error.report.per.interval set to 5 will only allow 5 errors to be
      * reported to Zookeeper per task for every 10 second interval of time.
      */
@@ -1081,7 +1087,7 @@ public class Config extends HashMap<String, Object> {
     /**
      * Name of the topology. This config is automatically set by Storm when the topology is submitted.
      */
-    public final static String TOPOLOGY_NAME="topology.name";  
+    public final static String TOPOLOGY_NAME="topology.name";
     public static final Object TOPOLOGY_NAME_SCHEMA = String.class;
 
     /**
@@ -1095,19 +1101,19 @@ public class Config extends HashMap<String, Object> {
      */
     public static final String TOPOLOGY_SUBMITTER_USER = "topology.submitter.user";
     public static final Object TOPOLOGY_SUBMITTER_USER_SCHEMA = String.class;
-    
+
     /**
      * Array of components that scheduler should try to place on separate hosts.
      */
     public static final String TOPOLOGY_SPREAD_COMPONENTS = "topology.spread.components";
     public static final Object TOPOLOGY_SPREAD_COMPONENTS_SCHEMA = ConfigValidation.StringsValidator;
-   
+
     /**
      * A list of IAutoCredentials that the topology should load and use.
      */
     public static final String TOPOLOGY_AUTO_CREDENTIALS = "topology.auto-credentials";
     public static final Object TOPOLOGY_AUTO_CREDENTIALS_SCHEMA = ConfigValidation.StringsValidator;
- 
+
     /**
      * Max pending tuples in one ShellBolt
      */
@@ -1157,9 +1163,9 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * This value is passed to spawned JVMs (e.g., Nimbus, Supervisor, and Workers)
-     * for the java.library.path value. java.library.path tells the JVM where 
+     * for the java.library.path value. java.library.path tells the JVM where
      * to look for native libraries. It is necessary to set this config correctly since
-     * Storm uses the ZeroMQ and JZMQ native libs. 
+     * Storm uses the ZeroMQ and JZMQ native libs.
      */
     public static final String JAVA_LIBRARY_PATH = "java.library.path";
     public static final Object JAVA_LIBRARY_PATH_SCHEMA = String.class;
@@ -1178,7 +1184,7 @@ public class Config extends HashMap<String, Object> {
      */
     public static final String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines";
     public static final Object ISOLATION_SCHEDULER_MACHINES_SCHEMA = ConfigValidation.MapOfStringToNumberValidator;
-    
+
     /**
      * A map from the user name to the number of machines that should that user is allowed to use. Set storm.scheduler
      * to backtype.storm.scheduler.multitenant.MultitenantScheduler
@@ -1192,15 +1198,15 @@ public class Config extends HashMap<String, Object> {
      */
     public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
     public static final Object TOPOLOGY_ISOLATED_MACHINES_SCHEMA = Number.class;
-    
+
     public static void setDebug(Map conf, boolean isOn) {
         conf.put(Config.TOPOLOGY_DEBUG, isOn);
-    } 
+    }
 
     public void setDebug(boolean isOn) {
         setDebug(this, isOn);
     }
-    
+
     public static void setNumWorkers(Map conf, int workers) {
         conf.put(Config.TOPOLOGY_WORKERS, workers);
     }
@@ -1216,7 +1222,7 @@ public class Config extends HashMap<String, Object> {
     public void setNumAckers(int numExecutors) {
         setNumAckers(this, numExecutors);
     }
-    
+
     public static void setMessageTimeoutSecs(Map conf, int secs) {
         conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, secs);
     }
@@ -1224,7 +1230,7 @@ public class Config extends HashMap<String, Object> {
     public void setMessageTimeoutSecs(int secs) {
         setMessageTimeoutSecs(this, secs);
     }
-    
+
     public static void registerSerialization(Map conf, Class klass) {
         getRegisteredSerializations(conf).add(klass.getName());
     }
@@ -1232,17 +1238,17 @@ public class Config extends HashMap<String, Object> {
     public void registerSerialization(Class klass) {
         registerSerialization(this, klass);
     }
-    
+
     public static void registerSerialization(Map conf, Class klass, Class<? extends Serializer> serializerClass) {
         Map<String, String> register = new HashMap<String, String>();
         register.put(klass.getName(), serializerClass.getName());
-        getRegisteredSerializations(conf).add(register);        
+        getRegisteredSerializations(conf).add(register);
     }
 
     public void registerSerialization(Class klass, Class<? extends Serializer> serializerClass) {
         registerSerialization(this, klass, serializerClass);
     }
-    
+
     public void registerMetricsConsumer(Class klass, Object argument, long parallelismHint) {
         HashMap m = new HashMap();
         m.put("class", klass.getCanonicalName());
@@ -1270,7 +1276,7 @@ public class Config extends HashMap<String, Object> {
     public void registerDecorator(Class<? extends IKryoDecorator> klass) {
         registerDecorator(this, klass);
     }
-    
+
     public static void setKryoFactory(Map conf, Class<? extends IKryoFactory> klass) {
         conf.put(Config.TOPOLOGY_KRYO_FACTORY, klass.getName());
     }
@@ -1286,7 +1292,7 @@ public class Config extends HashMap<String, Object> {
     public void setSkipMissingKryoRegistrations(boolean skip) {
        setSkipMissingKryoRegistrations(this, skip);
     }
-    
+
     public static void setMaxTaskParallelism(Map conf, int max) {
         conf.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, max);
     }
@@ -1294,7 +1300,7 @@ public class Config extends HashMap<String, Object> {
     public void setMaxTaskParallelism(int max) {
         setMaxTaskParallelism(this, max);
     }
-    
+
     public static void setMaxSpoutPending(Map conf, int max) {
         conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, max);
     }
@@ -1302,23 +1308,23 @@ public class Config extends HashMap<String, Object> {
     public void setMaxSpoutPending(int max) {
         setMaxSpoutPending(this, max);
     }
-    
+
     public static void setStatsSampleRate(Map conf, double rate) {
         conf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, rate);
-    }    
+    }
 
     public void setStatsSampleRate(double rate) {
         setStatsSampleRate(this, rate);
-    }    
+    }
 
     public static void setFallBackOnJavaSerialization(Map conf, boolean fallback) {
         conf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, fallback);
-    }    
+    }
 
     public void setFallBackOnJavaSerialization(boolean fallback) {
         setFallBackOnJavaSerialization(this, fallback);
-    }    
-    
+    }
+
     private static List getRegisteredSerializations(Map conf) {
         List ret;
         if(!conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
@@ -1329,13 +1335,13 @@ public class Config extends HashMap<String, Object> {
         conf.put(Config.TOPOLOGY_KRYO_REGISTER, ret);
         return ret;
     }
-    
+
     private static List getRegisteredDecorators(Map conf) {
         List ret;
         if(!conf.containsKey(Config.TOPOLOGY_KRYO_DECORATORS)) {
             ret = new ArrayList();
         } else {
-            ret = new ArrayList((List) conf.get(Config.TOPOLOGY_KRYO_DECORATORS));            
+            ret = new ArrayList((List) conf.get(Config.TOPOLOGY_KRYO_DECORATORS));
         }
         conf.put(Config.TOPOLOGY_KRYO_DECORATORS, ret);
         return ret;

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9e13a356/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java b/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
index 0b49dec..5590b81 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
@@ -39,16 +39,4 @@ public interface IGroupMappingServiceProvider {
      */
     public Set<String> getGroups(String user) throws IOException;
 
-    /**
-     * Refresh the cache of groups and user mapping
-     * @throws IOException
-     */
-    public void cacheGroupsRefresh() throws IOException;
-    /**
-     * Caches the group user information
-     * @param groups list of groups to add to cache
-     * @throws IOException
-     */
-    public void cacheGroupsAdd(Set<String> groups) throws IOException;
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9e13a356/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java b/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
index b8c8323..438b938 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
@@ -23,7 +23,10 @@ import java.util.Set;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.StringTokenizer;
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
 import backtype.storm.utils.ShellUtils;
+import backtype.storm.utils.TimeCacheMap;
 import backtype.storm.utils.ShellUtils.ExitCodeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,12 +36,17 @@ public class ShellBasedUnixGroupsMapping implements
                                              IGroupMappingServiceProvider {
 
     public static Logger LOG = LoggerFactory.getLogger(ShellBasedUnixGroupsMapping.class);
+    public TimeCacheMap<String, Set<String>> cachedGroups;
 
     /**
      * Invoked once immediately after construction
      * @param storm_conf Storm configuration
      */
-    public void prepare(Map storm_conf) {}
+    @Override
+    public void prepare(Map storm_conf) {
+        int timeout = Utils.getInt(storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS));
+        cachedGroups = new TimeCacheMap<String, Set<String>>(timeout);
+    }
 
     /**
      * Returns list of groups for a user
@@ -48,15 +56,13 @@ public class ShellBasedUnixGroupsMapping implements
      */
     @Override
     public Set<String> getGroups(String user) throws IOException {
-        return getUnixGroups(user);
-    }
-
-    @Override
-    public void cacheGroupsRefresh() throws IOException {
-    }
-
-    @Override
-    public void cacheGroupsAdd(Set<String> groups) throws IOException {
+        if(cachedGroups.containsKey(user)) {
+            return cachedGroups.get(user);
+        }
+        Set<String> groups = getUnixGroups(user);
+        if(!groups.isEmpty())
+            cachedGroups.put(user,groups);
+        return groups;
     }
 
     /**