You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/29 00:19:43 UTC
[2/4] git commit: STORM-347. (Security) authentication should allow
for groups not just users. Added Bobby's suggested changes.
STORM-347. (Security) authentication should allow for groups not just
users. Added Bobby's suggested changes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/9e13a356
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/9e13a356
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/9e13a356
Branch: refs/heads/security
Commit: 9e13a356268e1e8bf76ac29c42a441a0f49108be
Parents: f3112fa
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Mon Jul 21 15:13:35 2014 -0700
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Mon Jul 21 15:13:35 2014 -0700
----------------------------------------------------------------------
conf/defaults.yaml | 3 +
storm-core/src/jvm/backtype/storm/Config.java | 170 ++++++++++---------
.../auth/IGroupMappingServiceProvider.java | 12 --
.../auth/ShellBasedUnixGroupsMapping.java | 26 +--
4 files changed, 107 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9e13a356/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 83b7b4d..b0ebb4d 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -154,6 +154,9 @@ storm.messaging.netty.transfer.batch.size: 262144
# We check with this interval that whether the Netty channel is writable and try to write pending messages if it is.
storm.messaging.netty.flush.check.interval.ms: 10
+# Default number of seconds the group mapping service will cache a user's groups
+storm.group.mapping.service.cache.duration.secs: 120
+
### topology.* configs are for specific executing storms
topology.enable.message.timeouts: true
topology.debug: false
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9e13a356/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index fee5f6e..ea54313 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -28,11 +28,11 @@ import java.util.List;
import java.util.Map;
/**
- * Topology configs are specified as a plain old map. This class provides a
- * convenient way to create a topology config map by providing setter methods for
- * all the configs that can be set. It also makes it easier to do things like add
+ * Topology configs are specified as a plain old map. This class provides a
+ * convenient way to create a topology config map by providing setter methods for
+ * all the configs that can be set. It also makes it easier to do things like add
* serializations.
- *
+ *
* <p>This class also provides constants for all the configurations possible on
* a Storm cluster and Storm topology. Each constant is paired with a schema
* that defines the validity criterion of the corresponding field. Default
@@ -40,7 +40,7 @@ import java.util.Map;
*
* <p>Note that you may put other configurations in any of the configs. Storm
* will ignore anything it doesn't recognize, but your topologies are free to make
- * use of them by reading them in the prepare method of Bolts or the open method of
+ * use of them by reading them in the prepare method of Bolts or the open method of
* Spouts.</p>
*/
public class Config extends HashMap<String, Object> {
@@ -60,39 +60,39 @@ public class Config extends HashMap<String, Object> {
/**
* Netty based messaging: The buffer size for send/recv buffer
*/
- public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size";
+ public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size";
public static final Object STORM_MESSAGING_NETTY_BUFFER_SIZE_SCHEMA = Number.class;
/**
* Netty based messaging: The max # of retries that a peer will perform when a remote is not accessible
*/
- public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries";
+ public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries";
public static final Object STORM_MESSAGING_NETTY_MAX_RETRIES_SCHEMA = Number.class;
/**
- * Netty based messaging: The min # of milliseconds that a peer will wait.
+ * Netty based messaging: The min # of milliseconds that a peer will wait.
*/
- public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms";
+ public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms";
public static final Object STORM_MESSAGING_NETTY_MIN_SLEEP_MS_SCHEMA = Number.class;
/**
- * Netty based messaging: The max # of milliseconds that a peer will wait.
+ * Netty based messaging: The max # of milliseconds that a peer will wait.
*/
- public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms";
+ public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms";
public static final Object STORM_MESSAGING_NETTY_MAX_SLEEP_MS_SCHEMA = Number.class;
/**
* Netty based messaging: The # of worker threads for the server.
*/
- public static final String STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS = "storm.messaging.netty.server_worker_threads";
+ public static final String STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS = "storm.messaging.netty.server_worker_threads";
public static final Object STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS_SCHEMA = Number.class;
/**
* Netty based messaging: The # of worker threads for the client.
*/
- public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS = "storm.messaging.netty.client_worker_threads";
+ public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS = "storm.messaging.netty.client_worker_threads";
public static final Object STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS_SCHEMA = Number.class;
-
+
/**
* If the Netty messaging layer is busy, the Netty client will try to batch as many messages as possible, up to STORM_NETTY_MESSAGE_BATCH_SIZE bytes
*/
@@ -104,8 +104,8 @@ public class Config extends HashMap<String, Object> {
*/
public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";
public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = Number.class;
-
-
+
+
/**
* A list of hosts of ZooKeeper servers used to manage the cluster.
*/
@@ -128,7 +128,7 @@ public class Config extends HashMap<String, Object> {
/**
* A global task scheduler used to assign topologies' tasks to supervisors' workers.
- *
+ *
* If this is not set, a default system scheduler will be used.
*/
public static final String STORM_SCHEDULER = "storm.scheduler";
@@ -141,9 +141,9 @@ public class Config extends HashMap<String, Object> {
public static final Object STORM_CLUSTER_MODE_SCHEMA = String.class;
/**
- * The hostname the supervisors/workers should report to nimbus. If unset, Storm will
+ * The hostname the supervisors/workers should report to nimbus. If unset, Storm will
* get the hostname to report by calling <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
- *
+ *
* You should set this config when you don't have a DNS which supervisors/workers
* can utilize to find each other based on hostname got from calls to
* <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
@@ -164,13 +164,19 @@ public class Config extends HashMap<String, Object> {
public static final Object STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN_SCHEMA = String.class;
/**
+ * Maximum number of seconds the group mapping service will cache a user's groups
+ */
+ public static final String STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS = "storm.group.mapping.service.cache.duration.secs";
+ public static final Object STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS_SCHEMA = Number.class;
+
+ /**
* The default transport plug-in for Thrift client/server communication
*/
public static final String STORM_THRIFT_TRANSPORT_PLUGIN = "storm.thrift.transport";
public static final Object STORM_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
/**
- * The serializer class for ListDelegate (tuple payload).
+ * The serializer class for ListDelegate (tuple payload).
* The default serializer will be ListDelegateSerializer
*/
public static final String TOPOLOGY_TUPLE_SERIALIZER = "topology.tuple.serializer";
@@ -184,9 +190,9 @@ public class Config extends HashMap<String, Object> {
public static final Object TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE_SCHEMA = Boolean.class;
/**
- * Whether or not to use ZeroMQ for messaging in local mode. If this is set
- * to false, then Storm will use a pure-Java messaging system. The purpose
- * of this flag is to make it easy to run Storm in local mode by eliminating
+ * Whether or not to use ZeroMQ for messaging in local mode. If this is set
+ * to false, then Storm will use a pure-Java messaging system. The purpose
+ * of this flag is to make it easy to run Storm in local mode by eliminating
* the need for native dependencies, which can be difficult to install.
*
* Defaults to false.
@@ -271,7 +277,7 @@ public class Config extends HashMap<String, Object> {
*/
public static final String STORM_NIMBUS_RETRY_TIMES="storm.nimbus.retry.times";
public static final Object STORM_NIMBUS_RETRY_TIMES_SCHEMA = Number.class;
-
+
/**
* The starting interval between exponential backoff retries of a Nimbus operation.
*/
@@ -311,15 +317,15 @@ public class Config extends HashMap<String, Object> {
/**
* A list of users that are cluster admins and can run any command. To use this set
- * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
+ * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
*/
public static final String NIMBUS_ADMINS = "nimbus.admins";
public static final Object NIMBUS_ADMINS_SCHEMA = ConfigValidation.StringsValidator;
/**
- * A list of users that run the supervisors and should be authorized to interact with
+ * A list of users that run the supervisors and should be authorized to interact with
* nimbus as a supervisor would. To use this set
- * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
+ * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
*/
public static final String NIMBUS_SUPERVISOR_USERS = "nimbus.supervisor.users";
public static final Object NIMBUS_SUPERVISOR_USERS_SCHEMA = ConfigValidation.StringsValidator;
@@ -392,7 +398,7 @@ public class Config extends HashMap<String, Object> {
public static final Object NIMBUS_TASK_LAUNCH_SECS_SCHEMA = Number.class;
/**
- * Whether or not nimbus should reassign tasks if it detects that a task goes down.
+ * Whether or not nimbus should reassign tasks if it detects that a task goes down.
* Defaults to true, and it's not recommended to change this value.
*/
public static final String NIMBUS_REASSIGN = "nimbus.reassign";
@@ -462,7 +468,7 @@ public class Config extends HashMap<String, Object> {
public static final Object LOGVIEWER_CLEANUP_AGE_MINS_SCHEMA = ConfigValidation.PositiveIntegerValidator;
/**
- * A list of users allowed to view logs via the Log Viewer
+ * A list of users allowed to view logs via the Log Viewer
*/
public static final String LOGS_USERS = "logs.users";
public static final Object LOGS_USERS_SCHEMA = ConfigValidation.StringsValidator;
@@ -594,7 +600,7 @@ public class Config extends HashMap<String, Object> {
public static final Object DRPC_AUTHORIZER_ACL_STRICT_SCHEMA = Boolean.class;
/**
- * DRPC thrift server worker threads
+ * DRPC thrift server worker threads
*/
public static final String DRPC_WORKER_THREADS = "drpc.worker.threads";
public static final Object DRPC_WORKER_THREADS_SCHEMA = Number.class;
@@ -606,7 +612,7 @@ public class Config extends HashMap<String, Object> {
public static final Object DRPC_MAX_BUFFER_SIZE_SCHEMA = Number.class;
/**
- * DRPC thrift server queue size
+ * DRPC thrift server queue size
*/
public static final String DRPC_QUEUE_SIZE = "drpc.queue.size";
public static final Object DRPC_QUEUE_SIZE_SCHEMA = Number.class;
@@ -618,13 +624,13 @@ public class Config extends HashMap<String, Object> {
public static final Object DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
/**
- * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
+ * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
*/
public static final String DRPC_INVOCATIONS_PORT = "drpc.invocations.port";
public static final Object DRPC_INVOCATIONS_PORT_SCHEMA = Number.class;
/**
- * DRPC invocations thrift server worker threads
+ * DRPC invocations thrift server worker threads
*/
public static final String DRPC_INVOCATIONS_THREADS = "drpc.invocations.threads";
public static final Object DRPC_INVOCATIONS_THREADS_SCHEMA = Number.class;
@@ -667,7 +673,7 @@ public class Config extends HashMap<String, Object> {
*/
public static final String SUPERVISOR_SLOTS_PORTS = "supervisor.slots.ports";
public static final Object SUPERVISOR_SLOTS_PORTS_SCHEMA = ConfigValidation.NumbersValidator;
-
+
/**
* A number representing the maximum number of workers any single topology can acquire.
*/
@@ -739,15 +745,15 @@ public class Config extends HashMap<String, Object> {
public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = Number.class;
/**
- * Should the supervior try to run the worker as the lauching user or not. Defaults to false.
+ * Should the supervior try to run the worker as the lauching user or not. Defaults to false.
*/
public static final String SUPERVISOR_RUN_WORKER_AS_USER = "supervisor.run.worker.as.user";
public static final Object SUPERVISOR_RUN_WORKER_AS_USER_SCHEMA = Boolean.class;
/**
- * Full path to the worker-laucher executable that will be used to lauch workers when
+ * Full path to the worker-laucher executable that will be used to lauch workers when
* SUPERVISOR_RUN_WORKER_AS_USER is set to true.
- */
+ */
public static final String SUPERVISOR_WORKER_LAUNCHER = "supervisor.worker.launcher";
public static final Object SUPERVISOR_WORKER_LAUNCHER_SCHEMA = String.class;
@@ -772,7 +778,7 @@ public class Config extends HashMap<String, Object> {
*/
public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
public static final Object WORKER_RECEIVER_THREAD_COUNT_SCHEMA = Number.class;
-
+
/**
* How often this worker should heartbeat to the supervisor.
*/
@@ -806,7 +812,7 @@ public class Config extends HashMap<String, Object> {
/**
* A list of users that are allowed to interact with the topology. To use this set
- * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
+ * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
*/
public static final String TOPOLOGY_USERS = "topology.users";
public static final Object TOPOLOGY_USERS_SCHEMA = ConfigValidation.StringsValidator;
@@ -843,8 +849,8 @@ public class Config extends HashMap<String, Object> {
/**
* How many instances to create for a spout/bolt. A task runs on a thread with zero or more
* other tasks for the same spout/bolt. The number of tasks for a spout/bolt is always
- * the same throughout the lifetime of a topology, but the number of executors (threads) for
- * a spout/bolt can change over time. This allows a topology to scale to more or less resources
+ * the same throughout the lifetime of a topology, but the number of executors (threads) for
+ * a spout/bolt can change over time. This allows a topology to scale to more or less resources
* without redeploying the topology or violating the constraints of Storm (such as a fields grouping
* guaranteeing that the same value goes to the same task).
*/
@@ -883,8 +889,8 @@ public class Config extends HashMap<String, Object> {
/**
* A list of classes that customize storm's kryo instance during start-up.
- * Each listed class name must implement IKryoDecorator. During start-up the
- * listed class is instantiated with 0 arguments, then its 'decorate' method
+ * Each listed class name must implement IKryoDecorator. During start-up the
+ * listed class is instantiated with 0 arguments, then its 'decorate' method
* is called with storm's kryo instance as the only argument.
*/
public static final String TOPOLOGY_KRYO_DECORATORS = "topology.kryo.decorators";
@@ -914,7 +920,7 @@ public class Config extends HashMap<String, Object> {
/*
* A list of classes implementing IMetricsConsumer (See storm.yaml.example for exact config format).
- * Each listed class will be routed all the metrics data generated by the storm metrics API.
+ * Each listed class will be routed all the metrics data generated by the storm metrics API.
* Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and its parallelism is configurable.
*/
public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register";
@@ -930,24 +936,24 @@ public class Config extends HashMap<String, Object> {
/**
- * The maximum number of tuples that can be pending on a spout task at any given time.
- * This config applies to individual tasks, not to spouts or topologies as a whole.
- *
+ * The maximum number of tuples that can be pending on a spout task at any given time.
+ * This config applies to individual tasks, not to spouts or topologies as a whole.
+ *
* A pending tuple is one that has been emitted from a spout but has not been acked or failed yet.
- * Note that this config parameter has no effect for unreliable spouts that don't tag
+ * Note that this config parameter has no effect for unreliable spouts that don't tag
* their tuples with a message id.
*/
- public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
+ public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = Number.class;
/**
* A class that implements a strategy for what to do when a spout needs to wait. Waiting is
* triggered in one of two conditions:
- *
+ *
* 1. nextTuple emits no tuples
* 2. The spout has hit maxSpoutPending and can't emit any more tuples
*/
- public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy";
+ public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy";
public static final Object TOPOLOGY_SPOUT_WAIT_STRATEGY_SCHEMA = String.class;
/**
@@ -970,7 +976,7 @@ public class Config extends HashMap<String, Object> {
public static final Object TOPOLOGY_STATS_SAMPLE_RATE_SCHEMA = Number.class;
/**
- * The time period that builtin metrics data in bucketed into.
+ * The time period that builtin metrics data in bucketed into.
*/
public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs";
public static final Object TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS_SCHEMA = Number.class;
@@ -1003,7 +1009,7 @@ public class Config extends HashMap<String, Object> {
/**
* A list of task hooks that are automatically added to every spout and bolt in the topology. An example
- * of when you'd do this is to add a hook that integrates with your internal
+ * of when you'd do this is to add a hook that integrates with your internal
* monitoring system. These hooks are instantiated using the zero-arg constructor.
*/
public static final String TOPOLOGY_AUTO_TASK_HOOKS="topology.auto.task.hooks";
@@ -1017,7 +1023,7 @@ public class Config extends HashMap<String, Object> {
public static final Object TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE_SCHEMA = ConfigValidation.PowerOf2Validator;
/**
- * The maximum number of messages to batch from the thread receiving off the network to the
+ * The maximum number of messages to batch from the thread receiving off the network to the
* executor queues. Must be a power of 2.
*/
public static final String TOPOLOGY_RECEIVER_BUFFER_SIZE="topology.receiver.buffer.size";
@@ -1051,14 +1057,14 @@ public class Config extends HashMap<String, Object> {
public static final Object TOPOLOGY_DISRUPTOR_WAIT_STRATEGY_SCHEMA = String.class;
/**
- * The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed
+ * The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed
* via the TopologyContext.
*/
public static final String TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE="topology.worker.shared.thread.pool.size";
public static final Object TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE_SCHEMA = Number.class;
/**
- * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example,
+ * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example,
* an interval of 10 seconds with topology.max.error.report.per.interval set to 5 will only allow 5 errors to be
* reported to Zookeeper per task for every 10 second interval of time.
*/
@@ -1081,7 +1087,7 @@ public class Config extends HashMap<String, Object> {
/**
* Name of the topology. This config is automatically set by Storm when the topology is submitted.
*/
- public final static String TOPOLOGY_NAME="topology.name";
+ public final static String TOPOLOGY_NAME="topology.name";
public static final Object TOPOLOGY_NAME_SCHEMA = String.class;
/**
@@ -1095,19 +1101,19 @@ public class Config extends HashMap<String, Object> {
*/
public static final String TOPOLOGY_SUBMITTER_USER = "topology.submitter.user";
public static final Object TOPOLOGY_SUBMITTER_USER_SCHEMA = String.class;
-
+
/**
* Array of components that scheduler should try to place on separate hosts.
*/
public static final String TOPOLOGY_SPREAD_COMPONENTS = "topology.spread.components";
public static final Object TOPOLOGY_SPREAD_COMPONENTS_SCHEMA = ConfigValidation.StringsValidator;
-
+
/**
* A list of IAutoCredentials that the topology should load and use.
*/
public static final String TOPOLOGY_AUTO_CREDENTIALS = "topology.auto-credentials";
public static final Object TOPOLOGY_AUTO_CREDENTIALS_SCHEMA = ConfigValidation.StringsValidator;
-
+
/**
* Max pending tuples in one ShellBolt
*/
@@ -1157,9 +1163,9 @@ public class Config extends HashMap<String, Object> {
/**
* This value is passed to spawned JVMs (e.g., Nimbus, Supervisor, and Workers)
- * for the java.library.path value. java.library.path tells the JVM where
+ * for the java.library.path value. java.library.path tells the JVM where
* to look for native libraries. It is necessary to set this config correctly since
- * Storm uses the ZeroMQ and JZMQ native libs.
+ * Storm uses the ZeroMQ and JZMQ native libs.
*/
public static final String JAVA_LIBRARY_PATH = "java.library.path";
public static final Object JAVA_LIBRARY_PATH_SCHEMA = String.class;
@@ -1178,7 +1184,7 @@ public class Config extends HashMap<String, Object> {
*/
public static final String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines";
public static final Object ISOLATION_SCHEDULER_MACHINES_SCHEMA = ConfigValidation.MapOfStringToNumberValidator;
-
+
/**
* A map from the user name to the number of machines that user is allowed to use. Set storm.scheduler
* to backtype.storm.scheduler.multitenant.MultitenantScheduler
@@ -1192,15 +1198,15 @@ public class Config extends HashMap<String, Object> {
*/
public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
public static final Object TOPOLOGY_ISOLATED_MACHINES_SCHEMA = Number.class;
-
+
public static void setDebug(Map conf, boolean isOn) {
conf.put(Config.TOPOLOGY_DEBUG, isOn);
- }
+ }
public void setDebug(boolean isOn) {
setDebug(this, isOn);
}
-
+
public static void setNumWorkers(Map conf, int workers) {
conf.put(Config.TOPOLOGY_WORKERS, workers);
}
@@ -1216,7 +1222,7 @@ public class Config extends HashMap<String, Object> {
public void setNumAckers(int numExecutors) {
setNumAckers(this, numExecutors);
}
-
+
public static void setMessageTimeoutSecs(Map conf, int secs) {
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, secs);
}
@@ -1224,7 +1230,7 @@ public class Config extends HashMap<String, Object> {
public void setMessageTimeoutSecs(int secs) {
setMessageTimeoutSecs(this, secs);
}
-
+
public static void registerSerialization(Map conf, Class klass) {
getRegisteredSerializations(conf).add(klass.getName());
}
@@ -1232,17 +1238,17 @@ public class Config extends HashMap<String, Object> {
public void registerSerialization(Class klass) {
registerSerialization(this, klass);
}
-
+
public static void registerSerialization(Map conf, Class klass, Class<? extends Serializer> serializerClass) {
Map<String, String> register = new HashMap<String, String>();
register.put(klass.getName(), serializerClass.getName());
- getRegisteredSerializations(conf).add(register);
+ getRegisteredSerializations(conf).add(register);
}
public void registerSerialization(Class klass, Class<? extends Serializer> serializerClass) {
registerSerialization(this, klass, serializerClass);
}
-
+
public void registerMetricsConsumer(Class klass, Object argument, long parallelismHint) {
HashMap m = new HashMap();
m.put("class", klass.getCanonicalName());
@@ -1270,7 +1276,7 @@ public class Config extends HashMap<String, Object> {
public void registerDecorator(Class<? extends IKryoDecorator> klass) {
registerDecorator(this, klass);
}
-
+
public static void setKryoFactory(Map conf, Class<? extends IKryoFactory> klass) {
conf.put(Config.TOPOLOGY_KRYO_FACTORY, klass.getName());
}
@@ -1286,7 +1292,7 @@ public class Config extends HashMap<String, Object> {
public void setSkipMissingKryoRegistrations(boolean skip) {
setSkipMissingKryoRegistrations(this, skip);
}
-
+
public static void setMaxTaskParallelism(Map conf, int max) {
conf.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, max);
}
@@ -1294,7 +1300,7 @@ public class Config extends HashMap<String, Object> {
public void setMaxTaskParallelism(int max) {
setMaxTaskParallelism(this, max);
}
-
+
public static void setMaxSpoutPending(Map conf, int max) {
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, max);
}
@@ -1302,23 +1308,23 @@ public class Config extends HashMap<String, Object> {
public void setMaxSpoutPending(int max) {
setMaxSpoutPending(this, max);
}
-
+
public static void setStatsSampleRate(Map conf, double rate) {
conf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, rate);
- }
+ }
public void setStatsSampleRate(double rate) {
setStatsSampleRate(this, rate);
- }
+ }
public static void setFallBackOnJavaSerialization(Map conf, boolean fallback) {
conf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, fallback);
- }
+ }
public void setFallBackOnJavaSerialization(boolean fallback) {
setFallBackOnJavaSerialization(this, fallback);
- }
-
+ }
+
private static List getRegisteredSerializations(Map conf) {
List ret;
if(!conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
@@ -1329,13 +1335,13 @@ public class Config extends HashMap<String, Object> {
conf.put(Config.TOPOLOGY_KRYO_REGISTER, ret);
return ret;
}
-
+
private static List getRegisteredDecorators(Map conf) {
List ret;
if(!conf.containsKey(Config.TOPOLOGY_KRYO_DECORATORS)) {
ret = new ArrayList();
} else {
- ret = new ArrayList((List) conf.get(Config.TOPOLOGY_KRYO_DECORATORS));
+ ret = new ArrayList((List) conf.get(Config.TOPOLOGY_KRYO_DECORATORS));
}
conf.put(Config.TOPOLOGY_KRYO_DECORATORS, ret);
return ret;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9e13a356/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java b/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
index 0b49dec..5590b81 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
@@ -39,16 +39,4 @@ public interface IGroupMappingServiceProvider {
*/
public Set<String> getGroups(String user) throws IOException;
- /**
- * Refresh the cache of groups and user mapping
- * @throws IOException
- */
- public void cacheGroupsRefresh() throws IOException;
- /**
- * Caches the group user information
- * @param groups list of groups to add to cache
- * @throws IOException
- */
- public void cacheGroupsAdd(Set<String> groups) throws IOException;
-
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9e13a356/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java b/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
index b8c8323..438b938 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
@@ -23,7 +23,10 @@ import java.util.Set;
import java.util.HashSet;
import java.util.Map;
import java.util.StringTokenizer;
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
import backtype.storm.utils.ShellUtils;
+import backtype.storm.utils.TimeCacheMap;
import backtype.storm.utils.ShellUtils.ExitCodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,12 +36,17 @@ public class ShellBasedUnixGroupsMapping implements
IGroupMappingServiceProvider {
public static Logger LOG = LoggerFactory.getLogger(ShellBasedUnixGroupsMapping.class);
+ public TimeCacheMap<String, Set<String>> cachedGroups;
/**
* Invoked once immediately after construction
* @param storm_conf Storm configuration
*/
- public void prepare(Map storm_conf) {}
+ @Override
+ public void prepare(Map storm_conf) {
+ int timeout = Utils.getInt(storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS));
+ cachedGroups = new TimeCacheMap<String, Set<String>>(timeout);
+ }
/**
* Returns list of groups for a user
@@ -48,15 +56,13 @@ public class ShellBasedUnixGroupsMapping implements
*/
@Override
public Set<String> getGroups(String user) throws IOException {
- return getUnixGroups(user);
- }
-
- @Override
- public void cacheGroupsRefresh() throws IOException {
- }
-
- @Override
- public void cacheGroupsAdd(Set<String> groups) throws IOException {
+ if(cachedGroups.containsKey(user)) {
+ return cachedGroups.get(user);
+ }
+ Set<String> groups = getUnixGroups(user);
+ if(!groups.isEmpty())
+ cachedGroups.put(user,groups);
+ return groups;
}
/**