You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/21 18:07:00 UTC
[05/14] STORM-216: Added Authentication and Authorization.
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/impl/configuration.h
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/impl/configuration.h b/storm-core/src/native/worker-launcher/impl/configuration.h
new file mode 100644
index 0000000..b0d4814
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/impl/configuration.h
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Ensure that the configuration file and all of the containing directories
+ * are only writable by root. Otherwise, an attacker can change the
+ * configuration and potentially cause damage.
+ * Returns 0 if the permissions are ok; main() exits on any other value.
+ */
+int check_configuration_permissions(const char* file_name);
+
+// Read the given configuration file (main() calls this before any get_value() lookup).
+void read_config(const char* config_file);
+
+// Look up the configured value for a key; callers in this patch treat NULL as "not configured".
+char *get_value(const char* key);
+
+// Return the array of values configured for the key. Values are stored as
+// comma separated strings; callers release the result with free_values().
+char ** get_values(const char* key);
+
+// Extracts array of values from the comma separated list of values.
+char ** extract_values(char *value);
+
+// Free the memory returned by get_values.
+void free_values(char** values);
+
+// Free the allocated configuration.
+void free_configurations();
+
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/impl/main.c
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/impl/main.c b/storm-core/src/native/worker-launcher/impl/main.c
new file mode 100644
index 0000000..7067cf9
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/impl/main.c
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "configuration.h"
+#include "worker-launcher.h"
+
+#include <errno.h>
+#include <grp.h>
+#include <limits.h>
+#include <unistd.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <strings.h>
+#include <sys/stat.h>
+
+#define _STRINGIFY(X) #X // stringizes X exactly as written
+#define STRINGIFY(X) _STRINGIFY(X) // extra level so X is macro-expanded before being stringized
+#define CONF_FILENAME "worker-launcher.cfg" // file name main() appends to EXEC_CONF_DIR
+
+#ifndef EXEC_CONF_DIR // not defined in this file; presumably supplied by the build (e.g. -D) - confirm
+ #error EXEC_CONF_DIR must be defined
+#endif
+
+/**
+ * Write the command-line synopsis and the list of supported sub-commands
+ * to the given stream, one line per entry.
+ */
+void display_usage(FILE *stream) {
+  static const char *const usage_lines[] = {
+    "Usage: worker-launcher --checksetup\n",
+    "Usage: worker-launcher user command command-args\n",
+    "Commands:\n",
+    " initialize stormdist dir: code-dir <code-directory>\n",
+    " remove a file/directory: rmr <directory>\n",
+    " launch a worker: worker <working-directory> <script-to-run>\n",
+    " signal a worker: signal <pid> <signal>\n",
+  };
+  const size_t line_count = sizeof(usage_lines) / sizeof(usage_lines[0]);
+
+  for (size_t i = 0; i < line_count; i++) {
+    fputs(usage_lines[i], stream);
+  }
+}
+
+/**
+ * Parse a complete decimal string into an int.
+ * Returns 0 and stores the value in *value on success. Returns -1 when the
+ * string has no digits, has trailing characters, or does not fit in an int.
+ */
+static int parse_int_argument(const char *text, int *value) {
+  char *end_ptr = NULL;
+  errno = 0;
+  long parsed = strtol(text, &end_ptr, 10);
+  if (text == end_ptr || *end_ptr != '\0') {
+    return -1;
+  }
+  if (errno == ERANGE || parsed < INT_MIN || parsed > INT_MAX) {
+    return -1;
+  }
+  *value = (int) parsed;
+  return 0;
+}
+
+/**
+ * Entry point of the worker-launcher binary.
+ *
+ * Usage:
+ *   worker-launcher --checksetup
+ *   worker-launcher user command command-args
+ *
+ * Loads and validates the configuration file, resolves the launcher group,
+ * verifies the permissions of the binary itself, switches the effective
+ * user to the requested one and then dispatches to the requested command
+ * (code-dir, rmr, worker, signal).
+ *
+ * Returns 0 on success, otherwise one of the errorcodes values or the
+ * non-zero status reported by the command implementation.
+ */
+int main(int argc, char **argv) {
+  int invalid_args = 0;
+  int do_check_setup = 0;
+
+  LOGFILE = stdout;
+  ERRORFILE = stderr;
+
+  // Every regular command needs at least "user command"; the only
+  // two-argument invocation accepted is "--checksetup".
+  if (argc < 3) {
+    invalid_args = 1;
+    if (argc == 2 && strcmp("--checksetup", argv[1]) == 0) {
+      invalid_args = 0;
+      do_check_setup = 1;
+    }
+  }
+
+  if (invalid_args != 0) {
+    display_usage(stdout);
+    return INVALID_ARGUMENT_NUMBER;
+  }
+
+  const char * command = NULL;
+  const char * working_dir = NULL;
+
+  int exit_code = 0;
+
+  char *executable_file = get_executable();
+
+  char *orig_conf_file = STRINGIFY(EXEC_CONF_DIR) "/" CONF_FILENAME;
+  char *conf_file = realpath(orig_conf_file, NULL);
+
+  if (conf_file == NULL) {
+    fprintf(ERRORFILE, "Configuration file %s not found.\n", orig_conf_file);
+    exit(INVALID_CONFIG_FILE);
+  }
+  if (check_configuration_permissions(conf_file) != 0) {
+    exit(INVALID_CONFIG_FILE);
+  }
+  read_config(conf_file);
+  free(conf_file);
+  conf_file = NULL;
+
+  // look up the node manager group in the config file
+  char *nm_group = get_value(LAUNCHER_GROUP_KEY);
+  if (nm_group == NULL) {
+    fprintf(ERRORFILE, "Can't get configured value for %s.\n", LAUNCHER_GROUP_KEY);
+    exit(INVALID_CONFIG_FILE);
+  }
+  // getgrnam() leaves errno untouched when the group simply does not exist,
+  // so clear it first to keep the message below meaningful.
+  errno = 0;
+  struct group *group_info = getgrnam(nm_group);
+  if (group_info == NULL) {
+    fprintf(ERRORFILE, "Can't get group information for %s - %s.\n", nm_group,
+            strerror(errno));
+    fflush(ERRORFILE);
+    exit(INVALID_CONFIG_FILE);
+  }
+
+  set_launcher_uid(getuid(), group_info->gr_gid);
+  // if we are running from a setuid executable, make the real uid root
+  // NOTE(review): the results of setuid()/setgid() are not checked here.
+  setuid(0);
+  // set the real and effective group id to the node manager group
+  setgid(group_info->gr_gid);
+
+  int permission_status = check_executor_permissions(executable_file);
+  // the path of the binary is not needed after the permission check
+  free(executable_file);
+  executable_file = NULL;
+  if (permission_status != 0) {
+    fprintf(ERRORFILE, "Invalid permissions on worker-launcher binary.\n");
+    return INVALID_CONTAINER_EXEC_PERMISSIONS;
+  }
+
+  if (do_check_setup != 0) {
+    // basic setup checks done
+    // verified configs available and valid
+    // verified executor permissions
+    return 0;
+  }
+
+  //checks done for user name
+  if (argv[optind] == NULL) {
+    fprintf(ERRORFILE, "Invalid user name.\n");
+    return INVALID_USER_NAME;
+  }
+
+  int ret = set_user(argv[optind]);
+  if (ret != 0) {
+    return ret;
+  }
+
+  optind = optind + 1;
+  command = argv[optind++];
+
+  fprintf(LOGFILE, "main : command provided %s\n",command);
+  fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name);
+  fflush(LOGFILE);
+
+  if (strcasecmp("code-dir", command) == 0) {
+    if (argc != 4) {
+      fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 4) for code-dir\n",
+              argc);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    exit_code = setup_stormdist_dir(argv[optind]);
+  } else if (strcasecmp("rmr", command) == 0) {
+    if (argc != 4) {
+      fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 4) for rmr\n",
+              argc);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    exit_code = delete_as_user(user_detail->pw_name, argv[optind],
+                               NULL);
+  } else if (strcasecmp("worker", command) == 0) {
+    if (argc != 5) {
+      fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 5) for worker\n",
+              argc);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    working_dir = argv[optind++];
+    exit_code = setup_stormdist_dir(working_dir);
+    if (exit_code == 0) {
+      exit_code = exec_as_user(working_dir, argv[optind]);
+    }
+  } else if (strcasecmp("signal", command) == 0) {
+    if (argc != 5) {
+      fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 5) for signal\n",
+              argc);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    // Both numbers are range checked so that an oversized value cannot be
+    // silently truncated into a different pid or signal number.
+    char* option = argv[optind++];
+    int container_pid = 0;
+    if (parse_int_argument(option, &container_pid) != 0) {
+      fprintf(ERRORFILE, "Illegal argument for container pid %s\n", option);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    option = argv[optind++];
+    int signal_number = 0;
+    if (parse_int_argument(option, &signal_number) != 0) {
+      fprintf(ERRORFILE, "Illegal argument for signal %s\n", option);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    exit_code = signal_container_as_user(user_detail->pw_name, container_pid,
+                                         signal_number);
+  } else {
+    fprintf(ERRORFILE, "Invalid command %s not supported.",command);
+    fflush(ERRORFILE);
+    exit_code = INVALID_COMMAND_PROVIDED;
+  }
+  fclose(LOGFILE);
+  fclose(ERRORFILE);
+  return exit_code;
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/impl/worker-launcher.c
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/impl/worker-launcher.c b/storm-core/src/native/worker-launcher/impl/worker-launcher.c
new file mode 100644
index 0000000..81d7075
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/impl/worker-launcher.c
@@ -0,0 +1,779 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "configuration.h"
+#include "worker-launcher.h"
+
+#include <dirent.h>
+#include <fcntl.h>
+#include <fts.h>
+#include <errno.h>
+#include <grp.h>
+#include <unistd.h>
+#include <signal.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+
+static const int DEFAULT_MIN_USERID = 1000; // used by check_user() when MIN_USERID_KEY is not configured
+
+static const char* DEFAULT_BANNED_USERS[] = {"bin", 0}; // 0-terminated list used by check_user() when get_values() yields NULL
+
+// Details of the user selected by set_user(); NULL until set_user() succeeds.
+struct passwd *user_detail = NULL;
+
+FILE* LOGFILE = NULL; // main() points this at stdout
+FILE* ERRORFILE = NULL; // main() points this at stderr
+
+static uid_t launcher_uid = -1; // assigned by set_launcher_uid()
+static gid_t launcher_gid = -1; // assigned by set_launcher_uid()
+
+char *concatenate(char *concat_pattern, char *return_path_name,
+ int numArgs, ...);
+
+/**
+ * Remember the launcher's own user and group ids in the file-scope
+ * launcher_uid / launcher_gid variables for later privilege switches.
+ */
+void set_launcher_uid(uid_t user, gid_t group) {
+  launcher_gid = group;
+  launcher_uid = user;
+}
+
+/**
+ * Resolve the path of the running binary by reading the /proc/<pid>/exe
+ * symbolic link.
+ *
+ * Returns a malloc'ed, NUL-terminated string. Any failure is reported on
+ * ERRORFILE and terminates the process with exit(-1).
+ */
+char* get_executable() {
+  char proc_link[PATH_MAX];
+  snprintf(proc_link, PATH_MAX, "/proc/%u/exe", getpid());
+
+  char *target = malloc(PATH_MAX);
+  if (target == NULL) {
+    fprintf(ERRORFILE, "malloc failed in get_executable\n");
+    exit(-1);
+  }
+
+  ssize_t target_len = readlink(proc_link, target, PATH_MAX);
+  if (target_len == -1) {
+    fprintf(ERRORFILE, "Can't get executable name from %s - %s\n", proc_link,
+            strerror(errno));
+    exit(-1);
+  }
+  if (target_len >= PATH_MAX) {
+    // readlink() filled the whole buffer, so the name may be truncated and
+    // there is no room left for the terminator
+    fprintf(ERRORFILE, "Executable name %.*s is longer than %d characters.\n",
+            PATH_MAX, target, PATH_MAX);
+    exit(-1);
+  }
+
+  // readlink() does not terminate the string itself
+  target[target_len] = '\0';
+  return target;
+}
+
+/**
+ * Verify that the launcher binary is installed with safe permissions:
+ *  - owned by root,
+ *  - group-owned by the current real group id of the process,
+ *  - no read/write/execute permission for "others",
+ *  - setuid bit set.
+ *
+ * executable_file: path of the binary; it is canonicalized with realpath()
+ * Returns 0 when every check passes and -1 otherwise (reason is logged).
+ */
+int check_executor_permissions(char *executable_file) {
+  errno = 0;
+  char *resolved_path = realpath(executable_file, NULL);
+  if (resolved_path == NULL) {
+    fprintf(ERRORFILE,
+            "Error resolving the canonical name for the executable : %s!",
+            strerror(errno));
+    return -1;
+  }
+
+  struct stat filestat;
+  errno = 0;
+  int stat_result = stat(resolved_path, &filestat);
+  int stat_errno = errno;
+  // realpath() allocated this buffer and it is only needed for stat();
+  // release it here so that none of the return paths below leaks it
+  free(resolved_path);
+  resolved_path = NULL;
+  if (stat_result != 0) {
+    fprintf(ERRORFILE,
+            "Could not stat the executable : %s!.\n", strerror(stat_errno));
+    return -1;
+  }
+
+  uid_t binary_euid = filestat.st_uid; // Binary's user owner
+  gid_t binary_gid = filestat.st_gid; // Binary's group owner
+
+  // The owner of the file must be root
+  if (binary_euid != 0) {
+    fprintf(LOGFILE,
+            "The worker-launcher binary should be user-owned by root.\n");
+    return -1;
+  }
+
+  if (binary_gid != getgid()) {
+    fprintf(LOGFILE, "The configured nodemanager group %d is different from"
+            " the group of the executable %d\n", getgid(), binary_gid);
+    return -1;
+  }
+
+  // check others do not have read/write/execute permissions
+  if ((filestat.st_mode & S_IROTH) == S_IROTH || (filestat.st_mode & S_IWOTH)
+      == S_IWOTH || (filestat.st_mode & S_IXOTH) == S_IXOTH) {
+    fprintf(LOGFILE,
+            "The worker-launcher binary should not have read or write or"
+            " execute for others.\n");
+    return -1;
+  }
+
+  // Binary should be a setuid executable
+  if ((filestat.st_mode & S_ISUID) == 0) {
+    fprintf(LOGFILE, "The worker-launcher binary should be set setuid.\n");
+    return -1;
+  }
+
+  return 0;
+}
+
+/**
+ * Switch only the effective user and group ids, leaving the real ids
+ * untouched so that the switch can be undone later.
+ * Returns 0 on success (or when nothing had to change) and -1 on failure.
+ */
+static int change_effective_user(uid_t user, gid_t group) {
+  const int already_effective = (geteuid() == user) && (getegid() == group);
+  if (already_effective) {
+    return 0;
+  }
+
+  // become root first; the ids below cannot be changed otherwise
+  if (seteuid(0) != 0) {
+    return -1;
+  }
+
+  int status = 0;
+  if (setegid(group) != 0) {
+    fprintf(LOGFILE, "Failed to set effective group id %d - %s\n", group,
+            strerror(errno));
+    status = -1;
+  } else if (seteuid(user) != 0) {
+    fprintf(LOGFILE, "Failed to set effective user id %d - %s\n", user,
+            strerror(errno));
+    status = -1;
+  }
+  return status;
+}
+
+/**
+ * Log the current real and effective user/group ids of the process.
+ */
+static void log_current_ids(void) {
+  fprintf(LOGFILE, "Real: %d:%d; Effective: %d:%d\n",
+          getuid(), getgid(), geteuid(), getegid());
+}
+
+/**
+ * Change both the real and the effective user and group ids, which gives
+ * up the super user privileges.
+ * Returns 0 on success and SETUID_OPER_FAILED when any step fails.
+ */
+int change_user(uid_t user, gid_t group) {
+  const int user_matches = (user == getuid()) && (user == geteuid());
+  const int group_matches = (group == getgid()) && (group == getegid());
+  if (user_matches && group_matches) {
+    return 0;
+  }
+
+  // root is required to be able to change the group and user ids
+  if (seteuid(0) != 0) {
+    fprintf(LOGFILE, "unable to reacquire root - %s\n", strerror(errno));
+    log_current_ids();
+    return SETUID_OPER_FAILED;
+  }
+
+  // the group has to be changed while the process is still root
+  if (setgid(group) != 0) {
+    fprintf(LOGFILE, "unable to set group to %d - %s\n", group,
+            strerror(errno));
+    log_current_ids();
+    return SETUID_OPER_FAILED;
+  }
+
+  if (setuid(user) != 0) {
+    fprintf(LOGFILE, "unable to set user to %d - %s\n", user, strerror(errno));
+    log_current_ids();
+    return SETUID_OPER_FAILED;
+  }
+
+  return 0;
+}
+
+/**
+ * Build a newly allocated string by formatting the variadic string
+ * arguments with concat_pattern.
+ *
+ * concat_pattern:   printf-style pattern applied to the arguments
+ * return_path_name: description of the result, used in log messages only
+ * numArgs:          number of char* arguments that follow
+ *
+ * Returns a malloc'ed string that the caller must free, or NULL when one
+ * of the arguments is NULL or the allocation fails.
+ */
+char *concatenate(char *concat_pattern, char *return_path_name,
+                  int numArgs, ...) {
+  va_list ap;
+  size_t strlen_args = 0;
+
+  // first pass: validate the arguments and add up their lengths
+  va_start(ap, numArgs);
+  for (int j = 0; j < numArgs; j++) {
+    const char *arg = va_arg(ap, char*);
+    if (arg == NULL) {
+      fprintf(LOGFILE, "One of the arguments passed for %s in null.\n",
+              return_path_name);
+      // every va_start() needs a matching va_end(), also on this early exit
+      va_end(ap);
+      return NULL;
+    }
+    strlen_args += strlen(arg);
+  }
+  va_end(ap);
+
+  // the pattern length is an upper bound for the literal part of the result
+  size_t str_len = strlen(concat_pattern) + strlen_args + 1;
+
+  char *return_path = malloc(str_len);
+  if (return_path == NULL) {
+    fprintf(LOGFILE, "Unable to allocate memory for %s.\n", return_path_name);
+    return NULL;
+  }
+
+  // second pass: format the arguments into the buffer
+  va_start(ap, numArgs);
+  vsnprintf(return_path, str_len, concat_pattern, ap);
+  va_end(ap);
+  return return_path;
+}
+
+/**
+ * Build the path "<work_dir>/<CONTAINER_SCRIPT>".
+ * The result comes from concatenate(): malloc'ed, or NULL on failure.
+ */
+char *get_container_launcher_file(const char* work_dir) {
+  char *launcher_path = concatenate("%s/%s", "container launcher", 2,
+                                    work_dir, CONTAINER_SCRIPT);
+  return launcher_path;
+}
+
+/**
+ * Build the path of the tmp directory "<work_dir>/<TMP_DIR>".
+ * The result comes from concatenate(): malloc'ed, or NULL on failure.
+ */
+char *get_tmp_directory(const char *work_dir) {
+  char *tmp_path = concatenate("%s/%s", "tmp dir", 2, work_dir, TMP_DIR);
+  return tmp_path;
+}
+
+/**
+ * Load the user information for a given user name.
+ *
+ * The struct passwd and the strings it points to live in one malloc'ed
+ * block that starts at the returned pointer, so the caller releases
+ * everything with a single free() of the result.
+ *
+ * Returns NULL when the lookup fails or when the user does not exist.
+ */
+static struct passwd* get_user_info(const char* user) {
+  enum { FALLBACK_STRING_SIZE = 16384 };
+
+  // sysconf() returns -1 when the system does not report a size
+  long size_hint = sysconf(_SC_GETPW_R_SIZE_MAX);
+  size_t string_size = (size_hint > 0) ? (size_t) size_hint
+                                       : (size_t) FALLBACK_STRING_SIZE;
+
+  char *buffer = malloc(sizeof(struct passwd) + string_size);
+  if (buffer == NULL) {
+    fprintf(LOGFILE, "Malloc failed in get_user_info\n");
+    return NULL;
+  }
+
+  struct passwd *result = NULL;
+  // getpwnam_r() reports errors through its return value, not through errno
+  int lookup_error = getpwnam_r(user, (struct passwd *) buffer,
+                                buffer + sizeof(struct passwd), string_size,
+                                &result);
+  if (lookup_error != 0) {
+    free(buffer);
+    buffer = NULL;
+    fprintf(LOGFILE, "Can't get user information %s - %s\n", user,
+            strerror(lookup_error));
+    return NULL;
+  }
+  if (result == NULL) {
+    // success with a NULL result means "no such user"; nothing refers to
+    // the buffer in that case, so it must be released here
+    free(buffer);
+    buffer = NULL;
+    return NULL;
+  }
+  return result;
+}
+
+/**
+ * Is the user a real user account?
+ * Checks:
+ * 1. Not root
+ * 2. UID is above the minimum configured (MIN_USERID_KEY, otherwise
+ *    DEFAULT_MIN_USERID).
+ * 3. Not in banned user list (BANNED_USERS_KEY, otherwise
+ *    DEFAULT_BANNED_USERS).
+ * Returns the user information (to be released with free()) on success
+ * and NULL on failure.
+ */
+struct passwd* check_user(const char *user) {
+  if (strcmp(user, "root") == 0) {
+    fprintf(LOGFILE, "Running as root is not allowed\n");
+    fflush(LOGFILE);
+    return NULL;
+  }
+  char *min_uid_str = get_value(MIN_USERID_KEY);
+  int min_uid = DEFAULT_MIN_USERID;
+  if (min_uid_str != NULL) {
+    char *end_ptr = NULL;
+    min_uid = strtol(min_uid_str, &end_ptr, 10);
+    if (min_uid_str == end_ptr || *end_ptr != '\0') {
+      fprintf(LOGFILE, "Illegal value of %s for %s in configuration\n",
+              min_uid_str, MIN_USERID_KEY);
+      fflush(LOGFILE);
+      free(min_uid_str);
+      min_uid_str = NULL;
+      return NULL;
+    }
+    free(min_uid_str);
+    min_uid_str = NULL;
+  }
+  struct passwd *user_info = get_user_info(user);
+  if (NULL == user_info) {
+    fprintf(LOGFILE, "User %s not found\n", user);
+    fflush(LOGFILE);
+    return NULL;
+  }
+  if (user_info->pw_uid < min_uid) {
+    fprintf(LOGFILE, "Requested user %s has id %d, which is below the "
+            "minimum allowed %d\n", user, user_info->pw_uid, min_uid);
+    fflush(LOGFILE);
+    free(user_info);
+    user_info = NULL;
+    return NULL;
+  }
+
+  // configured list, or NULL when the built-in defaults have to be used
+  char **banned_users = get_values(BANNED_USERS_KEY);
+  char **banned_user = (banned_users == NULL) ?
+    (char**) DEFAULT_BANNED_USERS : banned_users;
+  int is_banned = 0;
+  for (; *banned_user; ++banned_user) {
+    if (strcmp(*banned_user, user) == 0) {
+      is_banned = 1;
+      break;
+    }
+  }
+  // Only a list obtained from get_values() may be released. When the
+  // defaults were used banned_users is NULL and must not reach free_values().
+  if (banned_users != NULL) {
+    free_values(banned_users);
+    banned_users = NULL;
+  }
+  if (is_banned) {
+    free(user_info);
+    user_info = NULL;
+    fprintf(LOGFILE, "Requested user %s is banned\n", user);
+    return NULL;
+  }
+  return user_info;
+}
+
+/**
+ * Validate the given user name, store its details in the global
+ * user_detail and make it the effective user of the process.
+ * Returns 0 on success and -1 when the user is rejected or the switch
+ * fails.
+ */
+int set_user(const char *user) {
+  // drop the details of a previously selected user (free(NULL) is a no-op)
+  free(user_detail);
+  user_detail = check_user(user);
+  if (user_detail == NULL) {
+    return -1;
+  }
+
+  // nothing to switch when the process already runs as that user
+  if (geteuid() == user_detail->pw_uid) {
+    return 0;
+  }
+
+  const int groups_status = initgroups(user, user_detail->pw_gid);
+  if (groups_status != 0) {
+    fprintf(LOGFILE, "Error setting supplementary groups for user %s: %s\n",
+            user, strerror(errno));
+    return -1;
+  }
+
+  return change_effective_user(user_detail->pw_uid, user_detail->pw_gid);
+}
+
+/**
+ * Open a file read-only as the node manager (launcher_uid/launcher_gid)
+ * and return a file descriptor for it. The previous effective user and
+ * group are restored before returning.
+ * Returns -1 on error; no descriptor is left open in that case.
+ */
+static int open_file_as_nm(const char* filename) {
+  uid_t user = geteuid();
+  gid_t group = getegid();
+  if (change_effective_user(launcher_uid, launcher_gid) != 0) {
+    return -1;
+  }
+  int result = open(filename, O_RDONLY);
+  if (result == -1) {
+    fprintf(LOGFILE, "Can't open file %s as node manager - %s\n", filename,
+            strerror(errno));
+  }
+  if (change_effective_user(user, group)) {
+    // the caller only sees -1, so the descriptor would otherwise leak
+    if (result != -1) {
+      close(result);
+    }
+    result = -1;
+  }
+  return result;
+}
+
+/**
+ * Copy a file from a fd to a given filename.
+ * The new file must not exist and it is created with permissions perm.
+ * The input descriptor is closed on every path, success or failure.
+ *
+ * input:        descriptor to read from
+ * in_filename:  name of the input, used in log messages only
+ * out_filename: path of the file to create
+ * Return 0 if everything is ok, -1 otherwise.
+ */
+static int copy_file(int input, const char* in_filename,
+                     const char* out_filename, mode_t perm) {
+  enum { COPY_BUFFER_SIZE = 128 * 1024 };
+  char buffer[COPY_BUFFER_SIZE];
+  int status = 0;
+
+  // O_EXCL|O_NOFOLLOW: refuse to reuse an existing file or to write
+  // through a symbolic link
+  int out_fd = open(out_filename, O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW, perm);
+  if (out_fd == -1) {
+    fprintf(LOGFILE, "Can't open %s for output - %s\n", out_filename,
+            strerror(errno));
+    close(input);
+    return -1;
+  }
+
+  ssize_t len = read(input, buffer, sizeof buffer);
+  while (len > 0 && status == 0) {
+    // write() may accept less than requested, so loop until the chunk is out
+    ssize_t pos = 0;
+    while (pos < len) {
+      ssize_t write_result = write(out_fd, buffer + pos, len - pos);
+      if (write_result <= 0) {
+        fprintf(LOGFILE, "Error writing to %s - %s\n", out_filename,
+                strerror(errno));
+        status = -1;
+        break;
+      }
+      pos += write_result;
+    }
+    if (status == 0) {
+      len = read(input, buffer, sizeof buffer);
+    }
+  }
+  if (status == 0 && len < 0) {
+    fprintf(LOGFILE, "Failed to read file %s - %s\n", in_filename,
+            strerror(errno));
+    status = -1;
+  }
+
+  // a failing close() only matters when the copy itself went fine
+  if (close(out_fd) != 0 && status == 0) {
+    fprintf(LOGFILE, "Failed to close file %s - %s\n", out_filename,
+            strerror(errno));
+    status = -1;
+  }
+  close(input);
+  return status;
+}
+
+/**
+ * Hand a single entry of the traversal over to the worker user:
+ * the owner becomes euser, the group becomes the launcher group and the
+ * group gets read/write (plus execute when the owner has it) access.
+ * Directories additionally get the setgid bit.
+ *
+ * entry: entry reported by fts_read(); fts_statp must be valid
+ * euser: uid that should own the entry
+ * Returns 0 on success and -1 when changing owner or mode fails.
+ */
+int setup_stormdist(FTSENT* entry, uid_t euser) {
+  // lchown() acts on a symbolic link itself, never on its target
+  if (lchown(entry->fts_path, euser, launcher_gid) != 0) {
+    fprintf(ERRORFILE, "Failure to exec app initialization process - %s\n",
+            strerror(errno));
+    return -1;
+  }
+  mode_t mode = entry->fts_statp->st_mode;
+  if (S_ISLNK(mode)) {
+    // chmod() follows symbolic links. Calling it here would change the mode
+    // of whatever the link points to, possibly outside of the tree, while
+    // this process runs with root privileges. The link itself needs no mode.
+    return 0;
+  }
+  mode_t new_mode = (mode & (S_IRWXU)) | S_IRGRP | S_IWGRP;
+  if ((mode & S_IXUSR) == S_IXUSR) {
+    new_mode = new_mode | S_IXGRP;
+  }
+  if (S_ISDIR(mode)) {
+    new_mode = new_mode | S_ISGID;
+  }
+  if (chmod(entry->fts_path, new_mode) != 0) {
+    fprintf(ERRORFILE, "Failure to exec app initialization process - %s\n",
+            strerror(errno));
+    return -1;
+  }
+  return 0;
+}
+
+/**
+ * Recursively hand the given directory over to the current effective user
+ * and the launcher group.
+ *
+ * This is the same as
+ * > chmod g+rwX -R $local_dir
+ * > chown -no-dereference -R $user:$supervisor-group $local_dir
+ *
+ * The effective user is switched to root for the traversal and is not
+ * switched back by this function.
+ *
+ * Returns 0 on success, UNABLE_TO_BUILD_PATH when the path is NULL or does
+ * not exist, and -1 on any other failure, including entries that cannot be
+ * processed (unreadable directories, cycles, traversal errors, ...).
+ */
+int setup_stormdist_dir(const char* local_dir) {
+  int exit_code = 0;
+  uid_t euser = geteuid();
+
+  if (local_dir == NULL) {
+    fprintf(ERRORFILE, "Path is null\n");
+    return UNABLE_TO_BUILD_PATH; // may be malloc failed
+  }
+
+  char *(paths[]) = {strndup(local_dir,PATH_MAX), 0};
+  if (paths[0] == NULL) {
+    fprintf(ERRORFILE, "Malloc failed in setup_stormdist_dir\n");
+    return -1;
+  }
+  // check to make sure the directory exists
+  if (access(local_dir, F_OK) != 0) {
+    if (errno == ENOENT) {
+      fprintf(ERRORFILE, "Path does not exist %s\n", local_dir);
+      free(paths[0]);
+      paths[0] = NULL;
+      return UNABLE_TO_BUILD_PATH;
+    }
+  }
+  FTS* tree = fts_open(paths, FTS_PHYSICAL | FTS_XDEV, NULL);
+  FTSENT* entry = NULL;
+
+  if (tree == NULL) {
+    fprintf(ERRORFILE,
+            "Cannot open file traversal structure for the path %s:%s.\n",
+            local_dir, strerror(errno));
+    free(paths[0]);
+    paths[0] = NULL;
+    return -1;
+  }
+
+  if (seteuid(0) != 0) {
+    fprintf(ERRORFILE, "Could not become root\n");
+    // release what was acquired above before bailing out
+    fts_close(tree);
+    free(paths[0]);
+    paths[0] = NULL;
+    return -1;
+  }
+
+  while (((entry = fts_read(tree)) != NULL) && exit_code == 0) {
+    switch (entry->fts_info) {
+
+    case FTS_DP: // A directory being visited in post-order
+    case FTS_DOT: // A dot directory
+      //NOOP
+      fprintf(LOGFILE, "NOOP: %s\n", entry->fts_path);
+      break;
+    case FTS_D: // A directory in pre-order
+    case FTS_F: // A regular file
+    case FTS_SL: // A symbolic link
+    case FTS_SLNONE: // A broken symbolic link
+      //TODO it would be good to validate that the file is owned by the correct user first.
+      fprintf(LOGFILE, "visiting: %s\n", entry->fts_path);
+      if (setup_stormdist(entry, euser) != 0) {
+        exit_code = -1;
+      }
+      break;
+    case FTS_DEFAULT: // Unknown type of file
+    case FTS_DNR: // Unreadable directory
+    case FTS_NS: // A file with no stat(2) information
+    case FTS_DC: // A directory that causes a cycle
+    case FTS_NSOK: // No stat information requested
+    case FTS_ERR: // Error return
+    default:
+      // The failure has to be recorded before leaving the switch; with the
+      // assignment placed after the break it could never be executed.
+      fprintf(LOGFILE, "Unexpected...\n");
+      exit_code = -1;
+      break;
+    }
+  }
+  // the result of fts_close() is not part of the reported status
+  (void) fts_close(tree);
+  free(paths[0]);
+  paths[0] = NULL;
+  return exit_code;
+}
+
+
+/**
+ * Send a signal to a worker after switching to the user stored in
+ * user_detail. The whole process group is signalled when it exists,
+ * otherwise only the single process.
+ *
+ * user: not read by this function; the identity comes from user_detail
+ * pid:  process (group) id, must be positive
+ * sig:  signal number handed to kill()
+ *
+ * Returns 0 on success, INVALID_CONTAINER_PID when the pid is not positive
+ * or no such process exists, SETUID_OPER_FAILED when the user switch
+ * fails, UNABLE_TO_SIGNAL_CONTAINER when the signal cannot be delivered
+ * and -1 when probing the process fails for any other reason.
+ */
+int signal_container_as_user(const char *user, int pid, int sig) {
+  if (pid <= 0) {
+    return INVALID_CONTAINER_PID;
+  }
+
+  if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+    return SETUID_OPER_FAILED;
+  }
+
+  // Signal 0 only probes for existence: try the process group first and
+  // fall back to the single process.
+  const int has_group = !(kill(-pid, 0) < 0);
+  if (!has_group && kill(pid, 0) < 0) {
+    //Don't continue if neither the group nor the process is alive anymore.
+    if (errno == ESRCH) {
+      return INVALID_CONTAINER_PID;
+    }
+    fprintf(LOGFILE, "Error signalling container %d with %d - %s\n",
+            pid, sig, strerror(errno));
+    return -1;
+  }
+
+  const int target = has_group ? -pid : pid;
+  if (kill(target, sig) < 0) {
+    if (errno == ESRCH) {
+      return INVALID_CONTAINER_PID;
+    }
+    fprintf(LOGFILE,
+            "Error signalling process group %d with signal %d - %s\n",
+            -pid, sig, strerror(errno));
+    fprintf(stderr,
+            "Error signalling process group %d with signal %d - %s\n",
+            -pid, sig, strerror(errno));
+    fflush(LOGFILE);
+    return UNABLE_TO_SIGNAL_CONTAINER;
+  }
+
+  const char *target_kind = has_group ? "group " : "";
+  fprintf(LOGFILE, "Killing process %s%d with %d\n", target_kind, pid, sig);
+  return 0;
+}
+
+/**
+ * Remove a (final, empty) directory as the node manager user and switch
+ * back to the previous effective user afterwards.
+ * Returns 0 on success and -1 when the removal or either user switch fails.
+ */
+static int rmdir_as_nm(const char* path) {
+  int saved_uid = geteuid();
+  int saved_gid = getegid();
+
+  int status = change_effective_user(launcher_uid, launcher_gid);
+  if (status == 0 && rmdir(path) != 0) {
+    fprintf(LOGFILE, "rmdir of %s failed - %s\n", path, strerror(errno));
+    status = -1;
+  }
+
+  // the previous identity is restored no matter what happened above
+  if (change_effective_user(saved_uid, saved_gid) != 0) {
+    status = -1;
+  }
+  return status;
+}
+
+/**
+ * Recursively delete the given path.
+ * full_path : the path to delete
+ * needs_tt_user: the top level directory must be deleted by the tt user.
+ *
+ * The tree is walked with fts (FTS_PHYSICAL | FTS_XDEV): files and links
+ * are unlinked, directories are removed in post-order. The walk stops at
+ * the first failed removal or traversal error.
+ * Returns 0 on success (also when the path does not exist),
+ * UNABLE_TO_BUILD_PATH when full_path is NULL and -1 on other failures.
+ * When needs_tt_user is set the returned value is the result of the final
+ * rmdir_as_nm() call, whatever the traversal reported.
+ */
+static int delete_path(const char *full_path,
+ int needs_tt_user) {
+ int exit_code = 0;
+
+ if (full_path == NULL) {
+ fprintf(LOGFILE, "Path is null\n");
+ exit_code = UNABLE_TO_BUILD_PATH; // may be malloc failed
+ } else {
+ char *(paths[]) = {strndup(full_path,PATH_MAX), 0};
+ if (paths[0] == NULL) {
+ fprintf(LOGFILE, "Malloc failed in delete_path\n");
+ return -1;
+ }
+ // a path that does not exist counts as already deleted
+ if (access(full_path, F_OK) != 0) {
+ if (errno == ENOENT) {
+ free(paths[0]);
+ paths[0] = NULL;
+ return 0;
+ }
+ }
+ FTS* tree = fts_open(paths, FTS_PHYSICAL | FTS_XDEV, NULL);
+ FTSENT* entry = NULL;
+ int ret = 0;
+
+ if (tree == NULL) {
+ fprintf(LOGFILE,
+ "Cannot open file traversal structure for the path %s:%s.\n",
+ full_path, strerror(errno));
+ free(paths[0]);
+ paths[0] = NULL;
+ return -1;
+ }
+ while (((entry = fts_read(tree)) != NULL) && exit_code == 0) {
+ switch (entry->fts_info) {
+
+ case FTS_DP: // A directory being visited in post-order
+ if (!needs_tt_user ||
+ strcmp(entry->fts_path, full_path) != 0) {
+ if (rmdir(entry->fts_accpath) != 0) {
+ fprintf(LOGFILE, "Couldn't delete directory %s - %s\n",
+ entry->fts_path, strerror(errno));
+ exit_code = -1;
+ }
+ }
+ break;
+
+ case FTS_F: // A regular file
+ case FTS_SL: // A symbolic link
+ case FTS_SLNONE: // A broken symbolic link
+ case FTS_DEFAULT: // Unknown type of file
+ if (unlink(entry->fts_accpath) != 0) {
+ fprintf(LOGFILE, "Couldn't delete file %s - %s\n", entry->fts_path,
+ strerror(errno));
+ exit_code = -1;
+ }
+ break;
+
+ case FTS_DNR: // Unreadable directory
+ fprintf(LOGFILE, "Unreadable directory %s. Skipping..\n",
+ entry->fts_path);
+ break;
+
+ case FTS_D: // A directory in pre-order
+ // if the owner write bit (0200) is missing, chmod the directory to 0700; a failed chmod is only logged
+ if ((entry->fts_statp->st_mode & 0200) == 0) {
+ fprintf(LOGFILE, "Unreadable directory %s, chmoding.\n",
+ entry->fts_path);
+ if (chmod(entry->fts_accpath, 0700) != 0) {
+ fprintf(LOGFILE, "Error chmoding %s - %s, continuing\n",
+ entry->fts_path, strerror(errno));
+ }
+ }
+ break;
+
+ case FTS_NS: // A file with no stat(2) information
+ // usually a root directory that doesn't exist
+ fprintf(LOGFILE, "Directory not found %s\n", entry->fts_path);
+ break;
+
+ case FTS_DC: // A directory that causes a cycle
+ case FTS_DOT: // A dot directory
+ case FTS_NSOK: // No stat information requested
+ break;
+
+ case FTS_ERR: // Error return
+ fprintf(LOGFILE, "Error traversing directory %s - %s\n",
+ entry->fts_path, strerror(entry->fts_errno));
+ exit_code = -1;
+ break;
+ break;
+ default:
+ exit_code = -1;
+ break;
+ }
+ }
+ ret = fts_close(tree);
+ if (exit_code == 0 && ret != 0) {
+ fprintf(LOGFILE, "Error in fts_close while deleting %s\n", full_path);
+ exit_code = -1;
+ }
+ if (needs_tt_user) {
+ // The top level directory was skipped in the FTS_DP case above, so it is
+ // removed here as the launcher user (launcher_uid/launcher_gid). This runs
+ // whether or not the traversal failed, and its result replaces exit_code.
+ exit_code = rmdir_as_nm(full_path);
+ }
+ free(paths[0]);
+ paths[0] = NULL;
+ }
+ return exit_code;
+}
+
+/**
+ * Copy the launch script into the working directory and execute the copy
+ * as the user stored in user_detail.
+ *
+ * working_dir: directory that receives the script and becomes the current
+ *              directory of the worker
+ * script_file: script to copy; it is opened as the node manager user
+ *
+ * Does not return when the script is executed successfully. Otherwise
+ * returns OUT_OF_MEMORY, SETUID_OPER_FAILED,
+ * UNABLE_TO_EXECUTE_CONTAINER_SCRIPT or -1.
+ */
+int exec_as_user(const char * working_dir, const char * script_file) {
+  char *script_file_dest = get_container_launcher_file(working_dir);
+  if (script_file_dest == NULL) {
+    return OUT_OF_MEMORY;
+  }
+
+  // open launch script
+  int script_file_source = open_file_as_nm(script_file);
+  if (script_file_source == -1) {
+    free(script_file_dest);
+    return -1;
+  }
+
+  setsid();
+
+  // give up root privs
+  if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+    close(script_file_source);
+    free(script_file_dest);
+    return SETUID_OPER_FAILED;
+  }
+
+  if (copy_file(script_file_source, script_file, script_file_dest, S_IRWXU) != 0) {
+    free(script_file_dest);
+    return -1;
+  }
+
+  umask(0027);
+  // Change directory while the log stream is still open, so that a failure
+  // can actually be reported.
+  if (chdir(working_dir) != 0) {
+    fprintf(LOGFILE, "Can't change directory to %s -%s\n", working_dir,
+            strerror(errno));
+    free(script_file_dest);
+    return -1;
+  }
+
+  // closes every stdio stream, including LOGFILE and ERRORFILE
+  fcloseall();
+
+  // the argument list of execlp() must end with a null *pointer*
+  execlp(script_file_dest, script_file_dest, (char *) NULL);
+
+  // Only reached when the exec failed. All streams were closed above, so
+  // nothing can be logged at this point; the return code reports the failure.
+  free(script_file_dest);
+  return UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
+}
+
+/**
+ * Delete the given directory from each of the base directories.
+ * user: the user doing the delete (not read by this function)
+ * subdir: the subdir to delete (if baseDirs is empty, this is treated as
+ *         an absolute path)
+ * baseDirs: (optional) NULL-terminated list of the directories in which
+ *           subdir is located
+ * Returns 0 on success. When several deletes fail the status of the last
+ * failure is returned; -1 is returned when a path cannot be built.
+ */
+int delete_as_user(const char *user,
+                   const char *subdir,
+                   char* const* baseDirs) {
+  // TODO: No switching user? !!!!
+  const int no_base_dirs = (baseDirs == NULL) || (*baseDirs == NULL);
+  if (no_base_dirs) {
+    return delete_path(subdir, 1);
+  }
+
+  int overall_status = 0;
+  char** base_dir = (char**)baseDirs;
+  while (*base_dir != NULL) {
+    char* target_path = concatenate("%s/%s", "user subdir", 2,
+                                    *base_dir, subdir);
+    if (target_path == NULL) {
+      return -1;
+    }
+
+    // an empty subdir means the base directory itself is the target
+    int delete_status = delete_path(target_path, strlen(subdir) == 0);
+    free(target_path);
+    target_path = NULL;
+
+    // keep going with the remaining directories, but remember the error
+    if (delete_status != 0) {
+      overall_status = delete_status;
+    }
+    ++base_dir;
+  }
+  return overall_status;
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/impl/worker-launcher.h
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/impl/worker-launcher.h b/storm-core/src/native/worker-launcher/impl/worker-launcher.h
new file mode 100644
index 0000000..59ab998
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/impl/worker-launcher.h
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <pwd.h>
+#include <stdio.h>
+#include <sys/types.h>
+
+enum errorcodes {
+ INVALID_ARGUMENT_NUMBER = 1,
+ INVALID_USER_NAME, //2
+ INVALID_COMMAND_PROVIDED, //3
+ // SUPER_USER_NOT_ALLOWED_TO_RUN_TASKS (NOT USED) 4
+ INVALID_NM_ROOT_DIRS = 5,
+ SETUID_OPER_FAILED, //6
+ UNABLE_TO_EXECUTE_CONTAINER_SCRIPT, //7
+ UNABLE_TO_SIGNAL_CONTAINER, //8
+ INVALID_CONTAINER_PID, //9
+ // ERROR_RESOLVING_FILE_PATH (NOT_USED) 10
+ // RELATIVE_PATH_COMPONENTS_IN_FILE_PATH (NOT USED) 11
+ // UNABLE_TO_STAT_FILE (NOT USED) 12
+ // FILE_NOT_OWNED_BY_ROOT (NOT USED) 13
+ // PREPARE_CONTAINER_DIRECTORIES_FAILED (NOT USED) 14
+ // INITIALIZE_CONTAINER_FAILED (NOT USED) 15
+ // PREPARE_CONTAINER_LOGS_FAILED (NOT USED) 16
+ // INVALID_LOG_DIR (NOT USED) 17
+ OUT_OF_MEMORY = 18,
+ // INITIALIZE_DISTCACHEFILE_FAILED (NOT USED) 19
+ INITIALIZE_USER_FAILED = 20,
+ UNABLE_TO_BUILD_PATH, //21
+ INVALID_CONTAINER_EXEC_PERMISSIONS, //22
+ // PREPARE_JOB_LOGS_FAILED (NOT USED) 23
+ INVALID_CONFIG_FILE = 24,
+ SETSID_OPER_FAILED = 25,
+ WRITE_PIDFILE_FAILED = 26
+};
+
+#define LAUNCHER_GROUP_KEY "storm.worker-launcher.group" // configuration key; NOTE(review): presumably names the special group required by check_executor_permissions - confirm
+
+#define USER_DIR_PATTERN "%s/usercache/%s" // printf-style pattern with two %s slots
+#define NM_APP_DIR_PATTERN USER_DIR_PATTERN "/appcache/%s" // expands to "%s/usercache/%s/appcache/%s"
+#define CONTAINER_DIR_PATTERN NM_APP_DIR_PATTERN "/%s" // expands to "%s/usercache/%s/appcache/%s/%s"
+#define CONTAINER_SCRIPT "launch_container.sh"
+#define CREDENTIALS_FILENAME "container_tokens"
+#define MIN_USERID_KEY "min.user.id" // configuration key; NOTE(review): presumably the minimum UID that check_user accepts - confirm
+#define BANNED_USERS_KEY "banned.users" // configuration key; NOTE(review): presumably the banned user list that check_user consults - confirm
+#define TMP_DIR "tmp"
+
+extern struct passwd *user_detail; // passwd entry whose pw_uid/pw_gid callers pass to change_user; NOTE(review): presumably filled in by set_user - confirm
+
+// the log file for messages
+extern FILE *LOGFILE;
+// the log file for error messages
+extern FILE *ERRORFILE;
+
+int setup_stormdist_dir(const char* local_dir); // NOTE(review): undocumented here; presumably prepares the stormdist directory at local_dir - confirm
+
+int exec_as_user(const char * working_dir, const char * args); // NOTE(review): undocumented here; presumably executes something from working_dir as the configured user - confirm
+
+// delete a directory (or file) recursively as the user. The directory
+// may optionally be relative to the baseDirs set of directories (if the same
+// directory appears on multiple disk volumes, the disk volumes should be passed
+// as the baseDirs). If baseDirs is not specified, then dir_to_be_deleted is
+// treated as an absolute path
+int delete_as_user(const char *user,
+ const char *dir_to_be_deleted,
+ char* const* baseDirs);
+
+// get the executable's filename
+char* get_executable();
+
+/**
+ * Check the permissions on the worker-launcher binary to make sure it is
+ * safe to run. The binary must:
+ * * be user-owned by root
+ * * be group-owned by a configured special group.
+ * * grant no permissions to others
+ * * be setuid/setgid
+ * @param executable_file the file to check
+ * @return -1 on error, 0 on success.
+ */
+int check_executor_permissions(char *executable_file);
+
+/**
+ * Function used to signal a container launched by the user.
+ * The function sends the appropriate signal to the process group
+ * specified by the pid.
+ * @param user the user to send the signal as.
+ * @param pid the process id to send the signal to.
+ * @param sig the signal to send.
+ * @return an errorcode enum value on error, or 0 on success.
+ */
+int signal_container_as_user(const char *user, int pid, int sig);
+
+// set the uid and gid of the launcher. This is used when doing some
+// privileged operations for setting the effective uid and gid.
+void set_launcher_uid(uid_t user, gid_t group);
+
+/**
+ * Is the user a real user account?
+ * Checks:
+ * 1. Not root
+ * 2. UID is above the minimum configured.
+ * 3. Not in banned user list
+ * Returns NULL on failure
+ */
+struct passwd* check_user(const char *user);
+
+// set the user
+int set_user(const char *user); // callers treat a non-zero return as failure
+
+// methods to get the directories
+
+char *get_container_launcher_file(const char* work_dir); // the test in this patch expects "<work_dir>/launch_container.sh" and frees the result
+
+int change_user(uid_t user, gid_t group); // callers treat a non-zero return as failure
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/test/test-worker-launcher.c
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/test/test-worker-launcher.c b/storm-core/src/native/worker-launcher/test/test-worker-launcher.c
new file mode 100644
index 0000000..412e922
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/test/test-worker-launcher.c
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "configuration.h"
+#include "worker-launcher.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/wait.h>
+
+#define TEST_ROOT "/tmp/test-worker-launcher" // scratch directory; main() removes and recreates it
+#define DONT_TOUCH_FILE "dont-touch-me"
+#define NM_LOCAL_DIRS TEST_ROOT "/local-1," TEST_ROOT "/local-2," \
+ TEST_ROOT "/local-3," TEST_ROOT "/local-4," TEST_ROOT "/local-5" // comma-separated list of five directories under TEST_ROOT
+#define NM_LOG_DIRS TEST_ROOT "/logdir_1," TEST_ROOT "/logdir_2," \
+ TEST_ROOT "/logdir_3," TEST_ROOT "/logdir_4" // comma-separated list of four directories under TEST_ROOT
+#define ARRAY_SIZE 1000 // size in bytes of the buffers main() allocates for local_dirs and log_dirs
+
+static char* username = NULL; // user the tests run against; assigned in main()
+static char* local_dirs = NULL; // heap copy of NM_LOCAL_DIRS; assigned in main()
+static char* log_dirs = NULL; // heap copy of NM_LOG_DIRS; assigned in main()
+
+/**
+ * Run the command using the effective user id.
+ * It can't use system, since bash seems to copy the real user id into the
+ * effective id.
+ *
+ * cmd is split on spaces into an argument vector (no quoting or escaping
+ * is understood) and executed with execvp in a forked child. The calling
+ * process is ended with exit(1) if the child cannot be forked or waited
+ * for, does not exit normally, or exits with a non-zero status.
+ */
+void run(const char *cmd) {
+  fflush(stdout);
+  fflush(stderr);
+  pid_t child = fork();
+  if (child == -1) {
+    // the command never ran, so the caller must not continue as if it had
+    printf("FAIL: failed to fork - %s\n", strerror(errno));
+    exit(1);
+  } else if (child == 0) {
+    char *cmd_copy = strdup(cmd);
+    if (cmd_copy == NULL) {
+      printf("FAIL: out of memory copying command %s\n", cmd);
+      exit(42);
+    }
+    // the number of spaces plus one is an upper bound on the word count
+    int words = 1;
+    char *ptr;
+    for (ptr = strchr(cmd_copy, ' '); ptr; ptr = strchr(ptr + 1, ' ')) {
+      words += 1;
+    }
+    // one extra slot for the terminating NULL that execvp requires
+    char **argv = malloc(sizeof *argv * (words + 1));
+    if (argv == NULL) {
+      printf("FAIL: out of memory building arguments for %s\n", cmd);
+      exit(42);
+    }
+    int i = 0;
+    for (ptr = strtok(cmd_copy, " "); ptr != NULL; ptr = strtok(NULL, " ")) {
+      argv[i++] = ptr;
+    }
+    argv[i] = NULL;
+    if (i == 0) {
+      printf("FAIL: empty command\n");
+      exit(42);
+    }
+    execvp(argv[0], argv);
+    // execvp only returns when it failed
+    printf("FAIL: exec failed in child %s - %s\n", cmd, strerror(errno));
+    exit(42);
+  } else {
+    int status = 0;
+    if (waitpid(child, &status, 0) <= 0) {
+      printf("FAIL: failed waiting for child process %s pid %d - %s\n",
+             cmd, child, strerror(errno));
+      exit(1);
+    }
+    if (!WIFEXITED(status)) {
+      printf("FAIL: process %s pid %d did not exit\n", cmd, child);
+      exit(1);
+    }
+    if (WEXITSTATUS(status) != 0) {
+      printf("FAIL: process %s pid %d exited with error status %d\n", cmd,
+             child, WEXITSTATUS(status));
+      exit(1);
+    }
+  }
+}
+
+/**
+ * Write the configuration used by the tests (the banned.users and
+ * min.user.id entries) to file_name, replacing any existing content.
+ * Returns 0 on success, EXIT_FAILURE if the file cannot be opened or
+ * cannot be written completely.
+ */
+int write_config_file(char *file_name) {
+  FILE *file = fopen(file_name, "w");
+  if (file == NULL) {
+    printf("Failed to open %s.\n", file_name);
+    return EXIT_FAILURE;
+  }
+  int failed = (fputs("banned.users=bannedUser\n", file) == EOF);
+  failed |= (fputs("min.user.id=1000\n", file) == EOF);
+  // fclose flushes the buffer, so a write error may only show up here
+  failed |= (fclose(file) != 0);
+  if (failed) {
+    printf("Failed to write %s.\n", file_name);
+    return EXIT_FAILURE;
+  }
+  return 0;
+}
+
+/**
+ * Create every directory named in the NULL-terminated array nm_roots and
+ * a "usercache" subdirectory inside each of them, all with mode 0755.
+ * Prints a FAIL message and ends the process with exit(1) as soon as a
+ * directory cannot be created (an already existing directory counts as a
+ * failure) or when nm_roots itself is NULL.
+ */
+void create_nm_roots(char ** nm_roots) {
+  if (nm_roots == NULL) {
+    printf("FAIL: no directories given to create_nm_roots\n");
+    exit(1);
+  }
+  char** nm_root;
+  for (nm_root = nm_roots; *nm_root != NULL; ++nm_root) {
+    if (mkdir(*nm_root, 0755) != 0) {
+      printf("FAIL: Can't create directory %s - %s\n", *nm_root,
+             strerror(errno));
+      exit(1);
+    }
+    // size the buffer from the path itself; sizeof counts the trailing NUL
+    size_t size = strlen(*nm_root) + sizeof("/usercache");
+    char *usercache = malloc(size);
+    if (usercache == NULL) {
+      printf("FAIL: out of memory building path under %s\n", *nm_root);
+      exit(1);
+    }
+    snprintf(usercache, size, "%s/usercache", *nm_root);
+    if (mkdir(usercache, 0755) != 0) {
+      printf("FAIL: Can't create directory %s - %s\n", usercache,
+             strerror(errno));
+      exit(1);
+    }
+    free(usercache);
+  }
+}
+
+/**
+ * Verify that get_container_launcher_file("/tmp") yields
+ * "/tmp/launch_container.sh". Ends the process with exit(1) when the
+ * result is NULL or differs; the returned string is freed afterwards.
+ */
+void test_get_container_launcher_file() {
+  char *expected_file = ("/tmp/launch_container.sh");
+  char *app_dir = "/tmp";
+  char *container_file = get_container_launcher_file(app_dir);
+  if (container_file == NULL) {
+    // strcmp and %s must never see a NULL pointer
+    printf("failure to get a container file for %s\n", app_dir);
+    exit(1);
+  }
+  if (strcmp(container_file, expected_file) != 0) {
+    printf("failure to match expected container file %s vs %s\n", container_file,
+           expected_file);
+    exit(1);
+  }
+  free(container_file);
+}
+
+// check_user has to accept the user under test and reject the system
+// accounts "lp" and "root"; any other outcome ends the process with exit(1).
+void test_check_user() {
+  printf("\nTesting test_check_user\n");
+
+  struct passwd *accepted = check_user(username);
+  if (accepted == NULL) {
+    printf("FAIL: failed check for user %s\n", username);
+    exit(1);
+  }
+  // the entry handed back for an accepted user is released with free
+  free(accepted);
+
+  struct passwd *lp_entry = check_user("lp");
+  if (lp_entry != NULL) {
+    printf("FAIL: failed check for system user lp\n");
+    exit(1);
+  }
+
+  struct passwd *root_entry = check_user("root");
+  if (root_entry != NULL) {
+    printf("FAIL: failed check for system user root\n");
+    exit(1);
+  }
+}
+
+// check_configuration_permissions is expected to return 0 for /etc/passwd
+// and a non-zero value for TEST_ROOT; otherwise the process ends with
+// exit(1).
+void test_check_configuration_permissions() {
+  printf("\nTesting check_configuration_permissions\n");
+
+  int passwd_status = check_configuration_permissions("/etc/passwd");
+  if (passwd_status != 0) {
+    printf("FAIL: failed permission check on /etc/passwd\n");
+    exit(1);
+  }
+
+  int test_root_status = check_configuration_permissions(TEST_ROOT);
+  if (test_root_status == 0) {
+    printf("FAIL: failed permission check on %s\n", TEST_ROOT);
+    exit(1);
+  }
+}
+
+// Run func in a forked child so that whatever it does to the process
+// (such as changing user) stays in the child. The parent ends with
+// exit(1) unless the child exits normally with status 0.
+void run_test_in_child(const char* test_name, void (*func)()) {
+  printf("\nRunning test %s in child process\n", test_name);
+  fflush(stdout);
+  fflush(stderr);
+
+  pid_t pid = fork();
+  if (pid == 0) {
+    // child: returning from func without an exit counts as success
+    func();
+    exit(0);
+  }
+  if (pid == -1) {
+    printf("FAIL: fork failed\n");
+    exit(1);
+  }
+
+  // parent: collect the child and judge its exit status
+  int wait_status = 0;
+  if (waitpid(pid, &wait_status, 0) == -1) {
+    printf("FAIL: waitpid %d failed - %s\n", pid, strerror(errno));
+    exit(1);
+  }
+  if (!WIFEXITED(wait_status)) {
+    printf("FAIL: child %d didn't exit - %d\n", pid, wait_status);
+    exit(1);
+  }
+  int exit_code = WEXITSTATUS(wait_status);
+  if (exit_code != 0) {
+    printf("FAIL: child %d exited with bad status %d\n",
+           pid, exit_code);
+    exit(1);
+  }
+}
+
+/**
+ * Fork a child that switches to the test user and sleeps, deliver SIGQUIT
+ * to it through signal_container_as_user, and verify that the child was
+ * terminated by exactly that signal. Any failure ends the calling process
+ * with exit(1).
+ */
+void test_signal_container() {
+  printf("\nTesting signal_container\n");
+  fflush(stdout);
+  fflush(stderr);
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: fork failed\n");
+    exit(1);
+  } else if (child == 0) {
+    if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+      exit(1);
+    }
+    // stay alive long enough to be signalled by the parent
+    sleep(3600);
+    exit(0);
+  } else {
+    printf("Child container launched as %d\n", child);
+    if (signal_container_as_user(username, child, SIGQUIT) != 0) {
+      printf("FAIL: unable to signal child %d\n", child);
+      // best effort: do not leave the child sleeping for an hour
+      kill(child, SIGKILL);
+      exit(1);
+    }
+    int status = 0;
+    if (waitpid(child, &status, 0) == -1) {
+      printf("FAIL: waitpid failed - %s\n", strerror(errno));
+      exit(1);
+    }
+    if (!WIFSIGNALED(status)) {
+      printf("FAIL: child wasn't signalled - %d\n", status);
+      exit(1);
+    }
+    if (WTERMSIG(status) != SIGQUIT) {
+      printf("FAIL: child was killed with %d instead of %d\n",
+             WTERMSIG(status), SIGQUIT);
+      exit(1);
+    }
+  }
+}
+
+/**
+ * Same check as the plain signal test, but the child first calls setpgrp()
+ * before switching to the test user, and the signal delivered through
+ * signal_container_as_user is SIGKILL. Any failure ends the calling
+ * process with exit(1).
+ */
+void test_signal_container_group() {
+  printf("\nTesting group signal_container\n");
+  fflush(stdout);
+  fflush(stderr);
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: fork failed\n");
+    exit(1);
+  } else if (child == 0) {
+    setpgrp();
+    if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+      exit(1);
+    }
+    // stay alive long enough to be signalled by the parent
+    sleep(3600);
+    exit(0);
+  }
+  printf("Child container launched as %d\n", child);
+  if (signal_container_as_user(username, child, SIGKILL) != 0) {
+    printf("FAIL: unable to signal child %d\n", child);
+    // best effort: do not leave the child sleeping for an hour
+    kill(child, SIGKILL);
+    exit(1);
+  }
+  int status = 0;
+  if (waitpid(child, &status, 0) == -1) {
+    printf("FAIL: waitpid failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (!WIFSIGNALED(status)) {
+    printf("FAIL: child wasn't signalled - %d\n", status);
+    exit(1);
+  }
+  if (WTERMSIG(status) != SIGKILL) {
+    printf("FAIL: child was killed with %d instead of %d\n",
+           WTERMSIG(status), SIGKILL);
+    exit(1);
+  }
+}
+
+/**
+ * Ensure that the given path and all of the parent directories are created
+ * with the desired permissions.
+ *
+ * Every missing component is created with mkdir(2) using perm (subject to
+ * the process umask); components that already exist as directories are
+ * left untouched. No shell is involved, so the path may contain spaces or
+ * shell metacharacters.
+ * Returns 0 on success and -1 if path is NULL or empty, memory runs out,
+ * or a component cannot be created or exists but is not a directory.
+ */
+int mkdirs(const char* path, mode_t perm) {
+  if (path == NULL || path[0] == '\0') {
+    return -1;
+  }
+  size_t len = strlen(path);
+  char *partial = malloc(len + 1);
+  if (partial == NULL) {
+    return -1;
+  }
+  memcpy(partial, path, len + 1);
+
+  int ret = 0;
+  // walk the path left to right, creating each prefix that ends at a '/'
+  // and finally the whole path; starting at 1 skips a leading '/'
+  for (size_t i = 1; i <= len && ret == 0; ++i) {
+    if (partial[i] != '/' && partial[i] != '\0') {
+      continue;
+    }
+    char saved = partial[i];
+    partial[i] = '\0';
+    if (mkdir(partial, perm) != 0) {
+      struct stat sb;
+      // an existing directory is fine, anything else is a failure
+      if (errno != EEXIST || stat(partial, &sb) != 0 || !S_ISDIR(sb.st_mode)) {
+        ret = -1;
+      }
+    }
+    partial[i] = saved;
+  }
+  free(partial);
+  return ret;
+}
+
+/**
+ * Test driver. Recreates TEST_ROOT, writes and loads a test configuration,
+ * creates the local directories, selects the user to test against and
+ * runs the individual tests. When started as root with exactly one
+ * argument, that argument is the user name; otherwise the name of the
+ * invoking user is used. Any setup failure ends the process with exit(1).
+ */
+int main(int argc, char **argv) {
+  LOGFILE = stdout;
+  ERRORFILE = stderr;
+  int my_username = 0;
+
+  // clean up any junk from previous run (best effort, result ignored)
+  system("chmod -R u=rwx " TEST_ROOT "; rm -fr " TEST_ROOT);
+
+  if (mkdirs(TEST_ROOT "/logs/userlogs", 0755) != 0) {
+    exit(1);
+  }
+
+  if (write_config_file(TEST_ROOT "/test.cfg") != 0) {
+    exit(1);
+  }
+  read_config(TEST_ROOT "/test.cfg");
+
+  local_dirs = malloc(ARRAY_SIZE);
+  log_dirs = malloc(ARRAY_SIZE);
+  if (local_dirs == NULL || log_dirs == NULL) {
+    printf("FAIL: out of memory\n");
+    exit(1);
+  }
+  strcpy(local_dirs, NM_LOCAL_DIRS);
+  strcpy(log_dirs, NM_LOG_DIRS);
+
+  char **nm_roots = extract_values(local_dirs);
+  if (nm_roots == NULL) {
+    printf("FAIL: could not split %s\n", local_dirs);
+    exit(1);
+  }
+  create_nm_roots(nm_roots);
+
+  if (getuid() == 0 && argc == 2) {
+    username = argv[1];
+  } else {
+    // getpwuid returns NULL when the uid has no passwd entry
+    struct passwd *me = getpwuid(getuid());
+    if (me == NULL) {
+      printf("FAIL: no passwd entry for uid %d\n", (int) getuid());
+      exit(1);
+    }
+    username = strdup(me->pw_name);
+    if (username == NULL) {
+      printf("FAIL: out of memory\n");
+      exit(1);
+    }
+    my_username = 1;
+  }
+  set_launcher_uid(geteuid(), getegid());
+
+  if (set_user(username)) {
+    exit(1);
+  }
+
+  printf("\nStarting tests\n");
+
+  printf("\nTesting get_container_launcher_file()\n");
+  test_get_container_launcher_file();
+
+  printf("\nTesting check_configuration_permissions()\n");
+  test_check_configuration_permissions();
+
+  printf("\nTesting check_user()\n");
+  test_check_user();
+
+  // the tests that change user need to be run in a subshell, so that
+  // when they change user they don't give up our privs
+  run_test_in_child("test_signal_container", test_signal_container);
+  run_test_in_child("test_signal_container_group", test_signal_container_group);
+
+  // best effort: the result of seteuid is not checked before the cleanup
+  seteuid(0);
+  run("rm -fr " TEST_ROOT);
+  printf("\nFinished tests\n");
+
+  if (my_username) {
+    free(username);
+  }
+  free_configurations();
+  return 0;
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/py/storm/DistributedRPC.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPC.py b/storm-core/src/py/storm/DistributedRPC.py
index a7e6ef9..851ad65 100644
--- a/storm-core/src/py/storm/DistributedRPC.py
+++ b/storm-core/src/py/storm/DistributedRPC.py
@@ -64,6 +64,8 @@ class Client(Iface):
return result.success
if result.e is not None:
raise result.e
+ if result.aze is not None:
+ raise result.aze
raise TApplicationException(TApplicationException.MISSING_RESULT, "execute failed: unknown result");
@@ -97,6 +99,8 @@ class Processor(Iface, TProcessor):
result.success = self._handler.execute(args.functionName, args.funcArgs)
except DRPCExecutionException, e:
result.e = e
+ except AuthorizationException, aze:
+ result.aze = aze
oprot.writeMessageBegin("execute", TMessageType.REPLY, seqid)
result.write(oprot)
oprot.writeMessageEnd()
@@ -185,19 +189,22 @@ class execute_result:
Attributes:
- success
- e
+ - aze
"""
thrift_spec = (
(0, TType.STRING, 'success', None, None, ), # 0
(1, TType.STRUCT, 'e', (DRPCExecutionException, DRPCExecutionException.thrift_spec), None, ), # 1
+ (2, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 2
)
def __hash__(self):
- return 0 + hash(self.success) + hash(self.e)
+ return 0 + hash(self.success) + hash(self.e) + hash(self.aze)
- def __init__(self, success=None, e=None,):
+ def __init__(self, success=None, e=None, aze=None,):
self.success = success
self.e = e
+ self.aze = aze
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -219,6 +226,12 @@ class execute_result:
self.e.read(iprot)
else:
iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRUCT:
+ self.aze = AuthorizationException()
+ self.aze.read(iprot)
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -237,6 +250,10 @@ class execute_result:
oprot.writeFieldBegin('e', TType.STRUCT, 1)
self.e.write(oprot)
oprot.writeFieldEnd()
+ if self.aze is not None:
+ oprot.writeFieldBegin('aze', TType.STRUCT, 2)
+ self.aze.write(oprot)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/py/storm/DistributedRPCInvocations.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPCInvocations.py b/storm-core/src/py/storm/DistributedRPCInvocations.py
index 4f951a9..6de2245 100644
--- a/storm-core/src/py/storm/DistributedRPCInvocations.py
+++ b/storm-core/src/py/storm/DistributedRPCInvocations.py
@@ -74,6 +74,8 @@ class Client(Iface):
result = result_result()
result.read(self._iprot)
self._iprot.readMessageEnd()
+ if result.aze is not None:
+ raise result.aze
return
def fetchRequest(self, functionName):
@@ -104,6 +106,8 @@ class Client(Iface):
self._iprot.readMessageEnd()
if result.success is not None:
return result.success
+ if result.aze is not None:
+ raise result.aze
raise TApplicationException(TApplicationException.MISSING_RESULT, "fetchRequest failed: unknown result");
def failRequest(self, id):
@@ -132,6 +136,8 @@ class Client(Iface):
result = failRequest_result()
result.read(self._iprot)
self._iprot.readMessageEnd()
+ if result.aze is not None:
+ raise result.aze
return
@@ -163,7 +169,10 @@ class Processor(Iface, TProcessor):
args.read(iprot)
iprot.readMessageEnd()
result = result_result()
- self._handler.result(args.id, args.result)
+ try:
+ self._handler.result(args.id, args.result)
+ except AuthorizationException, aze:
+ result.aze = aze
oprot.writeMessageBegin("result", TMessageType.REPLY, seqid)
result.write(oprot)
oprot.writeMessageEnd()
@@ -174,7 +183,10 @@ class Processor(Iface, TProcessor):
args.read(iprot)
iprot.readMessageEnd()
result = fetchRequest_result()
- result.success = self._handler.fetchRequest(args.functionName)
+ try:
+ result.success = self._handler.fetchRequest(args.functionName)
+ except AuthorizationException, aze:
+ result.aze = aze
oprot.writeMessageBegin("fetchRequest", TMessageType.REPLY, seqid)
result.write(oprot)
oprot.writeMessageEnd()
@@ -185,7 +197,10 @@ class Processor(Iface, TProcessor):
args.read(iprot)
iprot.readMessageEnd()
result = failRequest_result()
- self._handler.failRequest(args.id)
+ try:
+ self._handler.failRequest(args.id)
+ except AuthorizationException, aze:
+ result.aze = aze
oprot.writeMessageBegin("failRequest", TMessageType.REPLY, seqid)
result.write(oprot)
oprot.writeMessageEnd()
@@ -270,12 +285,21 @@ class result_args:
return not (self == other)
class result_result:
+ """
+ Attributes:
+ - aze
+ """
thrift_spec = (
+ None, # 0
+ (1, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 1
)
def __hash__(self):
- return 0
+ return 0 + hash(self.aze)
+
+ def __init__(self, aze=None,):
+ self.aze = aze
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -286,6 +310,12 @@ class result_result:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
+ if fid == 1:
+ if ftype == TType.STRUCT:
+ self.aze = AuthorizationException()
+ self.aze.read(iprot)
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -296,6 +326,10 @@ class result_result:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
oprot.writeStructBegin('result_result')
+ if self.aze is not None:
+ oprot.writeFieldBegin('aze', TType.STRUCT, 1)
+ self.aze.write(oprot)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -381,17 +415,20 @@ class fetchRequest_result:
"""
Attributes:
- success
+ - aze
"""
thrift_spec = (
(0, TType.STRUCT, 'success', (DRPCRequest, DRPCRequest.thrift_spec), None, ), # 0
+ (1, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 1
)
def __hash__(self):
- return 0 + hash(self.success)
+ return 0 + hash(self.success) + hash(self.aze)
- def __init__(self, success=None,):
+ def __init__(self, success=None, aze=None,):
self.success = success
+ self.aze = aze
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -408,6 +445,12 @@ class fetchRequest_result:
self.success.read(iprot)
else:
iprot.skip(ftype)
+ elif fid == 1:
+ if ftype == TType.STRUCT:
+ self.aze = AuthorizationException()
+ self.aze.read(iprot)
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -422,6 +465,10 @@ class fetchRequest_result:
oprot.writeFieldBegin('success', TType.STRUCT, 0)
self.success.write(oprot)
oprot.writeFieldEnd()
+ if self.aze is not None:
+ oprot.writeFieldBegin('aze', TType.STRUCT, 1)
+ self.aze.write(oprot)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -504,12 +551,21 @@ class failRequest_args:
return not (self == other)
class failRequest_result:
+ """
+ Attributes:
+ - aze
+ """
thrift_spec = (
+ None, # 0
+ (1, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 1
)
def __hash__(self):
- return 0
+ return 0 + hash(self.aze)
+
+ def __init__(self, aze=None,):
+ self.aze = aze
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -520,6 +576,12 @@ class failRequest_result:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
+ if fid == 1:
+ if ftype == TType.STRUCT:
+ self.aze = AuthorizationException()
+ self.aze.read(iprot)
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -530,6 +592,10 @@ class failRequest_result:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
oprot.writeStructBegin('failRequest_result')
+ if self.aze is not None:
+ oprot.writeFieldBegin('aze', TType.STRUCT, 1)
+ self.aze.write(oprot)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/py/storm/Nimbus-remote
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus-remote b/storm-core/src/py/storm/Nimbus-remote
index 4b2ff04..c184fab 100755
--- a/storm-core/src/py/storm/Nimbus-remote
+++ b/storm-core/src/py/storm/Nimbus-remote
@@ -28,6 +28,7 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help':
print ' void activate(string name)'
print ' void deactivate(string name)'
print ' void rebalance(string name, RebalanceOptions options)'
+ print ' void uploadNewCredentials(string name, Credentials creds)'
print ' string beginFileUpload()'
print ' void uploadChunk(string location, string chunk)'
print ' void finishFileUpload(string location)'
@@ -131,6 +132,12 @@ elif cmd == 'rebalance':
sys.exit(1)
pp.pprint(client.rebalance(args[0],eval(args[1]),))
+elif cmd == 'uploadNewCredentials':
+ if len(args) != 2:
+ print 'uploadNewCredentials requires 2 args'
+ sys.exit(1)
+ pp.pprint(client.uploadNewCredentials(args[0],eval(args[1]),))
+
elif cmd == 'beginFileUpload':
if len(args) != 0:
print 'beginFileUpload requires 0 args'