You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/21 18:07:00 UTC

[05/14] STORM-216: Added Authentication and Authorization.

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/impl/configuration.h
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/impl/configuration.h b/storm-core/src/native/worker-launcher/impl/configuration.h
new file mode 100644
index 0000000..b0d4814
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/impl/configuration.h
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Ensure that the configuration file and all of the containing directories
+ * are only writable by root. Otherwise, an attacker can change the 
+ * configuration and potentially cause damage.
+ * returns 0 if permissions are ok
+ */
+int check_configuration_permissions(const char* file_name);
+
+// read the given configuration file
+void read_config(const char* config_file);
+
+//method exposed to get the configurations
+char *get_value(const char* key);
+
+//function to return array of values pointing to the key. Values are
+//comma seperated strings.
+char ** get_values(const char* key);
+
+// Extracts array of values from the comma separated list of values.
+char ** extract_values(char *value);
+
+// free the memory returned by get_values
+void free_values(char** values);
+
+//method to free allocated configuration
+void free_configurations();
+

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/impl/main.c
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/impl/main.c b/storm-core/src/native/worker-launcher/impl/main.c
new file mode 100644
index 0000000..7067cf9
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/impl/main.c
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "configuration.h"
+#include "worker-launcher.h"
+
+#include <errno.h>
+#include <grp.h>
+#include <limits.h>
+#include <unistd.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <strings.h>
+#include <sys/stat.h>
+
+#define _STRINGIFY(X) #X
+#define STRINGIFY(X) _STRINGIFY(X)
+#define CONF_FILENAME "worker-launcher.cfg"
+
+#ifndef EXEC_CONF_DIR
+  #error EXEC_CONF_DIR must be defined
+#endif
+
+void display_usage(FILE *stream) {
+  fprintf(stream,
+          "Usage: worker-launcher --checksetup\n");
+  fprintf(stream,
+      "Usage: worker-launcher user command command-args\n");
+  fprintf(stream, "Commands:\n");
+  fprintf(stream, "   initialize stormdist dir: code-dir <code-directory>\n");
+  fprintf(stream, "   remove a file/directory: rmr <directory>\n");
+  fprintf(stream, "   launch a worker: worker <working-directory> <script-to-run>\n");
+  fprintf(stream, "   signal a worker: signal <pid> <signal>\n");
+}
+
+int main(int argc, char **argv) {
+  int invalid_args = 0; 
+  int do_check_setup = 0;
+  
+  LOGFILE = stdout;
+  ERRORFILE = stderr;
+
+  // Minimum number of arguments required to run 
+  // the std. worker-launcher commands is 3
+  // 3 args not needed for checksetup option
+  if (argc < 3) {
+    invalid_args = 1;
+    if (argc == 2) {
+      const char *arg1 = argv[1];
+      if (strcmp("--checksetup", arg1) == 0) {
+        invalid_args = 0;
+        do_check_setup = 1;        
+      }
+    }
+  }
+  
+  if (invalid_args != 0) {
+    display_usage(stdout);
+    return INVALID_ARGUMENT_NUMBER;
+  }
+
+  const char * command = NULL;
+  const char * working_dir = NULL;
+
+  int exit_code = 0;
+
+  char *executable_file = get_executable();
+
+  char *orig_conf_file = STRINGIFY(EXEC_CONF_DIR) "/" CONF_FILENAME;
+  char *conf_file = realpath(orig_conf_file, NULL);
+
+  if (conf_file == NULL) {
+    fprintf(ERRORFILE, "Configuration file %s not found.\n", orig_conf_file);
+    exit(INVALID_CONFIG_FILE);
+  }
+  if (check_configuration_permissions(conf_file) != 0) {
+    exit(INVALID_CONFIG_FILE);
+  }
+  read_config(conf_file);
+  free(conf_file);
+  conf_file = NULL;
+
+  // look up the node manager group in the config file
+  char *nm_group = get_value(LAUNCHER_GROUP_KEY);
+  if (nm_group == NULL) {
+    fprintf(ERRORFILE, "Can't get configured value for %s.\n", LAUNCHER_GROUP_KEY);
+    exit(INVALID_CONFIG_FILE);
+  }
+  struct group *group_info = getgrnam(nm_group);
+  if (group_info == NULL) {
+    fprintf(ERRORFILE, "Can't get group information for %s - %s.\n", nm_group,
+            strerror(errno));
+    fflush(LOGFILE);
+    exit(INVALID_CONFIG_FILE);
+  }
+
+  set_launcher_uid(getuid(), group_info->gr_gid);
+  // if we are running from a setuid executable, make the real uid root
+  setuid(0);
+  // set the real and effective group id to the node manager group
+  setgid(group_info->gr_gid);
+
+  if (check_executor_permissions(executable_file) != 0) {
+    fprintf(ERRORFILE, "Invalid permissions on worker-launcher binary.\n");
+    return INVALID_CONTAINER_EXEC_PERMISSIONS;
+  }
+
+  if (do_check_setup != 0) {
+    // basic setup checks done
+    // verified configs available and valid
+    // verified executor permissions
+    return 0;
+  }
+
+  //checks done for user name
+  if (argv[optind] == NULL) {
+    fprintf(ERRORFILE, "Invalid user name.\n");
+    return INVALID_USER_NAME;
+  }
+
+  int ret = set_user(argv[optind]);
+  if (ret != 0) {
+    return ret;
+  }
+ 
+  optind = optind + 1;
+  command = argv[optind++];
+
+  fprintf(LOGFILE, "main : command provided %s\n",command);
+  fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name);
+  fflush(LOGFILE);
+
+  if (strcasecmp("code-dir", command) == 0) {
+    if (argc != 4) {
+      fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 4) for code-dir\n",
+	      argc);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    exit_code = setup_stormdist_dir(argv[optind]);
+  } else if (strcasecmp("rmr", command) == 0) {
+    if (argc != 4) {
+      fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 4) for rmr\n",
+	      argc);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    exit_code= delete_as_user(user_detail->pw_name, argv[optind],
+                              NULL);
+  } else if (strcasecmp("worker", command) == 0) {
+    if (argc != 5) {
+      fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 5) for worker\n",
+	      argc);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    working_dir = argv[optind++];
+    exit_code = setup_stormdist_dir(working_dir);
+    if (exit_code == 0) {
+      exit_code = exec_as_user(working_dir, argv[optind]);
+    }
+  } else if (strcasecmp("signal", command) == 0) {
+    if (argc != 5) {
+      fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 5) for signal\n",
+	      argc);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    char* end_ptr = NULL;
+    char* option = argv[optind++];
+    int container_pid = strtol(option, &end_ptr, 10);
+    if (option == end_ptr || *end_ptr != '\0') {
+      fprintf(ERRORFILE, "Illegal argument for container pid %s\n", option);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    option = argv[optind++];
+    int signal = strtol(option, &end_ptr, 10);
+    if (option == end_ptr || *end_ptr != '\0') {
+      fprintf(ERRORFILE, "Illegal argument for signal %s\n", option);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    exit_code = signal_container_as_user(user_detail->pw_name, container_pid, signal);
+  } else {
+    fprintf(ERRORFILE, "Invalid command %s not supported.",command);
+    fflush(ERRORFILE);
+    exit_code = INVALID_COMMAND_PROVIDED;
+  }
+  fclose(LOGFILE);
+  fclose(ERRORFILE);
+  return exit_code;
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/impl/worker-launcher.c
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/impl/worker-launcher.c b/storm-core/src/native/worker-launcher/impl/worker-launcher.c
new file mode 100644
index 0000000..81d7075
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/impl/worker-launcher.c
@@ -0,0 +1,779 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "configuration.h"
+#include "worker-launcher.h"
+
+#include <dirent.h>
+#include <fcntl.h>
+#include <fts.h>
+#include <errno.h>
+#include <grp.h>
+#include <unistd.h>
+#include <signal.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+
+static const int DEFAULT_MIN_USERID = 1000;
+
+static const char* DEFAULT_BANNED_USERS[] = {"bin", 0};
+
+//struct to store the user details
+struct passwd *user_detail = NULL;
+
+FILE* LOGFILE = NULL;
+FILE* ERRORFILE = NULL;
+
+static uid_t launcher_uid = -1;
+static gid_t launcher_gid = -1;
+
+char *concatenate(char *concat_pattern, char *return_path_name,
+   int numArgs, ...);
+
+void set_launcher_uid(uid_t user, gid_t group) {
+  launcher_uid = user;
+  launcher_gid = group;
+}
+
+/**
+ * get the executable filename.
+ */
+char* get_executable() {
+  char buffer[PATH_MAX];
+  snprintf(buffer, PATH_MAX, "/proc/%u/exe", getpid());
+  char *filename = malloc(PATH_MAX);
+  if (NULL == filename) {
+    fprintf(ERRORFILE, "malloc failed in get_executable\n");
+    exit(-1);
+  }
+  ssize_t len = readlink(buffer, filename, PATH_MAX);
+  if (len == -1) {
+    fprintf(ERRORFILE, "Can't get executable name from %s - %s\n", buffer,
+            strerror(errno));
+    exit(-1);
+  } else if (len >= PATH_MAX) {
+    fprintf(ERRORFILE, "Executable name %.*s is longer than %d characters.\n",
+            PATH_MAX, filename, PATH_MAX);
+    exit(-1);
+  }
+  filename[len] = '\0';
+  return filename;
+}
+
+int check_executor_permissions(char *executable_file) {
+  errno = 0;
+  char * resolved_path = realpath(executable_file, NULL);
+  if (resolved_path == NULL) {
+    fprintf(ERRORFILE,
+        "Error resolving the canonical name for the executable : %s!",
+        strerror(errno));
+    return -1;
+  }
+
+  struct stat filestat;
+  errno = 0;
+  if (stat(resolved_path, &filestat) != 0) {
+    fprintf(ERRORFILE, 
+            "Could not stat the executable : %s!.\n", strerror(errno));
+    return -1;
+  }
+
+  uid_t binary_euid = filestat.st_uid; // Binary's user owner
+  gid_t binary_gid = filestat.st_gid; // Binary's group owner
+
+  // Effective uid should be root
+  if (binary_euid != 0) {
+    fprintf(LOGFILE,
+        "The worker-launcher binary should be user-owned by root.\n");
+    return -1;
+  }
+
+  if (binary_gid != getgid()) {
+    fprintf(LOGFILE, "The configured nodemanager group %d is different from"
+            " the group of the executable %d\n", getgid(), binary_gid);
+    return -1;
+  }
+
+  // check others do not have read/write/execute permissions
+  if ((filestat.st_mode & S_IROTH) == S_IROTH || (filestat.st_mode & S_IWOTH)
+      == S_IWOTH || (filestat.st_mode & S_IXOTH) == S_IXOTH) {
+    fprintf(LOGFILE,
+            "The worker-launcher binary should not have read or write or"
+            " execute for others.\n");
+    return -1;
+  }
+
+  // Binary should be setuid/setgid executable
+  if ((filestat.st_mode & S_ISUID) == 0) {
+    fprintf(LOGFILE, "The worker-launcher binary should be set setuid.\n");
+    return -1;
+  }
+
+  return 0;
+}
+
+/**
+ * Change the effective user id to limit damage.
+ */
+static int change_effective_user(uid_t user, gid_t group) {
+  if (geteuid() == user && getegid() == group) {
+    return 0;
+  }
+  if (seteuid(0) != 0) {
+    return -1;
+  }
+  if (setegid(group) != 0) {
+    fprintf(LOGFILE, "Failed to set effective group id %d - %s\n", group,
+            strerror(errno));
+    return -1;
+  }
+  if (seteuid(user) != 0) {
+    fprintf(LOGFILE, "Failed to set effective user id %d - %s\n", user,
+            strerror(errno));
+    return -1;
+  }
+  return 0;
+}
+
+/**
+ * Change the real and effective user and group to abandon the super user
+ * priviledges.
+ */
+int change_user(uid_t user, gid_t group) {
+  if (user == getuid() && user == geteuid() && 
+      group == getgid() && group == getegid()) {
+    return 0;
+  }
+
+  if (seteuid(0) != 0) {
+    fprintf(LOGFILE, "unable to reacquire root - %s\n", strerror(errno));
+    fprintf(LOGFILE, "Real: %d:%d; Effective: %d:%d\n",
+	    getuid(), getgid(), geteuid(), getegid());
+    return SETUID_OPER_FAILED;
+  }
+  if (setgid(group) != 0) {
+    fprintf(LOGFILE, "unable to set group to %d - %s\n", group, 
+            strerror(errno));
+    fprintf(LOGFILE, "Real: %d:%d; Effective: %d:%d\n",
+	    getuid(), getgid(), geteuid(), getegid());
+    return SETUID_OPER_FAILED;
+  }
+  if (setuid(user) != 0) {
+    fprintf(LOGFILE, "unable to set user to %d - %s\n", user, strerror(errno));
+    fprintf(LOGFILE, "Real: %d:%d; Effective: %d:%d\n",
+	    getuid(), getgid(), geteuid(), getegid());
+    return SETUID_OPER_FAILED;
+  }
+
+  return 0;
+}
+
+/**
+ * Utility function to concatenate argB to argA using the concat_pattern.
+ */
+char *concatenate(char *concat_pattern, char *return_path_name, 
+                  int numArgs, ...) {
+  va_list ap;
+  va_start(ap, numArgs);
+  int strlen_args = 0;
+  char *arg = NULL;
+  int j;
+  for (j = 0; j < numArgs; j++) {
+    arg = va_arg(ap, char*);
+    if (arg == NULL) {
+      fprintf(LOGFILE, "One of the arguments passed for %s in null.\n",
+          return_path_name);
+      return NULL;
+    }
+    strlen_args += strlen(arg);
+  }
+  va_end(ap);
+
+  char *return_path = NULL;
+  int str_len = strlen(concat_pattern) + strlen_args + 1;
+
+  return_path = (char *) malloc(str_len);
+  if (return_path == NULL) {
+    fprintf(LOGFILE, "Unable to allocate memory for %s.\n", return_path_name);
+    return NULL;
+  }
+  va_start(ap, numArgs);
+  vsnprintf(return_path, str_len, concat_pattern, ap);
+  va_end(ap);
+  return return_path;
+}
+
+char *get_container_launcher_file(const char* work_dir) {
+  return concatenate("%s/%s", "container launcher", 2, work_dir, CONTAINER_SCRIPT);
+}
+
+/**
+ * Get the tmp directory under the working directory
+ */
+char *get_tmp_directory(const char *work_dir) {
+  return concatenate("%s/%s", "tmp dir", 2, work_dir, TMP_DIR);
+}
+
+/**
+ * Load the user information for a given user name.
+ */
+static struct passwd* get_user_info(const char* user) {
+  int string_size = sysconf(_SC_GETPW_R_SIZE_MAX);
+  void* buffer = malloc(string_size + sizeof(struct passwd));
+  if (buffer == NULL) {
+    fprintf(LOGFILE, "Malloc failed in get_user_info\n");
+    return NULL;
+  }
+  struct passwd *result = NULL;
+  if (getpwnam_r(user, buffer, buffer + sizeof(struct passwd), string_size,
+		 &result) != 0) {
+    free(buffer);
+    buffer = NULL;
+    fprintf(LOGFILE, "Can't get user information %s - %s\n", user,
+	    strerror(errno));
+    return NULL;
+  }
+  return result;
+}
+
+/**
+ * Is the user a real user account?
+ * Checks:
+ *   1. Not root
+ *   2. UID is above the minimum configured.
+ *   3. Not in banned user list
+ * Returns NULL on failure
+ */
+struct passwd* check_user(const char *user) {
+  if (strcmp(user, "root") == 0) {
+    fprintf(LOGFILE, "Running as root is not allowed\n");
+    fflush(LOGFILE);
+    return NULL;
+  }
+  char *min_uid_str = get_value(MIN_USERID_KEY);
+  int min_uid = DEFAULT_MIN_USERID;
+  if (min_uid_str != NULL) {
+    char *end_ptr = NULL;
+    min_uid = strtol(min_uid_str, &end_ptr, 10);
+    if (min_uid_str == end_ptr || *end_ptr != '\0') {
+      fprintf(LOGFILE, "Illegal value of %s for %s in configuration\n", 
+	      min_uid_str, MIN_USERID_KEY);
+      fflush(LOGFILE);
+      free(min_uid_str);
+      min_uid_str = NULL;
+      return NULL;
+    }
+    free(min_uid_str);
+    min_uid_str = NULL;
+  }
+  struct passwd *user_info = get_user_info(user);
+  if (NULL == user_info) {
+    fprintf(LOGFILE, "User %s not found\n", user);
+    fflush(LOGFILE);
+    return NULL;
+  }
+  if (user_info->pw_uid < min_uid) {
+    fprintf(LOGFILE, "Requested user %s has id %d, which is below the "
+	    "minimum allowed %d\n", user, user_info->pw_uid, min_uid);
+    fflush(LOGFILE);
+    free(user_info);
+    user_info = NULL;
+    return NULL;
+  }
+  char **banned_users = get_values(BANNED_USERS_KEY);
+  char **banned_user = (banned_users == NULL) ? 
+    (char**) DEFAULT_BANNED_USERS : banned_users;
+  for(; *banned_user; ++banned_user) {
+    if (strcmp(*banned_user, user) == 0) {
+      free(user_info);
+      user_info = NULL;
+      if (banned_users != (char**)DEFAULT_BANNED_USERS) {
+        free_values(banned_users);
+        banned_users = NULL;
+      }
+      fprintf(LOGFILE, "Requested user %s is banned\n", user);
+      return NULL;
+    }
+  }
+  if (banned_users != NULL && banned_users != (char**)DEFAULT_BANNED_USERS) {
+    free_values(banned_users);
+    banned_users = NULL;
+  }
+  return user_info;
+}
+
+/**
+ * function used to populate and user_details structure.
+ */
+int set_user(const char *user) {
+  // free any old user
+  if (user_detail != NULL) {
+    free(user_detail);
+    user_detail = NULL;
+  }
+  user_detail = check_user(user);
+  if (user_detail == NULL) {
+    return -1;
+  }
+
+  if (geteuid() == user_detail->pw_uid) {
+    return 0;
+  }
+
+  if (initgroups(user, user_detail->pw_gid) != 0) {
+    fprintf(LOGFILE, "Error setting supplementary groups for user %s: %s\n",
+        user, strerror(errno));
+    return -1;
+  }
+
+  return change_effective_user(user_detail->pw_uid, user_detail->pw_gid);
+}
+
+/**
+ * Open a file as the node manager and return a file descriptor for it.
+ * Returns -1 on error
+ */
+static int open_file_as_nm(const char* filename) {
+  uid_t user = geteuid();
+  gid_t group = getegid();
+  if (change_effective_user(launcher_uid, launcher_gid) != 0) {
+    return -1;
+  }
+  int result = open(filename, O_RDONLY);
+  if (result == -1) {
+    fprintf(LOGFILE, "Can't open file %s as node manager - %s\n", filename,
+	    strerror(errno));
+  }
+  if (change_effective_user(user, group)) {
+    result = -1;
+  }
+  return result;
+}
+
+/**
+ * Copy a file from a fd to a given filename.
+ * The new file must not exist and it is created with permissions perm.
+ * The input stream is closed.
+ * Return 0 if everything is ok.
+ */
+static int copy_file(int input, const char* in_filename, 
+		     const char* out_filename, mode_t perm) {
+  const int buffer_size = 128*1024;
+  char buffer[buffer_size];
+  int out_fd = open(out_filename, O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW, perm);
+  if (out_fd == -1) {
+    fprintf(LOGFILE, "Can't open %s for output - %s\n", out_filename, 
+            strerror(errno));
+    return -1;
+  }
+  ssize_t len = read(input, buffer, buffer_size);
+  while (len > 0) {
+    ssize_t pos = 0;
+    while (pos < len) {
+      ssize_t write_result = write(out_fd, buffer + pos, len - pos);
+      if (write_result <= 0) {
+	fprintf(LOGFILE, "Error writing to %s - %s\n", out_filename,
+		strerror(errno));
+	close(out_fd);
+	return -1;
+      }
+      pos += write_result;
+    }
+    len = read(input, buffer, buffer_size);
+  }
+  if (len < 0) {
+    fprintf(LOGFILE, "Failed to read file %s - %s\n", in_filename, 
+	    strerror(errno));
+    close(out_fd);
+    return -1;
+  }
+  if (close(out_fd) != 0) {
+    fprintf(LOGFILE, "Failed to close file %s - %s\n", out_filename, 
+	    strerror(errno));
+    return -1;
+  }
+  close(input);
+  return 0;
+}
+
+int setup_stormdist(FTSENT* entry, uid_t euser) {
+  if (lchown(entry->fts_path, euser, launcher_gid) != 0) {
+    fprintf(ERRORFILE, "Failure to exec app initialization process - %s\n",
+      strerror(errno));
+     return -1;
+  }
+  mode_t mode = entry->fts_statp->st_mode;
+  mode_t new_mode = (mode & (S_IRWXU)) | S_IRGRP | S_IWGRP;
+  if ((mode & S_IXUSR) == S_IXUSR) {
+    new_mode = new_mode | S_IXGRP;
+  }
+  if ((mode & S_IFDIR) == S_IFDIR) {
+    new_mode = new_mode | S_ISGID;
+  }
+  if (chmod(entry->fts_path, new_mode) != 0) {
+    fprintf(ERRORFILE, "Failure to exec app initialization process - %s\n",
+      strerror(errno));
+    return -1;
+  }
+  return 0;
+}
+
+int setup_stormdist_dir(const char* local_dir) {
+  //This is the same as
+  //> chmod g+rwX -R $local_dir
+  //> chown -no-dereference -R $user:$supervisor-group $local_dir 
+
+  int exit_code = 0;
+  uid_t euser = geteuid();
+
+  if (local_dir == NULL) {
+    fprintf(ERRORFILE, "Path is null\n");
+    exit_code = UNABLE_TO_BUILD_PATH; // may be malloc failed
+  } else {
+    char *(paths[]) = {strndup(local_dir,PATH_MAX), 0};
+    if (paths[0] == NULL) {
+      fprintf(ERRORFILE, "Malloc failed in setup_stormdist_dir\n");
+      return -1;
+    }
+    // check to make sure the directory exists
+    if (access(local_dir, F_OK) != 0) {
+      if (errno == ENOENT) {
+        fprintf(ERRORFILE, "Path does not exist %s\n", local_dir);
+        free(paths[0]);
+        paths[0] = NULL;
+        return UNABLE_TO_BUILD_PATH;
+      }
+    }
+    FTS* tree = fts_open(paths, FTS_PHYSICAL | FTS_XDEV, NULL);
+    FTSENT* entry = NULL;
+    int ret = 0;
+
+    if (tree == NULL) {
+      fprintf(ERRORFILE,
+              "Cannot open file traversal structure for the path %s:%s.\n", 
+              local_dir, strerror(errno));
+      free(paths[0]);
+      paths[0] = NULL;
+      return -1;
+    }
+
+    if (seteuid(0) != 0) {
+      fprintf(ERRORFILE, "Could not become root\n");
+      return -1;
+    }
+
+    while (((entry = fts_read(tree)) != NULL) && exit_code == 0) {
+      switch (entry->fts_info) {
+
+      case FTS_DP:        // A directory being visited in post-order
+      case FTS_DOT:       // A dot directory
+        //NOOP
+        fprintf(LOGFILE, "NOOP: %s\n", entry->fts_path); break;
+      case FTS_D:         // A directory in pre-order
+      case FTS_F:         // A regular file
+      case FTS_SL:        // A symbolic link
+      case FTS_SLNONE:    // A broken symbolic link
+        //TODO it would be good to validate that the file is owned by the correct user first.
+        fprintf(LOGFILE, "visiting: %s\n", entry->fts_path);
+        if (setup_stormdist(entry, euser) != 0) {
+          exit_code = -1;
+        }
+        break;
+      case FTS_DEFAULT:   // Unknown type of file
+      case FTS_DNR:       // Unreadable directory
+      case FTS_NS:        // A file with no stat(2) information
+      case FTS_DC:        // A directory that causes a cycle
+      case FTS_NSOK:      // No stat information requested
+      case FTS_ERR:       // Error return
+      default:
+        fprintf(LOGFILE, "Unexpected...\n"); break;
+        exit_code = -1;
+        break;
+      }
+    }
+    ret = fts_close(tree);
+    free(paths[0]);
+    paths[0] = NULL;
+  }
+  return exit_code;
+}
+
+
+int signal_container_as_user(const char *user, int pid, int sig) {
+  if(pid <= 0) {
+    return INVALID_CONTAINER_PID;
+  }
+
+  if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+    return SETUID_OPER_FAILED;
+  }
+
+  //Don't continue if the process-group is not alive anymore.
+  int has_group = 1;
+  if (kill(-pid,0) < 0) {
+    if (kill(pid, 0) < 0) {
+      if (errno == ESRCH) {
+        return INVALID_CONTAINER_PID;
+      }
+      fprintf(LOGFILE, "Error signalling container %d with %d - %s\n",
+	      pid, sig, strerror(errno));
+      return -1;
+    } else {
+      has_group = 0;
+    }
+  }
+
+  if (kill((has_group ? -1 : 1) * pid, sig) < 0) {
+    if(errno != ESRCH) {
+      fprintf(LOGFILE, 
+              "Error signalling process group %d with signal %d - %s\n", 
+              -pid, sig, strerror(errno));
+      fprintf(stderr, 
+              "Error signalling process group %d with signal %d - %s\n", 
+              -pid, sig, strerror(errno));
+      fflush(LOGFILE);
+      return UNABLE_TO_SIGNAL_CONTAINER;
+    } else {
+      return INVALID_CONTAINER_PID;
+    }
+  }
+  fprintf(LOGFILE, "Killing process %s%d with %d\n",
+	  (has_group ? "group " :""), pid, sig);
+  return 0;
+}
+
+/**
+ * Delete a final directory as the node manager user.
+ */
+static int rmdir_as_nm(const char* path) {
+  int user_uid = geteuid();
+  int user_gid = getegid();
+  int ret = change_effective_user(launcher_uid, launcher_gid);
+  if (ret == 0) {
+    if (rmdir(path) != 0) {
+      fprintf(LOGFILE, "rmdir of %s failed - %s\n", path, strerror(errno));
+      ret = -1;
+    }
+  }
+  // always change back
+  if (change_effective_user(user_uid, user_gid) != 0) {
+    ret = -1;
+  }
+  return ret;
+}
+
+/**
+ * Recursively delete the given path.
+ * full_path : the path to delete
+ * needs_tt_user: the top level directory must be deleted by the tt user.
+ */
+static int delete_path(const char *full_path, 
+                       int needs_tt_user) {
+  int exit_code = 0;
+
+  if (full_path == NULL) {
+    fprintf(LOGFILE, "Path is null\n");
+    exit_code = UNABLE_TO_BUILD_PATH; // may be malloc failed
+  } else {
+    char *(paths[]) = {strndup(full_path,PATH_MAX), 0};
+    if (paths[0] == NULL) {
+      fprintf(LOGFILE, "Malloc failed in delete_path\n");
+      return -1;
+    }
+    // check to make sure the directory exists
+    if (access(full_path, F_OK) != 0) {
+      if (errno == ENOENT) {
+        free(paths[0]);
+        paths[0] = NULL;
+        return 0;
+      }
+    }
+    FTS* tree = fts_open(paths, FTS_PHYSICAL | FTS_XDEV, NULL);
+    FTSENT* entry = NULL;
+    int ret = 0;
+
+    if (tree == NULL) {
+      fprintf(LOGFILE,
+              "Cannot open file traversal structure for the path %s:%s.\n", 
+              full_path, strerror(errno));
+      free(paths[0]);
+      paths[0] = NULL;
+      return -1;
+    }
+    while (((entry = fts_read(tree)) != NULL) && exit_code == 0) {
+      switch (entry->fts_info) {
+
+      case FTS_DP:        // A directory being visited in post-order
+        if (!needs_tt_user ||
+            strcmp(entry->fts_path, full_path) != 0) {
+          if (rmdir(entry->fts_accpath) != 0) {
+            fprintf(LOGFILE, "Couldn't delete directory %s - %s\n", 
+                    entry->fts_path, strerror(errno));
+            exit_code = -1;
+          }
+        }
+        break;
+
+      case FTS_F:         // A regular file
+      case FTS_SL:        // A symbolic link
+      case FTS_SLNONE:    // A broken symbolic link
+      case FTS_DEFAULT:   // Unknown type of file
+        if (unlink(entry->fts_accpath) != 0) {
+          fprintf(LOGFILE, "Couldn't delete file %s - %s\n", entry->fts_path,
+                  strerror(errno));
+          exit_code = -1;
+        }
+        break;
+
+      case FTS_DNR:       // Unreadable directory
+        fprintf(LOGFILE, "Unreadable directory %s. Skipping..\n", 
+                entry->fts_path);
+        break;
+
+      case FTS_D:         // A directory in pre-order
+        // if the directory isn't readable, chmod it
+        if ((entry->fts_statp->st_mode & 0200) == 0) {
+          fprintf(LOGFILE, "Unreadable directory %s, chmoding.\n", 
+                  entry->fts_path);
+          if (chmod(entry->fts_accpath, 0700) != 0) {
+            fprintf(LOGFILE, "Error chmoding %s - %s, continuing\n", 
+                    entry->fts_path, strerror(errno));
+          }
+        }
+        break;
+
+      case FTS_NS:        // A file with no stat(2) information
+        // usually a root directory that doesn't exist
+        fprintf(LOGFILE, "Directory not found %s\n", entry->fts_path);
+        break;
+
+      case FTS_DC:        // A directory that causes a cycle
+      case FTS_DOT:       // A dot directory
+      case FTS_NSOK:      // No stat information requested
+        break;
+
+      case FTS_ERR:       // Error return
+        fprintf(LOGFILE, "Error traversing directory %s - %s\n", 
+                entry->fts_path, strerror(entry->fts_errno));
+        exit_code = -1;
+        break;
+        break;
+      default:
+        exit_code = -1;
+        break;
+      }
+    }
+    ret = fts_close(tree);
+    if (exit_code == 0 && ret != 0) {
+      fprintf(LOGFILE, "Error in fts_close while deleting %s\n", full_path);
+      exit_code = -1;
+    }
+    if (needs_tt_user) {
+      // If the delete failed, try a final rmdir as root on the top level.
+      // That handles the case where the top level directory is in a directory
+      // that is owned by the node manager.
+      exit_code = rmdir_as_nm(full_path);
+    }
+    free(paths[0]);
+    paths[0] = NULL;
+  }
+  return exit_code;
+}
+
+int exec_as_user(const char * working_dir, const char * script_file) {
+  char *script_file_dest = NULL;
+  script_file_dest = get_container_launcher_file(working_dir);
+  if (script_file_dest == NULL) {
+    return OUT_OF_MEMORY;
+  }
+
+  // open launch script
+  int script_file_source = open_file_as_nm(script_file);
+  if (script_file_source == -1) {
+    return -1;
+  }
+
+  setsid();
+
+  // give up root privs
+  if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+    return SETUID_OPER_FAILED;
+  }
+
+  if (copy_file(script_file_source, script_file, script_file_dest, S_IRWXU) != 0) {
+    return -1;
+  }
+
+  fcloseall();
+  umask(0027);
+  if (chdir(working_dir) != 0) {
+    fprintf(LOGFILE, "Can't change directory to %s -%s\n", working_dir,
+	    strerror(errno));
+    return -1;
+  }
+
+  if (execlp(script_file_dest, script_file_dest, NULL) != 0) {
+    fprintf(LOGFILE, "Couldn't execute the container launch file %s - %s", 
+            script_file_dest, strerror(errno));
+    return UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
+  }
+ 
+  //Unreachable
+  return -1;
+}
+
+/**
+ * Delete the given directory as the user from each of the directories
+ * user: the user doing the delete
+ * subdir: the subdir to delete (if baseDirs is empty, this is treated as
+           an absolute path)
+ * baseDirs: (optional) the baseDirs where the subdir is located
+ */
+int delete_as_user(const char *user,
+                   const char *subdir,
+                   char* const* baseDirs) {
+  int ret = 0;
+
+  char** ptr;
+
+  // TODO: No switching user? !!!!
+  if (baseDirs == NULL || *baseDirs == NULL) {
+    return delete_path(subdir, 1);
+  }
+  // do the delete
+  for(ptr = (char**)baseDirs; *ptr != NULL; ++ptr) {
+    char* full_path = concatenate("%s/%s", "user subdir", 2,
+                              *ptr, subdir);
+    if (full_path == NULL) {
+      return -1;
+    }
+    int this_ret = delete_path(full_path, strlen(subdir) == 0);
+    free(full_path);
+    full_path = NULL;
+    // delete as much as we can, but remember the error
+    if (this_ret != 0) {
+      ret = this_ret;
+    }
+  }
+  return ret;
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/impl/worker-launcher.h
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/impl/worker-launcher.h b/storm-core/src/native/worker-launcher/impl/worker-launcher.h
new file mode 100644
index 0000000..59ab998
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/impl/worker-launcher.h
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <pwd.h>
+#include <stdio.h>
+#include <sys/types.h>
+
+enum errorcodes {
+  INVALID_ARGUMENT_NUMBER = 1,
+  INVALID_USER_NAME, //2
+  INVALID_COMMAND_PROVIDED, //3
+  // SUPER_USER_NOT_ALLOWED_TO_RUN_TASKS (NOT USED) 4
+  INVALID_NM_ROOT_DIRS = 5,
+  SETUID_OPER_FAILED, //6
+  UNABLE_TO_EXECUTE_CONTAINER_SCRIPT, //7
+  UNABLE_TO_SIGNAL_CONTAINER, //8
+  INVALID_CONTAINER_PID, //9
+  // ERROR_RESOLVING_FILE_PATH (NOT_USED) 10
+  // RELATIVE_PATH_COMPONENTS_IN_FILE_PATH (NOT USED) 11
+  // UNABLE_TO_STAT_FILE (NOT USED) 12
+  // FILE_NOT_OWNED_BY_ROOT (NOT USED) 13
+  // PREPARE_CONTAINER_DIRECTORIES_FAILED (NOT USED) 14
+  // INITIALIZE_CONTAINER_FAILED (NOT USED) 15
+  // PREPARE_CONTAINER_LOGS_FAILED (NOT USED) 16
+  // INVALID_LOG_DIR (NOT USED) 17
+  OUT_OF_MEMORY = 18,
+  // INITIALIZE_DISTCACHEFILE_FAILED (NOT USED) 19
+  INITIALIZE_USER_FAILED = 20,
+  UNABLE_TO_BUILD_PATH, //21
+  INVALID_CONTAINER_EXEC_PERMISSIONS, //22
+  // PREPARE_JOB_LOGS_FAILED (NOT USED) 23
+  INVALID_CONFIG_FILE =  24,
+  SETSID_OPER_FAILED = 25,
+  WRITE_PIDFILE_FAILED = 26
+};
+
+#define LAUNCHER_GROUP_KEY "storm.worker-launcher.group"
+
+#define USER_DIR_PATTERN "%s/usercache/%s"
+#define NM_APP_DIR_PATTERN USER_DIR_PATTERN "/appcache/%s"
+#define CONTAINER_DIR_PATTERN NM_APP_DIR_PATTERN "/%s"
+#define CONTAINER_SCRIPT "launch_container.sh"
+#define CREDENTIALS_FILENAME "container_tokens"
+#define MIN_USERID_KEY "min.user.id"
+#define BANNED_USERS_KEY "banned.users"
+#define TMP_DIR "tmp"
+
+extern struct passwd *user_detail;
+
+// the log file for messages
+extern FILE *LOGFILE;
+// the log file for error messages
+extern FILE *ERRORFILE;
+
+int setup_stormdist_dir(const char* local_dir);
+
+int exec_as_user(const char * working_dir, const char * args);
+
+// delete a directory (or file) recursively as the user. The directory
+// could optionally be relative to the baseDir set of directories (if the same
+// directory appears on multiple disk volumes, the disk volumes should be passed
+// as the baseDirs). If baseDirs is not specified, then dir_to_be_deleted is 
+// assumed as the absolute path
+int delete_as_user(const char *user,
+                   const char *dir_to_be_deleted,
+                   char* const* baseDirs);
+
+// get the executable's filename
+char* get_executable();
+
+/**
+ * Check the permissions on the worker-launcher to make sure that security is
+ * permissible. For this, we need worker-launcher binary to
+ *    * be user-owned by root
+ *    * be group-owned by a configured special group.
+ *    * others do not have any permissions
+ *    * be setuid/setgid
+ * @param executable_file the file to check
+ * @return -1 on error 0 on success.
+ */
+int check_executor_permissions(char *executable_file);
+
+/**
+ * Function used to signal a container launched by the user.
+ * The function sends appropriate signal to the process group
+ * specified by the pid.
+ * @param user the user to send the signal as.
+ * @param pid the process id to send the signal to.
+ * @param sig the signal to send.
+ * @return an errorcode enum value on error, or 0 on success.
+ */
+int signal_container_as_user(const char *user, int pid, int sig);
+
+// set the uid and gid of the launcher.  This is used when doing some
+// priviledged operations for setting the effective uid and gid.
+void set_launcher_uid(uid_t user, gid_t group);
+
+/**
+ * Is the user a real user account?
+ * Checks:
+ *   1. Not root
+ *   2. UID is above the minimum configured.
+ *   3. Not in banned user list
+ * Returns NULL on failure
+ */
+struct passwd* check_user(const char *user);
+
+// set the user
+int set_user(const char *user);
+
+// methods to get the directories
+
+char *get_container_launcher_file(const char* work_dir);
+
+int change_user(uid_t user, gid_t group);

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/test/test-worker-launcher.c
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/test/test-worker-launcher.c b/storm-core/src/native/worker-launcher/test/test-worker-launcher.c
new file mode 100644
index 0000000..412e922
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/test/test-worker-launcher.c
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "configuration.h"
+#include "worker-launcher.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/wait.h>
+
+#define TEST_ROOT "/tmp/test-worker-launcher"
+#define DONT_TOUCH_FILE "dont-touch-me"
+#define NM_LOCAL_DIRS       TEST_ROOT "/local-1," TEST_ROOT "/local-2," \
+               TEST_ROOT "/local-3," TEST_ROOT "/local-4," TEST_ROOT "/local-5"
+#define NM_LOG_DIRS         TEST_ROOT "/logdir_1," TEST_ROOT "/logdir_2," \
+                            TEST_ROOT "/logdir_3," TEST_ROOT "/logdir_4"
+#define ARRAY_SIZE 1000
+
+static char* username = NULL;
+static char* local_dirs = NULL;
+static char* log_dirs = NULL;
+
+/**
+ * Run the command using the effective user id.
+ * It can't use system, since bash seems to copy the real user id into the
+ * effective id.
+ */
+void run(const char *cmd) {
+  fflush(stdout);
+  fflush(stderr);
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: failed to fork - %s\n", strerror(errno));
+  } else if (child == 0) {
+    char *cmd_copy = strdup(cmd);
+    char *ptr;
+    int words = 1;
+    for(ptr = strchr(cmd_copy, ' ');  ptr; ptr = strchr(ptr+1, ' ')) {
+      words += 1;
+    }
+    char **argv = malloc(sizeof(char *) * (words + 1));
+    ptr = strtok(cmd_copy, " ");
+    int i = 0;
+    argv[i++] = ptr;
+    while (ptr != NULL) {
+      ptr = strtok(NULL, " ");
+      argv[i++] = ptr;
+    }
+    if (execvp(argv[0], argv) != 0) {
+      printf("FAIL: exec failed in child %s - %s\n", cmd, strerror(errno));
+      exit(42);
+    }
+  } else {
+    int status = 0;
+    if (waitpid(child, &status, 0) <= 0) {
+      printf("FAIL: failed waiting for child process %s pid %d - %s\n", 
+	     cmd, child, strerror(errno));
+      exit(1);
+    }
+    if (!WIFEXITED(status)) {
+      printf("FAIL: process %s pid %d did not exit\n", cmd, child);
+      exit(1);
+    }
+    if (WEXITSTATUS(status) != 0) {
+      printf("FAIL: process %s pid %d exited with error status %d\n", cmd, 
+	     child, WEXITSTATUS(status));
+      exit(1);
+    }
+  }
+}
+
+int write_config_file(char *file_name) {
+  FILE *file;
+  file = fopen(file_name, "w");
+  if (file == NULL) {
+    printf("Failed to open %s.\n", file_name);
+    return EXIT_FAILURE;
+  }
+  fprintf(file, "banned.users=bannedUser\n");
+  fprintf(file, "min.user.id=1000\n");
+  fclose(file);
+  return 0;
+}
+
+void create_nm_roots(char ** nm_roots) {
+  char** nm_root;
+  for(nm_root=nm_roots; *nm_root != NULL; ++nm_root) {
+    if (mkdir(*nm_root, 0755) != 0) {
+      printf("FAIL: Can't create directory %s - %s\n", *nm_root,
+             strerror(errno));
+      exit(1);
+    }
+    char buffer[100000];
+    sprintf(buffer, "%s/usercache", *nm_root);
+    if (mkdir(buffer, 0755) != 0) {
+      printf("FAIL: Can't create directory %s - %s\n", buffer,
+             strerror(errno));
+      exit(1);
+    }
+  }
+}
+
+void test_get_container_launcher_file() {
+  char *expected_file = ("/tmp/launch_container.sh");
+  char *app_dir = "/tmp";
+  char *container_file =  get_container_launcher_file(app_dir);
+  if (strcmp(container_file, expected_file) != 0) {
+    printf("failure to match expected container file %s vs %s\n", container_file,
+           expected_file);
+    exit(1);
+  }
+  free(container_file);
+}
+
+void test_check_user() {
+  printf("\nTesting test_check_user\n");
+  struct passwd *user = check_user(username);
+  if (user == NULL) {
+    printf("FAIL: failed check for user %s\n", username);
+    exit(1);
+  }
+  free(user);
+  if (check_user("lp") != NULL) {
+    printf("FAIL: failed check for system user lp\n");
+    exit(1);
+  }
+  if (check_user("root") != NULL) {
+    printf("FAIL: failed check for system user root\n");
+    exit(1);
+  }
+}
+
+void test_check_configuration_permissions() {
+  printf("\nTesting check_configuration_permissions\n");
+  if (check_configuration_permissions("/etc/passwd") != 0) {
+    printf("FAIL: failed permission check on /etc/passwd\n");
+    exit(1);
+  }
+  if (check_configuration_permissions(TEST_ROOT) == 0) {
+    printf("FAIL: failed permission check on %s\n", TEST_ROOT);
+    exit(1);
+  }
+}
+
+void run_test_in_child(const char* test_name, void (*func)()) {
+  printf("\nRunning test %s in child process\n", test_name);
+  fflush(stdout);
+  fflush(stderr);
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: fork failed\n");
+    exit(1);
+  } else if (child == 0) {
+    func();
+    exit(0);
+  } else {
+    int status = 0;
+    if (waitpid(child, &status, 0) == -1) {
+      printf("FAIL: waitpid %d failed - %s\n", child, strerror(errno));
+      exit(1);
+    }
+    if (!WIFEXITED(status)) {
+      printf("FAIL: child %d didn't exit - %d\n", child, status);
+      exit(1);
+    }
+    if (WEXITSTATUS(status) != 0) {
+      printf("FAIL: child %d exited with bad status %d\n",
+	     child, WEXITSTATUS(status));
+      exit(1);
+    }
+  }
+}
+
+void test_signal_container() {
+  printf("\nTesting signal_container\n");
+  fflush(stdout);
+  fflush(stderr);
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: fork failed\n");
+    exit(1);
+  } else if (child == 0) {
+    if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+      exit(1);
+    }
+    sleep(3600);
+    exit(0);
+  } else {
+    printf("Child container launched as %d\n", child);
+    if (signal_container_as_user(username, child, SIGQUIT) != 0) {
+      exit(1);
+    }
+    int status = 0;
+    if (waitpid(child, &status, 0) == -1) {
+      printf("FAIL: waitpid failed - %s\n", strerror(errno));
+      exit(1);
+    }
+    if (!WIFSIGNALED(status)) {
+      printf("FAIL: child wasn't signalled - %d\n", status);
+      exit(1);
+    }
+    if (WTERMSIG(status) != SIGQUIT) {
+      printf("FAIL: child was killed with %d instead of %d\n", 
+	     WTERMSIG(status), SIGQUIT);
+      exit(1);
+    }
+  }
+}
+
+void test_signal_container_group() {
+  printf("\nTesting group signal_container\n");
+  fflush(stdout);
+  fflush(stderr);
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: fork failed\n");
+    exit(1);
+  } else if (child == 0) {
+    setpgrp();
+    if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+      exit(1);
+    }
+    sleep(3600);
+    exit(0);
+  }
+  printf("Child container launched as %d\n", child);
+  if (signal_container_as_user(username, child, SIGKILL) != 0) {
+    exit(1);
+  }
+  int status = 0;
+  if (waitpid(child, &status, 0) == -1) {
+    printf("FAIL: waitpid failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (!WIFSIGNALED(status)) {
+    printf("FAIL: child wasn't signalled - %d\n", status);
+    exit(1);
+  }
+  if (WTERMSIG(status) != SIGKILL) {
+    printf("FAIL: child was killed with %d instead of %d\n", 
+	   WTERMSIG(status), SIGKILL);
+    exit(1);
+  }
+}
+
+/**
+ * Ensure that the given path and all of the parent directories are created
+ * with the desired permissions.
+ */
+int mkdirs(const char* path, mode_t perm) {
+  char *cmd = malloc(10 + strlen(path));
+  int ret = 0;
+  sprintf(cmd, "mkdir -p %s", path);
+  ret = system(cmd);
+  free(cmd);
+  return ret;
+}
+
+int main(int argc, char **argv) {
+  LOGFILE = stdout;
+  ERRORFILE = stderr;
+  int my_username = 0;
+
+  // clean up any junk from previous run
+  system("chmod -R u=rwx " TEST_ROOT "; rm -fr " TEST_ROOT);
+  
+  if (mkdirs(TEST_ROOT "/logs/userlogs", 0755) != 0) {
+    exit(1);
+  }
+  
+  if (write_config_file(TEST_ROOT "/test.cfg") != 0) {
+    exit(1);
+  }
+  read_config(TEST_ROOT "/test.cfg");
+
+  local_dirs = (char *) malloc (sizeof(char) * ARRAY_SIZE);
+  strcpy(local_dirs, NM_LOCAL_DIRS);
+  log_dirs = (char *) malloc (sizeof(char) * ARRAY_SIZE);
+  strcpy(log_dirs, NM_LOG_DIRS);
+
+  create_nm_roots(extract_values(local_dirs));
+
+  if (getuid() == 0 && argc == 2) {
+    username = argv[1];
+  } else {
+    username = strdup(getpwuid(getuid())->pw_name);
+    my_username = 1;
+  }
+  set_launcher_uid(geteuid(), getegid());
+
+  if (set_user(username)) {
+    exit(1);
+  }
+
+  printf("\nStarting tests\n");
+
+  printf("\nTesting get_container_launcher_file()\n");
+  test_get_container_launcher_file();
+
+  printf("\nTesting check_configuration_permissions()\n");
+  test_check_configuration_permissions();
+
+  printf("\nTesting check_user()\n");
+  test_check_user();
+
+  // the tests that change user need to be run in a subshell, so that
+  // when they change user they don't give up our privs
+  run_test_in_child("test_signal_container", test_signal_container);
+  run_test_in_child("test_signal_container_group", test_signal_container_group);
+
+  seteuid(0);
+  run("rm -fr " TEST_ROOT);
+  printf("\nFinished tests\n");
+
+  if (my_username) {
+    free(username);
+  }
+  free_configurations();
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/py/storm/DistributedRPC.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPC.py b/storm-core/src/py/storm/DistributedRPC.py
index a7e6ef9..851ad65 100644
--- a/storm-core/src/py/storm/DistributedRPC.py
+++ b/storm-core/src/py/storm/DistributedRPC.py
@@ -64,6 +64,8 @@ class Client(Iface):
       return result.success
     if result.e is not None:
       raise result.e
+    if result.aze is not None:
+      raise result.aze
     raise TApplicationException(TApplicationException.MISSING_RESULT, "execute failed: unknown result");
 
 
@@ -97,6 +99,8 @@ class Processor(Iface, TProcessor):
       result.success = self._handler.execute(args.functionName, args.funcArgs)
     except DRPCExecutionException, e:
       result.e = e
+    except AuthorizationException, aze:
+      result.aze = aze
     oprot.writeMessageBegin("execute", TMessageType.REPLY, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
@@ -185,19 +189,22 @@ class execute_result:
   Attributes:
    - success
    - e
+   - aze
   """
 
   thrift_spec = (
     (0, TType.STRING, 'success', None, None, ), # 0
     (1, TType.STRUCT, 'e', (DRPCExecutionException, DRPCExecutionException.thrift_spec), None, ), # 1
+    (2, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 2
   )
 
   def __hash__(self):
-    return 0 + hash(self.success) + hash(self.e)
+    return 0 + hash(self.success) + hash(self.e) + hash(self.aze)
 
-  def __init__(self, success=None, e=None,):
+  def __init__(self, success=None, e=None, aze=None,):
     self.success = success
     self.e = e
+    self.aze = aze
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -219,6 +226,12 @@ class execute_result:
           self.e.read(iprot)
         else:
           iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRUCT:
+          self.aze = AuthorizationException()
+          self.aze.read(iprot)
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -237,6 +250,10 @@ class execute_result:
       oprot.writeFieldBegin('e', TType.STRUCT, 1)
       self.e.write(oprot)
       oprot.writeFieldEnd()
+    if self.aze is not None:
+      oprot.writeFieldBegin('aze', TType.STRUCT, 2)
+      self.aze.write(oprot)
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/py/storm/DistributedRPCInvocations.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPCInvocations.py b/storm-core/src/py/storm/DistributedRPCInvocations.py
index 4f951a9..6de2245 100644
--- a/storm-core/src/py/storm/DistributedRPCInvocations.py
+++ b/storm-core/src/py/storm/DistributedRPCInvocations.py
@@ -74,6 +74,8 @@ class Client(Iface):
     result = result_result()
     result.read(self._iprot)
     self._iprot.readMessageEnd()
+    if result.aze is not None:
+      raise result.aze
     return
 
   def fetchRequest(self, functionName):
@@ -104,6 +106,8 @@ class Client(Iface):
     self._iprot.readMessageEnd()
     if result.success is not None:
       return result.success
+    if result.aze is not None:
+      raise result.aze
     raise TApplicationException(TApplicationException.MISSING_RESULT, "fetchRequest failed: unknown result");
 
   def failRequest(self, id):
@@ -132,6 +136,8 @@ class Client(Iface):
     result = failRequest_result()
     result.read(self._iprot)
     self._iprot.readMessageEnd()
+    if result.aze is not None:
+      raise result.aze
     return
 
 
@@ -163,7 +169,10 @@ class Processor(Iface, TProcessor):
     args.read(iprot)
     iprot.readMessageEnd()
     result = result_result()
-    self._handler.result(args.id, args.result)
+    try:
+      self._handler.result(args.id, args.result)
+    except AuthorizationException, aze:
+      result.aze = aze
     oprot.writeMessageBegin("result", TMessageType.REPLY, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
@@ -174,7 +183,10 @@ class Processor(Iface, TProcessor):
     args.read(iprot)
     iprot.readMessageEnd()
     result = fetchRequest_result()
-    result.success = self._handler.fetchRequest(args.functionName)
+    try:
+      result.success = self._handler.fetchRequest(args.functionName)
+    except AuthorizationException, aze:
+      result.aze = aze
     oprot.writeMessageBegin("fetchRequest", TMessageType.REPLY, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
@@ -185,7 +197,10 @@ class Processor(Iface, TProcessor):
     args.read(iprot)
     iprot.readMessageEnd()
     result = failRequest_result()
-    self._handler.failRequest(args.id)
+    try:
+      self._handler.failRequest(args.id)
+    except AuthorizationException, aze:
+      result.aze = aze
     oprot.writeMessageBegin("failRequest", TMessageType.REPLY, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
@@ -270,12 +285,21 @@ class result_args:
     return not (self == other)
 
 class result_result:
+  """
+  Attributes:
+   - aze
+  """
 
   thrift_spec = (
+    None, # 0
+    (1, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 1
   )
 
   def __hash__(self):
-    return 0
+    return 0 + hash(self.aze)
+
+  def __init__(self, aze=None,):
+    self.aze = aze
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -286,6 +310,12 @@ class result_result:
       (fname, ftype, fid) = iprot.readFieldBegin()
       if ftype == TType.STOP:
         break
+      if fid == 1:
+        if ftype == TType.STRUCT:
+          self.aze = AuthorizationException()
+          self.aze.read(iprot)
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -296,6 +326,10 @@ class result_result:
       oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
       return
     oprot.writeStructBegin('result_result')
+    if self.aze is not None:
+      oprot.writeFieldBegin('aze', TType.STRUCT, 1)
+      self.aze.write(oprot)
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -381,17 +415,20 @@ class fetchRequest_result:
   """
   Attributes:
    - success
+   - aze
   """
 
   thrift_spec = (
     (0, TType.STRUCT, 'success', (DRPCRequest, DRPCRequest.thrift_spec), None, ), # 0
+    (1, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 1
   )
 
   def __hash__(self):
-    return 0 + hash(self.success)
+    return 0 + hash(self.success) + hash(self.aze)
 
-  def __init__(self, success=None,):
+  def __init__(self, success=None, aze=None,):
     self.success = success
+    self.aze = aze
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -408,6 +445,12 @@ class fetchRequest_result:
           self.success.read(iprot)
         else:
           iprot.skip(ftype)
+      elif fid == 1:
+        if ftype == TType.STRUCT:
+          self.aze = AuthorizationException()
+          self.aze.read(iprot)
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -422,6 +465,10 @@ class fetchRequest_result:
       oprot.writeFieldBegin('success', TType.STRUCT, 0)
       self.success.write(oprot)
       oprot.writeFieldEnd()
+    if self.aze is not None:
+      oprot.writeFieldBegin('aze', TType.STRUCT, 1)
+      self.aze.write(oprot)
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -504,12 +551,21 @@ class failRequest_args:
     return not (self == other)
 
 class failRequest_result:
+  """
+  Attributes:
+   - aze
+  """
 
   thrift_spec = (
+    None, # 0
+    (1, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 1
   )
 
   def __hash__(self):
-    return 0
+    return 0 + hash(self.aze)
+
+  def __init__(self, aze=None,):
+    self.aze = aze
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -520,6 +576,12 @@ class failRequest_result:
       (fname, ftype, fid) = iprot.readFieldBegin()
       if ftype == TType.STOP:
         break
+      if fid == 1:
+        if ftype == TType.STRUCT:
+          self.aze = AuthorizationException()
+          self.aze.read(iprot)
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -530,6 +592,10 @@ class failRequest_result:
       oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
       return
     oprot.writeStructBegin('failRequest_result')
+    if self.aze is not None:
+      oprot.writeFieldBegin('aze', TType.STRUCT, 1)
+      self.aze.write(oprot)
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/py/storm/Nimbus-remote
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus-remote b/storm-core/src/py/storm/Nimbus-remote
index 4b2ff04..c184fab 100755
--- a/storm-core/src/py/storm/Nimbus-remote
+++ b/storm-core/src/py/storm/Nimbus-remote
@@ -28,6 +28,7 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help':
   print '  void activate(string name)'
   print '  void deactivate(string name)'
   print '  void rebalance(string name, RebalanceOptions options)'
+  print '  void uploadNewCredentials(string name, Credentials creds)'
   print '  string beginFileUpload()'
   print '  void uploadChunk(string location, string chunk)'
   print '  void finishFileUpload(string location)'
@@ -131,6 +132,12 @@ elif cmd == 'rebalance':
     sys.exit(1)
   pp.pprint(client.rebalance(args[0],eval(args[1]),))
 
+elif cmd == 'uploadNewCredentials':
+  if len(args) != 2:
+    print 'uploadNewCredentials requires 2 args'
+    sys.exit(1)
+  pp.pprint(client.uploadNewCredentials(args[0],eval(args[1]),))
+
 elif cmd == 'beginFileUpload':
   if len(args) != 0:
     print 'beginFileUpload requires 0 args'