You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/29 00:19:42 UTC
[1/4] git commit: STORM-347. (Security) authentication should allow
for groups not just users.
Repository: incubator-storm
Updated Branches:
refs/heads/security 642ed7431 -> ff8336b70
STORM-347. (Security) authentication should allow for groups not just users.
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/f3112fa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/f3112fa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/f3112fa7
Branch: refs/heads/security
Commit: f3112fa76e98038da83170513a476499dde0bb41
Parents: 65aee65
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Thu Jun 26 15:05:21 2014 -0700
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Thu Jun 26 15:05:21 2014 -0700
----------------------------------------------------------------------
conf/defaults.yaml | 3 +-
storm-core/src/jvm/backtype/storm/Config.java | 6 +
.../backtype/storm/security/auth/AuthUtils.java | 49 +-
.../auth/IGroupMappingServiceProvider.java | 54 ++
.../auth/ShellBasedUnixGroupsMapping.java | 88 ++++
.../auth/authorizer/SimpleACLAuthorizer.java | 29 +-
.../jvm/backtype/storm/utils/ShellUtils.java | 498 +++++++++++++++++++
7 files changed, 702 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f3112fa7/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 7f17054..83b7b4d 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -38,6 +38,7 @@ storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
storm.principal.tolocal: "backtype.storm.security.auth.DefaultPrincipalToLocal"
+storm.group.mapping.service: "backtype.storm.security.auth.ShellBasedUnixGroupsMapping"
storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.nimbus.retry.times: 5
storm.nimbus.retry.interval.millis: 2000
@@ -128,7 +129,7 @@ worker.childopts: "-Xmx768m"
worker.gc.childopts: ""
worker.heartbeat.frequency.secs: 1
-# control how many worker receiver threads we need per worker
+# control how many worker receiver threads we need per worker
topology.worker.receiver.thread.count: 1
task.heartbeat.frequency.secs: 3
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f3112fa7/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 3b3f7e5..fee5f6e 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -158,6 +158,12 @@ public class Config extends HashMap<String, Object> {
public static final Object STORM_PRINCIPAL_TO_LOCAL_PLUGIN_SCHEMA = String.class;
/**
+ * The plugin that will provide user groups service
+ */
+ public static final String STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN = "storm.group.mapping.service";
+ public static final Object STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN_SCHEMA = String.class;
+
+ /**
* The default transport plug-in for Thrift client/server communication
*/
public static final String STORM_THRIFT_TRANSPORT_PLUGIN = "storm.thrift.transport";
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f3112fa7/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java b/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
index b5118c7..7cfc927 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
@@ -37,21 +37,21 @@ import java.util.concurrent.ExecutorService;
public class AuthUtils {
private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
- public static final String LOGIN_CONTEXT_SERVER = "StormServer";
- public static final String LOGIN_CONTEXT_CLIENT = "StormClient";
+ public static final String LOGIN_CONTEXT_SERVER = "StormServer";
+ public static final String LOGIN_CONTEXT_CLIENT = "StormClient";
public static final String SERVICE = "storm_thrift_server";
/**
- * Construct a JAAS configuration object per storm configuration file
- * @param storm_conf Storm configuration
+ * Construct a JAAS configuration object per storm configuration file
+ * @param storm_conf Storm configuration
* @return JAAS configuration object
*/
public static Configuration GetConfiguration(Map storm_conf) {
Configuration login_conf = null;
- //find login file configuration from Storm configuration
+ //find login file configuration from Storm configuration
String loginConfigurationFile = (String)storm_conf.get("java.security.auth.login.config");
- if ((loginConfigurationFile != null) && (loginConfigurationFile.length()>0)) {
+ if ((loginConfigurationFile != null) && (loginConfigurationFile.length()>0)) {
File config_file = new File(loginConfigurationFile);
if (! config_file.canRead()) {
throw new RuntimeException("File " + loginConfigurationFile +
@@ -64,7 +64,7 @@ public class AuthUtils {
throw new RuntimeException(ex);
}
}
-
+
return login_conf;
}
@@ -87,10 +87,28 @@ public class AuthUtils {
}
/**
+ * Construct a group mapping service provider plugin
+ * @param storm_conf storm configuration
+ * @return the plugin
+ */
+ public static IGroupMappingServiceProvider GetGroupMappingServiceProviderPlugin(Map storm_conf) {
+ IGroupMappingServiceProvider gmsp = null;
+ try {
+ String gmsp_klassName = (String) storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN);
+ Class klass = Class.forName(gmsp_klassName);
+ gmsp = (IGroupMappingServiceProvider)klass.newInstance();
+ gmsp.prepare(storm_conf);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return gmsp;
+ }
+
+ /**
* Get all of the configured Credential Renwer Plugins.
* @param storm_conf the storm configuration to use.
* @return the configured credential renewers.
- */
+ */
public static Collection<ICredentialsRenewer> GetCredentialRenewers(Map conf) {
try {
Set<ICredentialsRenewer> ret = new HashSet<ICredentialsRenewer>();
@@ -112,7 +130,7 @@ public class AuthUtils {
* Get all of the configured AutoCredential Plugins.
* @param storm_conf the storm configuration to use.
* @return the configured auto credentials.
- */
+ */
public static Collection<IAutoCredentials> GetAutoCredentials(Map storm_conf) {
try {
Set<IAutoCredentials> autos = new HashSet<IAutoCredentials>();
@@ -137,7 +155,7 @@ public class AuthUtils {
* @param autos the IAutoCredentials to call to populate the subject.
* @param credentials the credentials to pull from
* @return the populated subject.
- */
+ */
public static Subject populateSubject(Subject subject, Collection<IAutoCredentials> autos, Map<String,String> credentials) {
try {
if (subject == null) {
@@ -154,10 +172,10 @@ public class AuthUtils {
/**
* Update a subject from credentials using the IAutoCredentials.
- * @param subject the subject to update
+ * @param subject the subject to update
* @param autos the IAutoCredentials to call to update the subject.
* @param credentials the credentials to pull from
- */
+ */
public static void updateSubject(Subject subject, Collection<IAutoCredentials> autos, Map<String,String> credentials) {
if (subject == null) {
throw new RuntimeException("The subject cannot be null when updating a subject with credentials");
@@ -186,7 +204,7 @@ public class AuthUtils {
transportPlugin.prepare(type, storm_conf, login_conf);
} catch(Exception e) {
throw new RuntimeException(e);
- }
+ }
return transportPlugin;
}
@@ -233,11 +251,10 @@ public class AuthUtils {
}
for(AppConfigurationEntry entry: configurationEntries) {
- Object val = entry.getOptions().get(key);
- if (val != null)
+ Object val = entry.getOptions().get(key);
+ if (val != null)
return (String)val;
}
return null;
}
}
-
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f3112fa7/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java b/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
new file mode 100644
index 0000000..0b49dec
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.security.auth;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.Map;
+
+public interface IGroupMappingServiceProvider {
+
+ /**
+ * Invoked once immediately after construction
+ * @param storm_conf Storm configuration
+ */
+ void prepare(Map storm_conf);
+
+ /**
+ * Get all group memberships of a given user.
+ * Returns an empty set for a non-existent user.
+ * @param user User's name
+ * @return group memberships of user
+ * @throws IOException
+ */
+ public Set<String> getGroups(String user) throws IOException;
+
+ /**
+ * Refresh the cache of groups and user mapping
+ * @throws IOException
+ */
+ public void cacheGroupsRefresh() throws IOException;
+ /**
+ * Caches the group user information
+ * @param groups list of groups to add to cache
+ * @throws IOException
+ */
+ public void cacheGroupsAdd(Set<String> groups) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f3112fa7/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java b/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
new file mode 100644
index 0000000..b8c8323
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.security.auth;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.StringTokenizer;
+import backtype.storm.utils.ShellUtils;
+import backtype.storm.utils.ShellUtils.ExitCodeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ShellBasedUnixGroupsMapping implements
+ IGroupMappingServiceProvider {
+
+ public static Logger LOG = LoggerFactory.getLogger(ShellBasedUnixGroupsMapping.class);
+
+ /**
+ * Invoked once immediately after construction
+ * @param storm_conf Storm configuration
+ */
+ public void prepare(Map storm_conf) {}
+
+ /**
+ * Returns list of groups for a user
+ *
+ * @param user get groups for this user
+ * @return list of groups for a given user
+ */
+ @Override
+ public Set<String> getGroups(String user) throws IOException {
+ return getUnixGroups(user);
+ }
+
+ @Override
+ public void cacheGroupsRefresh() throws IOException {
+ }
+
+ @Override
+ public void cacheGroupsAdd(Set<String> groups) throws IOException {
+ }
+
+ /**
+ * Get the given user's groups from Unix by running 'id -gn' and 'id -Gn'
+ * NOTE: for a non-existent user it returns an empty set
+ * @param user user name
+ * @return the groups set that the <code>user</code> belongs to
+ * @throws IOException if encounter any error when running the command
+ */
+ private static Set<String> getUnixGroups(final String user) throws IOException {
+ String result = "";
+ try {
+ result = ShellUtils.execCommand(ShellUtils.getGroupsForUserCommand(user));
+ } catch (ExitCodeException e) {
+ // if we didn't get the group - just return empty list;
+ LOG.warn("got exception trying to get groups for user " + user, e);
+ return new HashSet<String>();
+ }
+
+ StringTokenizer tokenizer =
+ new StringTokenizer(result, ShellUtils.TOKEN_SEPARATOR_REGEX);
+ Set<String> groups = new HashSet<String>();
+ while (tokenizer.hasMoreTokens()) {
+ groups.add(tokenizer.nextToken());
+ }
+ return groups;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f3112fa7/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
index ef13750..0b867e8 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
@@ -23,12 +23,14 @@ import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.Collection;
+import java.io.IOException;
import backtype.storm.Config;
import backtype.storm.security.auth.IAuthorizer;
import backtype.storm.security.auth.ReqContext;
import backtype.storm.security.auth.AuthUtils;
import backtype.storm.security.auth.IPrincipalToLocal;
+import backtype.storm.security.auth.IGroupMappingServiceProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,10 +49,10 @@ public class SimpleACLAuthorizer implements IAuthorizer {
protected Set<String> _admins;
protected Set<String> _supervisors;
protected IPrincipalToLocal _ptol;
-
+ protected IGroupMappingServiceProvider _groups;
/**
* Invoked once immediately after construction
- * @param conf Storm configuration
+ * @param conf Storm configuration
*/
@Override
public void prepare(Map conf) {
@@ -64,27 +66,26 @@ public class SimpleACLAuthorizer implements IAuthorizer {
_supervisors.addAll((Collection<String>)conf.get(Config.NIMBUS_SUPERVISOR_USERS));
}
_ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
+ _groups = AuthUtils.GetGroupMappingServiceProviderPlugin(conf);
}
/**
* permit() method is invoked for each incoming Thrift request
- * @param context request context includes info about
+ * @param context request context includes info about
* @param operation operation name
- * @param topology_storm configuration of targeted topology
+ * @param topology_storm configuration of targeted topology
* @return true if the request is authorized, false if reject
*/
@Override
public boolean permit(ReqContext context, String operation, Map topology_conf) {
-
LOG.info("[req "+ context.requestID()+ "] Access "
+ " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString())
+ (context.principal() == null? "" : (" principal:"+ context.principal()))
+" op:"+operation
+ (topology_conf == null? "" : (" topoology:"+topology_conf.get(Config.TOPOLOGY_NAME))));
-
+
String principal = context.principal().getName();
String user = _ptol.toLocal(context.principal());
-
if (_admins.contains(principal) || _admins.contains(user)) {
return true;
}
@@ -106,8 +107,20 @@ public class SimpleACLAuthorizer implements IAuthorizer {
if (topoUsers.contains(principal) || topoUsers.contains(user)) {
return true;
}
+ if(_groups != null) {
+ try {
+ String topologySubmitterUser = (String) topology_conf.get(Config.TOPOLOGY_SUBMITTER_USER);
+ Set<String> userGroups = _groups.getGroups(user);
+ Set<String> topoUserGroups = _groups.getGroups(topologySubmitterUser);
+ for (String tgroup : topoUserGroups) {
+ if(userGroups.contains(tgroup))
+ return true;
+ }
+ } catch(IOException e) {
+ LOG.warn("Error while trying to fetch user groups",e);
+ }
+ }
}
-
return false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f3112fa7/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java b/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java
new file mode 100644
index 0000000..1065ff9
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.utils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+abstract public class ShellUtils {
+ public static Logger LOG = LoggerFactory.getLogger(ShellUtils.class);
+
+ // OSType detection
+ public enum OSType {
+ OS_TYPE_LINUX,
+ OS_TYPE_WIN,
+ OS_TYPE_SOLARIS,
+ OS_TYPE_MAC,
+ OS_TYPE_FREEBSD,
+ OS_TYPE_OTHER
+ }
+
+ public static final OSType osType = getOSType();
+
+ static private OSType getOSType() {
+ String osName = System.getProperty("os.name");
+ if (osName.startsWith("Windows")) {
+ return OSType.OS_TYPE_WIN;
+ } else if (osName.contains("SunOS") || osName.contains("Solaris")) {
+ return OSType.OS_TYPE_SOLARIS;
+ } else if (osName.contains("Mac")) {
+ return OSType.OS_TYPE_MAC;
+ } else if (osName.contains("FreeBSD")) {
+ return OSType.OS_TYPE_FREEBSD;
+ } else if (osName.startsWith("Linux")) {
+ return OSType.OS_TYPE_LINUX;
+ } else {
+ // Some other form of Unix
+ return OSType.OS_TYPE_OTHER;
+ }
+ }
+
+ // Helper static vars for each platform
+ public static final boolean WINDOWS = (osType == OSType.OS_TYPE_WIN);
+ public static final boolean SOLARIS = (osType == OSType.OS_TYPE_SOLARIS);
+ public static final boolean MAC = (osType == OSType.OS_TYPE_MAC);
+ public static final boolean FREEBSD = (osType == OSType.OS_TYPE_FREEBSD);
+ public static final boolean LINUX = (osType == OSType.OS_TYPE_LINUX);
+ public static final boolean OTHER = (osType == OSType.OS_TYPE_OTHER);
+
+
+ /** Token separator regex used to parse Shell tool outputs */
+ public static final String TOKEN_SEPARATOR_REGEX
+ = WINDOWS ? "[|\n\r]" : "[ \t\n\r\f]";
+
+ private long interval; // refresh interval in msec
+ private long lastTime; // last time the command was performed
+ final private boolean redirectErrorStream; // merge stdout and stderr
+ private Map<String, String> environment; // env for the command execution
+ private File dir;
+ private Process process; // sub process used to execute the command
+ private int exitCode;
+ /** Time after which the executing script is timed out */
+ protected long timeOutInterval = 0L;
+ /** Whether or not the script timed out */
+ private AtomicBoolean timedOut;
+
+ /** Whether or not the script finished executing */
+ private volatile AtomicBoolean completed;
+
+ public ShellUtils() {
+ this(0L);
+ }
+
+ public ShellUtils(long interval) {
+ this(interval, false);
+ }
+
+ /**
+ * @param interval the minimum duration to wait before re-executing the
+ * command.
+ */
+ public ShellUtils(long interval, boolean redirectErrorStream) {
+ this.interval = interval;
+ this.lastTime = (interval<0) ? 0 : -interval;
+ this.redirectErrorStream = redirectErrorStream;
+ }
+
+ /** set the environment for the command
+ * @param env Mapping of environment variables
+ */
+ protected void setEnvironment(Map<String, String> env) {
+ this.environment = env;
+ }
+
+ /** set the working directory
+ * @param dir The directory where the command would be executed
+ */
+ protected void setWorkingDirectory(File dir) {
+ this.dir = dir;
+ }
+
+ /** a Unix command to get the current user's groups list */
+ public static String[] getGroupsCommand() {
+ return (WINDOWS)? new String[]{"cmd", "/c", "groups"}
+ : new String[]{"bash", "-c", "groups"};
+ }
+
+ /**
+ * a Unix command to get a given user's groups list.
+ * The command (always run through bash; there is no Windows variant) gets
+ * the user's primary group first and then the full groups list, which
+ * includes the primary group, i.e. the primary group is listed twice.
+ */
+ public static String[] getGroupsForUserCommand(final String user) {
+ // the output of 'groups username' is inconsistent across different Unixes
+ return new String [] {"bash", "-c", "id -gn " + user
+ + "&& id -Gn " + user};
+ }
+
+
+ /** check to see if a command needs to be executed and execute if needed */
+ protected void run() throws IOException {
+ if (lastTime + interval > System.currentTimeMillis())
+ return;
+ exitCode = 0; // reset for next run
+ runCommand();
+ }
+
+ /** Run a command */
+ private void runCommand() throws IOException {
+ ProcessBuilder builder = new ProcessBuilder(getExecString());
+ Timer timeOutTimer = null;
+ ShellTimeoutTimerTask timeoutTimerTask = null;
+ timedOut = new AtomicBoolean(false);
+ completed = new AtomicBoolean(false);
+
+ if (environment != null) {
+ builder.environment().putAll(this.environment);
+ }
+ if (dir != null) {
+ builder.directory(this.dir);
+ }
+
+ builder.redirectErrorStream(redirectErrorStream);
+ process = builder.start();
+
+ if (timeOutInterval > 0) {
+ timeOutTimer = new Timer("Shell command timeout");
+ timeoutTimerTask = new ShellTimeoutTimerTask(this);
+ //One time scheduling.
+ timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
+ }
+ final BufferedReader errReader =
+ new BufferedReader(new InputStreamReader(process
+ .getErrorStream()));
+ BufferedReader inReader =
+ new BufferedReader(new InputStreamReader(process
+ .getInputStream()));
+ final StringBuffer errMsg = new StringBuffer();
+
+ // read error and input streams as this would free up the buffers
+ // free the error stream buffer
+ Thread errThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ String line = errReader.readLine();
+ while((line != null) && !isInterrupted()) {
+ errMsg.append(line);
+ errMsg.append(System.getProperty("line.separator"));
+ line = errReader.readLine();
+ }
+ } catch(IOException ioe) {
+ LOG.warn("Error reading the error stream", ioe);
+ }
+ }
+ };
+ try {
+ errThread.start();
+ } catch (IllegalStateException ise) { }
+ try {
+ parseExecResult(inReader); // parse the output
+ // clear the input stream buffer
+ String line = inReader.readLine();
+ while(line != null) {
+ line = inReader.readLine();
+ }
+ // wait for the process to finish and check the exit code
+ exitCode = process.waitFor();
+ // make sure that the error thread exits
+ joinThread(errThread);
+ completed.set(true);
+ //the timeout thread handling
+ //taken care in finally block
+ if (exitCode != 0) {
+ throw new ExitCodeException(exitCode, errMsg.toString());
+ }
+ } catch (InterruptedException ie) {
+ throw new IOException(ie.toString());
+ } finally {
+ if (timeOutTimer != null) {
+ timeOutTimer.cancel();
+ }
+ // close the input stream
+ try {
+ // JDK 7 tries to automatically drain the input streams for us
+ // when the process exits, but since close is not synchronized,
+ // it creates a race if we close the stream first and the same
+ // fd is recycled. the stream draining thread will attempt to
+ // drain that fd!! it may block, OOM, or cause bizarre behavior
+ // see: https://bugs.openjdk.java.net/browse/JDK-8024521
+ // issue is fixed in build 7u60
+ InputStream stdout = process.getInputStream();
+ synchronized (stdout) {
+ inReader.close();
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Error while closing the input stream", ioe);
+ }
+ if (!completed.get()) {
+ errThread.interrupt();
+ joinThread(errThread);
+ }
+ try {
+ InputStream stderr = process.getErrorStream();
+ synchronized (stderr) {
+ errReader.close();
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Error while closing the error stream", ioe);
+ }
+ process.destroy();
+ lastTime = System.currentTimeMillis();
+ }
+ }
+
+ private static void joinThread(Thread t) {
+ while (t.isAlive()) {
+ try {
+ t.join();
+ } catch (InterruptedException ie) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Interrupted while joining on: " + t, ie);
+ }
+ t.interrupt(); // propagate interrupt
+ }
+ }
+ }
+
+ /** return an array containing the command name & its parameters */
+ protected abstract String[] getExecString();
+
+ /** Parse the execution result */
+ protected abstract void parseExecResult(BufferedReader lines)
+ throws IOException;
+
+ /** get the current sub-process executing the given command
+ * @return process executing the command
+ */
+ public Process getProcess() {
+ return process;
+ }
+
+ /**
+ * This is an IOException with exit code added.
+ */
+ public static class ExitCodeException extends IOException {
+ int exitCode;
+
+ public ExitCodeException(int exitCode, String message) {
+ super(message);
+ this.exitCode = exitCode;
+ }
+
+ public int getExitCode() {
+ return exitCode;
+ }
+ }
+
+ /**
+ * A simple shell command executor.
+ *
+ * <code>ShellCommandExecutor</code> should be used in cases where the output
+ * of the command needs no explicit parsing and where the command, working
+ * directory and the environment remains unchanged. The output of the command
+ * is stored as-is and is expected to be small.
+ */
+ public static class ShellCommandExecutor extends ShellUtils {
+
+ private String[] command;
+ private StringBuffer output;
+
+
+ public ShellCommandExecutor(String[] execString) {
+ this(execString, null);
+ }
+
+ public ShellCommandExecutor(String[] execString, File dir) {
+ this(execString, dir, null);
+ }
+
+ public ShellCommandExecutor(String[] execString, File dir,
+ Map<String, String> env) {
+ this(execString, dir, env , 0L);
+ }
+
+ /**
+ * Create a new instance of the ShellCommandExecutor to execute a command.
+ *
+ * @param execString The command to execute with arguments
+ * @param dir If not-null, specifies the directory which should be set
+ * as the current working directory for the command.
+ * If null, the current working directory is not modified.
+ * @param env If not-null, environment of the command will include the
+ * key-value pairs specified in the map. If null, the current
+ * environment is not modified.
+ * @param timeout Specifies the time in milliseconds, after which the
+ * command will be killed and the status marked as timedout.
+ * If 0, the command will not be timed out.
+ */
+ public ShellCommandExecutor(String[] execString, File dir,
+ Map<String, String> env, long timeout) {
+ command = execString.clone();
+ if (dir != null) {
+ setWorkingDirectory(dir);
+ }
+ if (env != null) {
+ setEnvironment(env);
+ }
+ timeOutInterval = timeout;
+ }
+
+
+ /** Execute the shell command. */
+ public void execute() throws IOException {
+ this.run();
+ }
+
+ @Override
+ public String[] getExecString() {
+ return command;
+ }
+
+ @Override
+ protected void parseExecResult(BufferedReader lines) throws IOException {
+ output = new StringBuffer();
+ char[] buf = new char[512];
+ int nRead;
+ while ( (nRead = lines.read(buf, 0, buf.length)) > 0 ) {
+ output.append(buf, 0, nRead);
+ }
+ }
+
+ /** Get the output of the shell command.*/
+ public String getOutput() {
+ return (output == null) ? "" : output.toString();
+ }
+
+ /**
+ * Returns the commands of this instance.
+ * Arguments with spaces in are presented with quotes round; other
+ * arguments are presented raw
+ *
+ * @return a string representation of the object.
+ */
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ String[] args = getExecString();
+ for (String s : args) {
+ if (s.indexOf(' ') >= 0) {
+ builder.append('"').append(s).append('"');
+ } else {
+ builder.append(s);
+ }
+ builder.append(' ');
+ }
+ return builder.toString();
+ }
+ }
+
+ /**
+ * To check if the passed script to shell command executor timed out or
+ * not.
+ *
+ * @return if the script timed out.
+ */
+ public boolean isTimedOut() {
+ return timedOut.get();
+ }
+
+ /**
+ * Set if the command has timed out.
+ *
+ */
+ private void setTimedOut() {
+ this.timedOut.set(true);
+ }
+
+
+ /**
+ * Static method to execute a shell command.
+ * Covers most of the simple cases without requiring the user to subclass
+ * <code>ShellUtils</code>.
+ * @param cmd shell command to execute.
+ * @return the output of the executed command.
+ */
+ public static String execCommand(String ... cmd) throws IOException {
+ return execCommand(null, cmd, 0L);
+ }
+
+ /**
+ * Static method to execute a shell command.
+ * Covers most of the simple cases without requiring the user to subclass
+ * <code>ShellUtils</code>.
+ * @param env the map of environment key=value
+ * @param cmd shell command to execute.
+ * @param timeout time in milliseconds after which script should be marked timeout
+ * @return the output of the executed command.
+ */
+
+ public static String execCommand(Map<String, String> env, String[] cmd,
+ long timeout) throws IOException {
+ ShellCommandExecutor exec = new ShellCommandExecutor(cmd, null, env,
+ timeout);
+ exec.execute();
+ return exec.getOutput();
+ }
+
+ /**
+ * Static method to execute a shell command.
+ * Covers most of the simple cases without requiring the user to subclass
+ * <code>ShellUtils</code>.
+ * @param env the map of environment key=value
+ * @param cmd shell command to execute.
+ * @return the output of the executed command.
+ */
+ public static String execCommand(Map<String,String> env, String ... cmd)
+ throws IOException {
+ return execCommand(env, cmd, 0L);
+ }
+
+ /**
+ * Timer which is used to timeout scripts spawned off by shell.
+ */
+ private static class ShellTimeoutTimerTask extends TimerTask {
+
+ private ShellUtils shell;
+
+ public ShellTimeoutTimerTask(ShellUtils shell) {
+ this.shell = shell;
+ }
+
+ @Override
+ public void run() {
+ Process p = shell.getProcess();
+ try {
+ p.exitValue();
+ } catch (Exception e) {
+ //Process has not terminated.
+ //So check if it has completed
+ //if not just destroy it.
+ if (p != null && !shell.completed.get()) {
+ shell.setTimedOut();
+ p.destroy();
+ }
+ }
+ }
+ }
+
+}
[3/4] git commit: STORM-347. (Security) authentication should allow
for groups not just users. Added Unit test for ShellBasedGroupsMapping.
Posted by bo...@apache.org.
STORM-347. (Security) authentication should allow for groups not just
users. Added Unit test for ShellBasedGroupsMapping.
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/fb882ca1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/fb882ca1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/fb882ca1
Branch: refs/heads/security
Commit: fb882ca18f124c7f6b65b7edeb2c0a652e254c3a
Parents: 9e13a35
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Mon Jul 28 12:51:13 2014 -0700
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Mon Jul 28 12:51:13 2014 -0700
----------------------------------------------------------------------
conf/defaults.yaml | 2 +-
.../security/auth/ShellBasedGroupsMapping.java | 94 ++++++++++++++++++++
.../auth/ShellBasedUnixGroupsMapping.java | 94 --------------------
.../backtype/storm/security/auth/auth_test.clj | 11 ++-
4 files changed, 105 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fb882ca1/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index b0ebb4d..d4283a4 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -38,7 +38,7 @@ storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
storm.principal.tolocal: "backtype.storm.security.auth.DefaultPrincipalToLocal"
-storm.group.mapping.service: "backtype.storm.security.auth.ShellBasedUnixGroupsMapping"
+storm.group.mapping.service: "backtype.storm.security.auth.ShellBasedGroupsMapping"
storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.nimbus.retry.times: 5
storm.nimbus.retry.interval.millis: 2000
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fb882ca1/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedGroupsMapping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedGroupsMapping.java b/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedGroupsMapping.java
new file mode 100644
index 0000000..62a4c7e
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedGroupsMapping.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.security.auth;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.StringTokenizer;
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+import backtype.storm.utils.ShellUtils;
+import backtype.storm.utils.TimeCacheMap;
+import backtype.storm.utils.ShellUtils.ExitCodeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ShellBasedGroupsMapping implements
+ IGroupMappingServiceProvider {
+
+ public static Logger LOG = LoggerFactory.getLogger(ShellBasedGroupsMapping.class);
+ public TimeCacheMap<String, Set<String>> cachedGroups;
+
+ /**
+ * Invoked once immediately after construction
+ * @param storm_conf Storm configuration
+ */
+ @Override
+ public void prepare(Map storm_conf) {
+ int timeout = Utils.getInt(storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS));
+ cachedGroups = new TimeCacheMap<String, Set<String>>(timeout);
+ }
+
+ /**
+ * Returns list of groups for a user
+ *
+ * @param user get groups for this user
+ * @return list of groups for a given user
+ */
+ @Override
+ public Set<String> getGroups(String user) throws IOException {
+ if(cachedGroups.containsKey(user)) {
+ return cachedGroups.get(user);
+ }
+ Set<String> groups = getUnixGroups(user);
+ if(!groups.isEmpty())
+ cachedGroups.put(user,groups);
+ return groups;
+ }
+
+ /**
+ * Get the given user's group set from Unix by running the command 'groups'
+ * NOTE: for a non-existent user it will return an EMPTY set
+ * @param user user name
+ * @return the groups set that the <code>user</code> belongs to
+ * @throws IOException if encounter any error when running the command
+ */
+ private static Set<String> getUnixGroups(final String user) throws IOException {
+ String result = "";
+ try {
+ result = ShellUtils.execCommand(ShellUtils.getGroupsForUserCommand(user));
+ } catch (ExitCodeException e) {
+ // if we didn't get the group - just return empty list;
+ LOG.warn("got exception trying to get groups for user " + user, e);
+ return new HashSet<String>();
+ }
+
+ StringTokenizer tokenizer =
+ new StringTokenizer(result, ShellUtils.TOKEN_SEPARATOR_REGEX);
+ Set<String> groups = new HashSet<String>();
+ while (tokenizer.hasMoreTokens()) {
+ groups.add(tokenizer.nextToken());
+ }
+ return groups;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fb882ca1/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java b/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
deleted file mode 100644
index 438b938..0000000
--- a/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm.security.auth;
-
-import java.io.IOException;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.StringTokenizer;
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-import backtype.storm.utils.ShellUtils;
-import backtype.storm.utils.TimeCacheMap;
-import backtype.storm.utils.ShellUtils.ExitCodeException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class ShellBasedUnixGroupsMapping implements
- IGroupMappingServiceProvider {
-
- public static Logger LOG = LoggerFactory.getLogger(ShellBasedUnixGroupsMapping.class);
- public TimeCacheMap<String, Set<String>> cachedGroups;
-
- /**
- * Invoked once immediately after construction
- * @param storm_conf Storm configuration
- */
- @Override
- public void prepare(Map storm_conf) {
- int timeout = Utils.getInt(storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS));
- cachedGroups = new TimeCacheMap<String, Set<String>>(timeout);
- }
-
- /**
- * Returns list of groups for a user
- *
- * @param user get groups for this user
- * @return list of groups for a given user
- */
- @Override
- public Set<String> getGroups(String user) throws IOException {
- if(cachedGroups.containsKey(user)) {
- return cachedGroups.get(user);
- }
- Set<String> groups = getUnixGroups(user);
- if(!groups.isEmpty())
- cachedGroups.put(user,groups);
- return groups;
- }
-
- /**
- * Get the current user's group list from Unix by running the command 'groups'
- * NOTE. For non-existing user it will return EMPTY list
- * @param user user name
- * @return the groups set that the <code>user</code> belongs to
- * @throws IOException if encounter any error when running the command
- */
- private static Set<String> getUnixGroups(final String user) throws IOException {
- String result = "";
- try {
- result = ShellUtils.execCommand(ShellUtils.getGroupsForUserCommand(user));
- } catch (ExitCodeException e) {
- // if we didn't get the group - just return empty list;
- LOG.warn("got exception trying to get groups for user " + user, e);
- return new HashSet<String>();
- }
-
- StringTokenizer tokenizer =
- new StringTokenizer(result, ShellUtils.TOKEN_SEPARATOR_REGEX);
- Set<String> groups = new HashSet<String>();
- while (tokenizer.hasMoreTokens()) {
- groups.add(tokenizer.nextToken());
- }
- return groups;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fb882ca1/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj b/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
index 6fdd485..12411e7 100644
--- a/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
+++ b/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
@@ -26,7 +26,7 @@
(:import [backtype.storm.generated AuthorizationException])
(:import [backtype.storm.utils NimbusClient])
(:import [backtype.storm.security.auth.authorizer SimpleWhitelistAuthorizer SimpleACLAuthorizer])
- (:import [backtype.storm.security.auth AuthUtils ThriftServer ThriftClient
+ (:import [backtype.storm.security.auth AuthUtils ThriftServer ThriftClient ShellBasedGroupsMapping
ReqContext SimpleTransportPlugin KerberosPrincipalToLocal ThriftConnectionType])
(:use [backtype.storm bootstrap util])
(:use [backtype.storm.daemon common])
@@ -278,6 +278,15 @@
(is (= false (.permit authorizer (ReqContext. supervisor-user) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
))
+(deftest shell-based-groups-mapping-test
+ (let [cluster-conf (merge (read-storm-config))
+ groups (ShellBasedGroupsMapping. )
+ user-name (System/getProperty "user.name")]
+ (.prepare groups cluster-conf)
+ (>= 0 (.size (.getGroups groups user-name)))
+ (>= 0 (.size (.getGroups groups "userDoesNotExist")))
+ (>= 0 (.size (.getGroups groups nil)))))
+
(deftest simple-acl-same-user-auth-test
(let [cluster-conf (merge (read-storm-config)
{NIMBUS-ADMINS ["admin"]
[4/4] git commit: Merge branch 'STORM-347' of
https://github.com/harshach/incubator-storm into STORM-347
Posted by bo...@apache.org.
Merge branch 'STORM-347' of https://github.com/harshach/incubator-storm into STORM-347
STORM-347: Authentication should allow for groups not just users.
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/ff8336b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/ff8336b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/ff8336b7
Branch: refs/heads/security
Commit: ff8336b70d20bccb29f0f5acd8c7f7e4323dca94
Parents: 642ed74 fb882ca
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Mon Jul 28 16:59:42 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Mon Jul 28 16:59:42 2014 -0500
----------------------------------------------------------------------
conf/defaults.yaml | 6 +-
storm-core/src/jvm/backtype/storm/Config.java | 176 ++++---
.../backtype/storm/security/auth/AuthUtils.java | 49 +-
.../auth/IGroupMappingServiceProvider.java | 42 ++
.../security/auth/ShellBasedGroupsMapping.java | 94 ++++
.../auth/authorizer/SimpleACLAuthorizer.java | 29 +-
.../jvm/backtype/storm/utils/ShellUtils.java | 498 +++++++++++++++++++
.../backtype/storm/security/auth/auth_test.clj | 11 +-
8 files changed, 797 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
[2/4] git commit: STORM-347. (Security) authentication should allow
for groups not just users. Added Bobby's suggested changes.
Posted by bo...@apache.org.
STORM-347. (Security) authentication should allow for groups not just
users. Added Bobby's suggested changes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/9e13a356
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/9e13a356
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/9e13a356
Branch: refs/heads/security
Commit: 9e13a356268e1e8bf76ac29c42a441a0f49108be
Parents: f3112fa
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Mon Jul 21 15:13:35 2014 -0700
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Mon Jul 21 15:13:35 2014 -0700
----------------------------------------------------------------------
conf/defaults.yaml | 3 +
storm-core/src/jvm/backtype/storm/Config.java | 170 ++++++++++---------
.../auth/IGroupMappingServiceProvider.java | 12 --
.../auth/ShellBasedUnixGroupsMapping.java | 26 +--
4 files changed, 107 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9e13a356/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 83b7b4d..b0ebb4d 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -154,6 +154,9 @@ storm.messaging.netty.transfer.batch.size: 262144
# We check with this interval that whether the Netty channel is writable and try to write pending messages if it is.
storm.messaging.netty.flush.check.interval.ms: 10
+# default number of seconds the group mapping service will cache a user's groups
+storm.group.mapping.service.cache.duration.secs: 120
+
### topology.* configs are for specific executing storms
topology.enable.message.timeouts: true
topology.debug: false
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9e13a356/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index fee5f6e..ea54313 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -28,11 +28,11 @@ import java.util.List;
import java.util.Map;
/**
- * Topology configs are specified as a plain old map. This class provides a
- * convenient way to create a topology config map by providing setter methods for
- * all the configs that can be set. It also makes it easier to do things like add
+ * Topology configs are specified as a plain old map. This class provides a
+ * convenient way to create a topology config map by providing setter methods for
+ * all the configs that can be set. It also makes it easier to do things like add
* serializations.
- *
+ *
* <p>This class also provides constants for all the configurations possible on
* a Storm cluster and Storm topology. Each constant is paired with a schema
* that defines the validity criterion of the corresponding field. Default
@@ -40,7 +40,7 @@ import java.util.Map;
*
* <p>Note that you may put other configurations in any of the configs. Storm
* will ignore anything it doesn't recognize, but your topologies are free to make
- * use of them by reading them in the prepare method of Bolts or the open method of
+ * use of them by reading them in the prepare method of Bolts or the open method of
* Spouts.</p>
*/
public class Config extends HashMap<String, Object> {
@@ -60,39 +60,39 @@ public class Config extends HashMap<String, Object> {
/**
* Netty based messaging: The buffer size for send/recv buffer
*/
- public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size";
+ public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size";
public static final Object STORM_MESSAGING_NETTY_BUFFER_SIZE_SCHEMA = Number.class;
/**
* Netty based messaging: The max # of retries that a peer will perform when a remote is not accessible
*/
- public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries";
+ public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries";
public static final Object STORM_MESSAGING_NETTY_MAX_RETRIES_SCHEMA = Number.class;
/**
- * Netty based messaging: The min # of milliseconds that a peer will wait.
+ * Netty based messaging: The min # of milliseconds that a peer will wait.
*/
- public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms";
+ public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms";
public static final Object STORM_MESSAGING_NETTY_MIN_SLEEP_MS_SCHEMA = Number.class;
/**
- * Netty based messaging: The max # of milliseconds that a peer will wait.
+ * Netty based messaging: The max # of milliseconds that a peer will wait.
*/
- public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms";
+ public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms";
public static final Object STORM_MESSAGING_NETTY_MAX_SLEEP_MS_SCHEMA = Number.class;
/**
* Netty based messaging: The # of worker threads for the server.
*/
- public static final String STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS = "storm.messaging.netty.server_worker_threads";
+ public static final String STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS = "storm.messaging.netty.server_worker_threads";
public static final Object STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS_SCHEMA = Number.class;
/**
* Netty based messaging: The # of worker threads for the client.
*/
- public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS = "storm.messaging.netty.client_worker_threads";
+ public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS = "storm.messaging.netty.client_worker_threads";
public static final Object STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS_SCHEMA = Number.class;
-
+
/**
* If the Netty messaging layer is busy, the Netty client will try to batch message as more as possible up to the size of STORM_NETTY_MESSAGE_BATCH_SIZE bytes
*/
@@ -104,8 +104,8 @@ public class Config extends HashMap<String, Object> {
*/
public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";
public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = Number.class;
-
-
+
+
/**
* A list of hosts of ZooKeeper servers used to manage the cluster.
*/
@@ -128,7 +128,7 @@ public class Config extends HashMap<String, Object> {
/**
* A global task scheduler used to assign topologies's tasks to supervisors' wokers.
- *
+ *
* If this is not set, a default system scheduler will be used.
*/
public static final String STORM_SCHEDULER = "storm.scheduler";
@@ -141,9 +141,9 @@ public class Config extends HashMap<String, Object> {
public static final Object STORM_CLUSTER_MODE_SCHEMA = String.class;
/**
- * The hostname the supervisors/workers should report to nimbus. If unset, Storm will
+ * The hostname the supervisors/workers should report to nimbus. If unset, Storm will
* get the hostname to report by calling <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
- *
+ *
* You should set this config when you dont have a DNS which supervisors/workers
* can utilize to find each other based on hostname got from calls to
* <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
@@ -164,13 +164,19 @@ public class Config extends HashMap<String, Object> {
public static final Object STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN_SCHEMA = String.class;
/**
+ * Maximum number of seconds the group mapping service will cache a user's groups
+ */
+ public static final String STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS = "storm.group.mapping.service.cache.duration.secs";
+ public static final Object STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS_SCHEMA = Number.class;
+
+ /**
* The default transport plug-in for Thrift client/server communication
*/
public static final String STORM_THRIFT_TRANSPORT_PLUGIN = "storm.thrift.transport";
public static final Object STORM_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
/**
- * The serializer class for ListDelegate (tuple payload).
+ * The serializer class for ListDelegate (tuple payload).
* The default serializer will be ListDelegateSerializer
*/
public static final String TOPOLOGY_TUPLE_SERIALIZER = "topology.tuple.serializer";
@@ -184,9 +190,9 @@ public class Config extends HashMap<String, Object> {
public static final Object TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE_SCHEMA = Boolean.class;
/**
- * Whether or not to use ZeroMQ for messaging in local mode. If this is set
- * to false, then Storm will use a pure-Java messaging system. The purpose
- * of this flag is to make it easy to run Storm in local mode by eliminating
+ * Whether or not to use ZeroMQ for messaging in local mode. If this is set
+ * to false, then Storm will use a pure-Java messaging system. The purpose
+ * of this flag is to make it easy to run Storm in local mode by eliminating
* the need for native dependencies, which can be difficult to install.
*
* Defaults to false.
@@ -271,7 +277,7 @@ public class Config extends HashMap<String, Object> {
*/
public static final String STORM_NIMBUS_RETRY_TIMES="storm.nimbus.retry.times";
public static final Object STORM_NIMBUS_RETRY_TIMES_SCHEMA = Number.class;
-
+
/**
* The starting interval between exponential backoff retries of a Nimbus operation.
*/
@@ -311,15 +317,15 @@ public class Config extends HashMap<String, Object> {
/**
* A list of users that are cluster admins and can run any command. To use this set
- * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
+ * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
*/
public static final String NIMBUS_ADMINS = "nimbus.admins";
public static final Object NIMBUS_ADMINS_SCHEMA = ConfigValidation.StringsValidator;
/**
- * A list of users that run the supervisors and should be authorized to interact with
+ * A list of users that run the supervisors and should be authorized to interact with
* nimbus as a supervisor would. To use this set
- * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
+ * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
*/
public static final String NIMBUS_SUPERVISOR_USERS = "nimbus.supervisor.users";
public static final Object NIMBUS_SUPERVISOR_USERS_SCHEMA = ConfigValidation.StringsValidator;
@@ -392,7 +398,7 @@ public class Config extends HashMap<String, Object> {
public static final Object NIMBUS_TASK_LAUNCH_SECS_SCHEMA = Number.class;
/**
- * Whether or not nimbus should reassign tasks if it detects that a task goes down.
+ * Whether or not nimbus should reassign tasks if it detects that a task goes down.
* Defaults to true, and it's not recommended to change this value.
*/
public static final String NIMBUS_REASSIGN = "nimbus.reassign";
@@ -462,7 +468,7 @@ public class Config extends HashMap<String, Object> {
public static final Object LOGVIEWER_CLEANUP_AGE_MINS_SCHEMA = ConfigValidation.PositiveIntegerValidator;
/**
- * A list of users allowed to view logs via the Log Viewer
+ * A list of users allowed to view logs via the Log Viewer
*/
public static final String LOGS_USERS = "logs.users";
public static final Object LOGS_USERS_SCHEMA = ConfigValidation.StringsValidator;
@@ -594,7 +600,7 @@ public class Config extends HashMap<String, Object> {
public static final Object DRPC_AUTHORIZER_ACL_STRICT_SCHEMA = Boolean.class;
/**
- * DRPC thrift server worker threads
+ * DRPC thrift server worker threads
*/
public static final String DRPC_WORKER_THREADS = "drpc.worker.threads";
public static final Object DRPC_WORKER_THREADS_SCHEMA = Number.class;
@@ -606,7 +612,7 @@ public class Config extends HashMap<String, Object> {
public static final Object DRPC_MAX_BUFFER_SIZE_SCHEMA = Number.class;
/**
- * DRPC thrift server queue size
+ * DRPC thrift server queue size
*/
public static final String DRPC_QUEUE_SIZE = "drpc.queue.size";
public static final Object DRPC_QUEUE_SIZE_SCHEMA = Number.class;
@@ -618,13 +624,13 @@ public class Config extends HashMap<String, Object> {
public static final Object DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
/**
- * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
+ * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
*/
public static final String DRPC_INVOCATIONS_PORT = "drpc.invocations.port";
public static final Object DRPC_INVOCATIONS_PORT_SCHEMA = Number.class;
/**
- * DRPC invocations thrift server worker threads
+ * DRPC invocations thrift server worker threads
*/
public static final String DRPC_INVOCATIONS_THREADS = "drpc.invocations.threads";
public static final Object DRPC_INVOCATIONS_THREADS_SCHEMA = Number.class;
@@ -667,7 +673,7 @@ public class Config extends HashMap<String, Object> {
*/
public static final String SUPERVISOR_SLOTS_PORTS = "supervisor.slots.ports";
public static final Object SUPERVISOR_SLOTS_PORTS_SCHEMA = ConfigValidation.NumbersValidator;
-
+
/**
* A number representing the maximum number of workers any single topology can acquire.
*/
@@ -739,15 +745,15 @@ public class Config extends HashMap<String, Object> {
public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = Number.class;
/**
- * Should the supervior try to run the worker as the lauching user or not. Defaults to false.
+ * Should the supervior try to run the worker as the lauching user or not. Defaults to false.
*/
public static final String SUPERVISOR_RUN_WORKER_AS_USER = "supervisor.run.worker.as.user";
public static final Object SUPERVISOR_RUN_WORKER_AS_USER_SCHEMA = Boolean.class;
/**
- * Full path to the worker-laucher executable that will be used to lauch workers when
+ * Full path to the worker-laucher executable that will be used to lauch workers when
* SUPERVISOR_RUN_WORKER_AS_USER is set to true.
- */
+ */
public static final String SUPERVISOR_WORKER_LAUNCHER = "supervisor.worker.launcher";
public static final Object SUPERVISOR_WORKER_LAUNCHER_SCHEMA = String.class;
@@ -772,7 +778,7 @@ public class Config extends HashMap<String, Object> {
*/
public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
public static final Object WORKER_RECEIVER_THREAD_COUNT_SCHEMA = Number.class;
-
+
/**
* How often this worker should heartbeat to the supervisor.
*/
@@ -806,7 +812,7 @@ public class Config extends HashMap<String, Object> {
/**
* A list of users that are allowed to interact with the topology. To use this set
- * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
+ * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
*/
public static final String TOPOLOGY_USERS = "topology.users";
public static final Object TOPOLOGY_USERS_SCHEMA = ConfigValidation.StringsValidator;
@@ -843,8 +849,8 @@ public class Config extends HashMap<String, Object> {
/**
* How many instances to create for a spout/bolt. A task runs on a thread with zero or more
* other tasks for the same spout/bolt. The number of tasks for a spout/bolt is always
- * the same throughout the lifetime of a topology, but the number of executors (threads) for
- * a spout/bolt can change over time. This allows a topology to scale to more or less resources
+ * the same throughout the lifetime of a topology, but the number of executors (threads) for
+ * a spout/bolt can change over time. This allows a topology to scale to more or less resources
* without redeploying the topology or violating the constraints of Storm (such as a fields grouping
* guaranteeing that the same value goes to the same task).
*/
@@ -883,8 +889,8 @@ public class Config extends HashMap<String, Object> {
/**
* A list of classes that customize storm's kryo instance during start-up.
- * Each listed class name must implement IKryoDecorator. During start-up the
- * listed class is instantiated with 0 arguments, then its 'decorate' method
+ * Each listed class name must implement IKryoDecorator. During start-up the
+ * listed class is instantiated with 0 arguments, then its 'decorate' method
* is called with storm's kryo instance as the only argument.
*/
public static final String TOPOLOGY_KRYO_DECORATORS = "topology.kryo.decorators";
@@ -914,7 +920,7 @@ public class Config extends HashMap<String, Object> {
/*
* A list of classes implementing IMetricsConsumer (See storm.yaml.example for exact config format).
- * Each listed class will be routed all the metrics data generated by the storm metrics API.
+ * Each listed class will be routed all the metrics data generated by the storm metrics API.
* Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and it's parallelism is configurable.
*/
public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register";
@@ -930,24 +936,24 @@ public class Config extends HashMap<String, Object> {
/**
- * The maximum number of tuples that can be pending on a spout task at any given time.
- * This config applies to individual tasks, not to spouts or topologies as a whole.
- *
+ * The maximum number of tuples that can be pending on a spout task at any given time.
+ * This config applies to individual tasks, not to spouts or topologies as a whole.
+ *
* A pending tuple is one that has been emitted from a spout but has not been acked or failed yet.
- * Note that this config parameter has no effect for unreliable spouts that don't tag
+ * Note that this config parameter has no effect for unreliable spouts that don't tag
* their tuples with a message id.
*/
- public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
+ public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = Number.class;
/**
* A class that implements a strategy for what to do when a spout needs to wait. Waiting is
* triggered in one of two conditions:
- *
+ *
* 1. nextTuple emits no tuples
* 2. The spout has hit maxSpoutPending and can't emit any more tuples
*/
- public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy";
+ public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy";
public static final Object TOPOLOGY_SPOUT_WAIT_STRATEGY_SCHEMA = String.class;
/**
@@ -970,7 +976,7 @@ public class Config extends HashMap<String, Object> {
public static final Object TOPOLOGY_STATS_SAMPLE_RATE_SCHEMA = Number.class;
/**
- * The time period that builtin metrics data in bucketed into.
+ * The time period that builtin metrics data in bucketed into.
*/
public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs";
public static final Object TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS_SCHEMA = Number.class;
@@ -1003,7 +1009,7 @@ public class Config extends HashMap<String, Object> {
/**
* A list of task hooks that are automatically added to every spout and bolt in the topology. An example
- * of when you'd do this is to add a hook that integrates with your internal
+ * of when you'd do this is to add a hook that integrates with your internal
* monitoring system. These hooks are instantiated using the zero-arg constructor.
*/
public static final String TOPOLOGY_AUTO_TASK_HOOKS="topology.auto.task.hooks";
@@ -1017,7 +1023,7 @@ public class Config extends HashMap<String, Object> {
public static final Object TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE_SCHEMA = ConfigValidation.PowerOf2Validator;
/**
- * The maximum number of messages to batch from the thread receiving off the network to the
+ * The maximum number of messages to batch from the thread receiving off the network to the
* executor queues. Must be a power of 2.
*/
public static final String TOPOLOGY_RECEIVER_BUFFER_SIZE="topology.receiver.buffer.size";
@@ -1051,14 +1057,14 @@ public class Config extends HashMap<String, Object> {
public static final Object TOPOLOGY_DISRUPTOR_WAIT_STRATEGY_SCHEMA = String.class;
/**
- * The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed
+ * The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed
* via the TopologyContext.
*/
public static final String TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE="topology.worker.shared.thread.pool.size";
public static final Object TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE_SCHEMA = Number.class;
/**
- * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example,
+ * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example,
* an interval of 10 seconds with topology.max.error.report.per.interval set to 5 will only allow 5 errors to be
* reported to Zookeeper per task for every 10 second interval of time.
*/
@@ -1081,7 +1087,7 @@ public class Config extends HashMap<String, Object> {
/**
* Name of the topology. This config is automatically set by Storm when the topology is submitted.
*/
- public final static String TOPOLOGY_NAME="topology.name";
+ public final static String TOPOLOGY_NAME="topology.name";
public static final Object TOPOLOGY_NAME_SCHEMA = String.class;
/**
@@ -1095,19 +1101,19 @@ public class Config extends HashMap<String, Object> {
*/
public static final String TOPOLOGY_SUBMITTER_USER = "topology.submitter.user";
public static final Object TOPOLOGY_SUBMITTER_USER_SCHEMA = String.class;
-
+
/**
* Array of components that scheduler should try to place on separate hosts.
*/
public static final String TOPOLOGY_SPREAD_COMPONENTS = "topology.spread.components";
public static final Object TOPOLOGY_SPREAD_COMPONENTS_SCHEMA = ConfigValidation.StringsValidator;
-
+
/**
* A list of IAutoCredentials that the topology should load and use.
*/
public static final String TOPOLOGY_AUTO_CREDENTIALS = "topology.auto-credentials";
public static final Object TOPOLOGY_AUTO_CREDENTIALS_SCHEMA = ConfigValidation.StringsValidator;
-
+
/**
* Max pending tuples in one ShellBolt
*/
@@ -1157,9 +1163,9 @@ public class Config extends HashMap<String, Object> {
/**
* This value is passed to spawned JVMs (e.g., Nimbus, Supervisor, and Workers)
- * for the java.library.path value. java.library.path tells the JVM where
+ * for the java.library.path value. java.library.path tells the JVM where
* to look for native libraries. It is necessary to set this config correctly since
- * Storm uses the ZeroMQ and JZMQ native libs.
+ * Storm uses the ZeroMQ and JZMQ native libs.
*/
public static final String JAVA_LIBRARY_PATH = "java.library.path";
public static final Object JAVA_LIBRARY_PATH_SCHEMA = String.class;
@@ -1178,7 +1184,7 @@ public class Config extends HashMap<String, Object> {
*/
public static final String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines";
public static final Object ISOLATION_SCHEDULER_MACHINES_SCHEMA = ConfigValidation.MapOfStringToNumberValidator;
-
+
/**
* A map from the user name to the number of machines that should that user is allowed to use. Set storm.scheduler
* to backtype.storm.scheduler.multitenant.MultitenantScheduler
@@ -1192,15 +1198,15 @@ public class Config extends HashMap<String, Object> {
*/
public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
public static final Object TOPOLOGY_ISOLATED_MACHINES_SCHEMA = Number.class;
-
+
public static void setDebug(Map conf, boolean isOn) {
conf.put(Config.TOPOLOGY_DEBUG, isOn);
- }
+ }
public void setDebug(boolean isOn) {
setDebug(this, isOn);
}
-
+
public static void setNumWorkers(Map conf, int workers) {
conf.put(Config.TOPOLOGY_WORKERS, workers);
}
@@ -1216,7 +1222,7 @@ public class Config extends HashMap<String, Object> {
public void setNumAckers(int numExecutors) {
setNumAckers(this, numExecutors);
}
-
+
public static void setMessageTimeoutSecs(Map conf, int secs) {
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, secs);
}
@@ -1224,7 +1230,7 @@ public class Config extends HashMap<String, Object> {
public void setMessageTimeoutSecs(int secs) {
setMessageTimeoutSecs(this, secs);
}
-
+
public static void registerSerialization(Map conf, Class klass) {
getRegisteredSerializations(conf).add(klass.getName());
}
@@ -1232,17 +1238,17 @@ public class Config extends HashMap<String, Object> {
public void registerSerialization(Class klass) {
registerSerialization(this, klass);
}
-
+
public static void registerSerialization(Map conf, Class klass, Class<? extends Serializer> serializerClass) {
Map<String, String> register = new HashMap<String, String>();
register.put(klass.getName(), serializerClass.getName());
- getRegisteredSerializations(conf).add(register);
+ getRegisteredSerializations(conf).add(register);
}
public void registerSerialization(Class klass, Class<? extends Serializer> serializerClass) {
registerSerialization(this, klass, serializerClass);
}
-
+
public void registerMetricsConsumer(Class klass, Object argument, long parallelismHint) {
HashMap m = new HashMap();
m.put("class", klass.getCanonicalName());
@@ -1270,7 +1276,7 @@ public class Config extends HashMap<String, Object> {
public void registerDecorator(Class<? extends IKryoDecorator> klass) {
registerDecorator(this, klass);
}
-
+
public static void setKryoFactory(Map conf, Class<? extends IKryoFactory> klass) {
conf.put(Config.TOPOLOGY_KRYO_FACTORY, klass.getName());
}
@@ -1286,7 +1292,7 @@ public class Config extends HashMap<String, Object> {
public void setSkipMissingKryoRegistrations(boolean skip) {
setSkipMissingKryoRegistrations(this, skip);
}
-
+
public static void setMaxTaskParallelism(Map conf, int max) {
conf.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, max);
}
@@ -1294,7 +1300,7 @@ public class Config extends HashMap<String, Object> {
public void setMaxTaskParallelism(int max) {
setMaxTaskParallelism(this, max);
}
-
+
public static void setMaxSpoutPending(Map conf, int max) {
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, max);
}
@@ -1302,23 +1308,23 @@ public class Config extends HashMap<String, Object> {
public void setMaxSpoutPending(int max) {
setMaxSpoutPending(this, max);
}
-
+
public static void setStatsSampleRate(Map conf, double rate) {
conf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, rate);
- }
+ }
public void setStatsSampleRate(double rate) {
setStatsSampleRate(this, rate);
- }
+ }
public static void setFallBackOnJavaSerialization(Map conf, boolean fallback) {
conf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, fallback);
- }
+ }
public void setFallBackOnJavaSerialization(boolean fallback) {
setFallBackOnJavaSerialization(this, fallback);
- }
-
+ }
+
private static List getRegisteredSerializations(Map conf) {
List ret;
if(!conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
@@ -1329,13 +1335,13 @@ public class Config extends HashMap<String, Object> {
conf.put(Config.TOPOLOGY_KRYO_REGISTER, ret);
return ret;
}
-
+
private static List getRegisteredDecorators(Map conf) {
List ret;
if(!conf.containsKey(Config.TOPOLOGY_KRYO_DECORATORS)) {
ret = new ArrayList();
} else {
- ret = new ArrayList((List) conf.get(Config.TOPOLOGY_KRYO_DECORATORS));
+ ret = new ArrayList((List) conf.get(Config.TOPOLOGY_KRYO_DECORATORS));
}
conf.put(Config.TOPOLOGY_KRYO_DECORATORS, ret);
return ret;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9e13a356/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java b/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
index 0b49dec..5590b81 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/IGroupMappingServiceProvider.java
@@ -39,16 +39,4 @@ public interface IGroupMappingServiceProvider {
*/
public Set<String> getGroups(String user) throws IOException;
- /**
- * Refresh the cache of groups and user mapping
- * @throws IOException
- */
- public void cacheGroupsRefresh() throws IOException;
- /**
- * Caches the group user information
- * @param groups list of groups to add to cache
- * @throws IOException
- */
- public void cacheGroupsAdd(Set<String> groups) throws IOException;
-
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9e13a356/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java b/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
index b8c8323..438b938 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ShellBasedUnixGroupsMapping.java
@@ -23,7 +23,10 @@ import java.util.Set;
import java.util.HashSet;
import java.util.Map;
import java.util.StringTokenizer;
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
import backtype.storm.utils.ShellUtils;
+import backtype.storm.utils.TimeCacheMap;
import backtype.storm.utils.ShellUtils.ExitCodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,12 +36,17 @@ public class ShellBasedUnixGroupsMapping implements
IGroupMappingServiceProvider {
public static Logger LOG = LoggerFactory.getLogger(ShellBasedUnixGroupsMapping.class);
+ public TimeCacheMap<String, Set<String>> cachedGroups;
/**
* Invoked once immediately after construction
* @param storm_conf Storm configuration
*/
- public void prepare(Map storm_conf) {}
+ @Override
+ public void prepare(Map storm_conf) {
+ int timeout = Utils.getInt(storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS));
+ cachedGroups = new TimeCacheMap<String, Set<String>>(timeout);
+ }
/**
* Returns list of groups for a user
@@ -48,15 +56,13 @@ public class ShellBasedUnixGroupsMapping implements
*/
@Override
public Set<String> getGroups(String user) throws IOException {
- return getUnixGroups(user);
- }
-
- @Override
- public void cacheGroupsRefresh() throws IOException {
- }
-
- @Override
- public void cacheGroupsAdd(Set<String> groups) throws IOException {
+ if(cachedGroups.containsKey(user)) {
+ return cachedGroups.get(user);
+ }
+ Set<String> groups = getUnixGroups(user);
+ if(!groups.isEmpty())
+ cachedGroups.put(user,groups);
+ return groups;
}
/**