You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/21 18:07:01 UTC

[06/14] STORM-216: Added Authentication and Authorization.

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java
new file mode 100644
index 0000000..807abe3
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.security.auth.kerberos;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.Principal;
+import java.util.Map;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.LoginException;
+import javax.security.auth.spi.LoginModule;
+
+
+/**
+ * Custom LoginModule to enable Auto Login based on cached ticket
+ */
+public class AutoTGTKrb5LoginModule implements LoginModule {
+    private static final Logger LOG = LoggerFactory.getLogger(AutoTGTKrb5LoginModule.class);
+
+    // initial state
+    private Subject subject;
+
+    protected KerberosTicket kerbTicket = null;
+
+    public void initialize(Subject subject,
+                           CallbackHandler callbackHandler,
+                           Map<String, ?> sharedState,
+                           Map<String, ?> options) {
+
+        this.subject = subject;
+    }
+
+    public boolean login() throws LoginException {
+        LOG.debug("Acquire TGT from Cache");
+        getKerbTicketFromCache();
+        if (kerbTicket != null) {
+            return true;
+        } else {
+            throw new LoginException("Authentication failed, the TGT not found.");
+        }
+    }
+
+    protected void getKerbTicketFromCache() {
+        kerbTicket = AutoTGT.kerbTicket.get();
+    }
+
+    protected Principal getKerbTicketClient() {
+        if (kerbTicket != null) {
+            return kerbTicket.getClient();
+        }
+        return null;
+    }
+
+    public boolean commit() throws LoginException {
+        if (isSucceeded() == false) {
+            return false;
+        }
+        if (subject == null || subject.isReadOnly()) {
+            kerbTicket = null;
+            throw new LoginException("Authentication failed because the Subject is invalid.");
+        }
+        // Let us add the kerbClientPrinc and kerbTicket
+        subject.getPrivateCredentials().add(kerbTicket);
+        subject.getPrincipals().add(getKerbTicketClient());
+        LOG.debug("Commit Succeeded.");
+        return true;
+    }
+
+    public boolean abort() throws LoginException {
+        if (isSucceeded() == false) {
+            return false;
+        } else {
+            return logout();
+        }
+    }
+
+    public boolean logout() throws LoginException {
+        if (subject != null && !subject.isReadOnly() && kerbTicket != null) {
+            subject.getPrincipals().remove(kerbTicket.getClient());
+            subject.getPrivateCredentials().remove(kerbTicket);
+        }
+        kerbTicket = null;
+        return true;
+    }
+
+    private boolean isSucceeded() {
+        return kerbTicket != null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java
new file mode 100644
index 0000000..ba34fc9
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.security.auth.kerberos;
+
+import java.security.Principal;
+import javax.security.auth.kerberos.KerberosTicket;
+
+/**
+ * Custom LoginModule extended for testing.
+ */
+public class AutoTGTKrb5LoginModuleTest extends AutoTGTKrb5LoginModule {
+
+    public Principal client = null;
+
+    public void setKerbTicket(KerberosTicket ticket) {
+        this.kerbTicket = ticket;
+    }
+    
+    @Override
+    protected void getKerbTicketFromCache() {
+        // Do nothing.
+    }
+
+    @Override
+    protected Principal getKerbTicketClient() {
+        return this.client;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java
new file mode 100644
index 0000000..d46aa8b
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.security.auth.kerberos;
+
+import java.io.IOException;
+import java.util.Map;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.security.auth.AuthUtils;
+
+/**
+ * SASL client side callback handler.
+ */
+public class ClientCallbackHandler implements CallbackHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(ClientCallbackHandler.class);
+
+    /**
+     * Constructor based on a JAAS configuration
+     * 
+     * For digest, you should have a pair of user name and password defined in this configuration.
+     * 
+     * @param configuration
+     * @throws IOException
+     */
+    public ClientCallbackHandler(Configuration configuration) throws IOException {
+        if (configuration == null) return;
+        AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_CLIENT);
+        if (configurationEntries == null) {
+            String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_CLIENT
+                    + "' entry in this configuration: Client cannot start.";
+            LOG.error(errorMessage);
+            throw new IOException(errorMessage);
+        }
+    }
+
+    /**
+     * This method is invoked by SASL for authentication challenges
+     * @param callbacks a collection of challenge callbacks 
+     */
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+        for (Callback c : callbacks) {
+            if (c instanceof NameCallback) {
+                LOG.debug("name callback");
+            } else if (c instanceof PasswordCallback) {
+                LOG.debug("password callback");
+                LOG.warn("Could not login: the client is being asked for a password, but the " +
+                        " client code does not currently support obtaining a password from the user." +
+                        " Make sure that the client is configured to use a ticket cache (using" +
+                        " the JAAS configuration setting 'useTicketCache=true)' and restart the client. If" +
+                        " you still get this message after that, the TGT in the ticket cache has expired and must" +
+                        " be manually refreshed. To do so, first determine if you are using a password or a" +
+                        " keytab. If the former, run kinit in a Unix shell in the environment of the user who" +
+                        " is running this client using the command" +
+                        " 'kinit <princ>' (where <princ> is the name of the client's Kerberos principal)." +
+                        " If the latter, do" +
+                        " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and" +
+                        " <keytab> is the location of the keytab file). After manually refreshing your cache," +
+                        " restart this client. If you continue to see this message after manually refreshing" +
+                        " your cache, ensure that your KDC host's clock is in sync with this host's clock.");
+            } else if (c instanceof AuthorizeCallback) {
+                LOG.debug("authorization callback");
+                AuthorizeCallback ac = (AuthorizeCallback) c;
+                String authid = ac.getAuthenticationID();
+                String authzid = ac.getAuthorizationID();
+                if (authid.equals(authzid)) {
+                    ac.setAuthorized(true);
+                } else {
+                    ac.setAuthorized(false);
+                }
+                if (ac.isAuthorized()) {
+                    ac.setAuthorizedID(authzid);
+                }
+            }  else {
+                throw new UnsupportedCallbackException(c);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
new file mode 100644
index 0000000..451f87b
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.security.auth.kerberos;
+
+import java.io.IOException;
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.Sasl;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+import org.apache.zookeeper.Login;
+import org.apache.zookeeper.server.auth.KerberosName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.security.auth.AuthUtils;
+import backtype.storm.security.auth.SaslTransportPlugin;
+
+public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
+    public static final String KERBEROS = "GSSAPI"; 
+    private static final Logger LOG = LoggerFactory.getLogger(KerberosSaslTransportPlugin.class);
+
+    public TTransportFactory getServerTransportFactory() throws IOException {
+        //create an authentication callback handler
+        CallbackHandler server_callback_handler = new ServerCallbackHandler(login_conf);
+        
+        //login our principal
+        Subject subject = null;
+        try {
+            //specify a configuration object to be used
+            Configuration.setConfiguration(login_conf); 
+            //now login
+            Login login = new Login(AuthUtils.LOGIN_CONTEXT_SERVER, server_callback_handler);
+            subject = login.getSubject();
+        } catch (LoginException ex) {
+            LOG.error("Server failed to login in principal:" + ex, ex);
+            throw new RuntimeException(ex);
+        }
+
+        //check the credential of our principal
+        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { 
+            throw new RuntimeException("Fail to verify user principal with section \""
+                    +AuthUtils.LOGIN_CONTEXT_SERVER+"\" in login configuration file "+ login_conf);
+        }
+
+        String principal = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_SERVER, "principal"); 
+        LOG.debug("principal:"+principal);  
+        KerberosName serviceKerberosName = new KerberosName(principal);
+        String serviceName = serviceKerberosName.getServiceName();
+        String hostName = serviceKerberosName.getHostName();
+        Map<String, String> props = new TreeMap<String,String>();
+        props.put(Sasl.QOP, "auth");
+        props.put(Sasl.SERVER_AUTH, "false");
+
+        //create a transport factory that will invoke our auth callback for GSSAPI (Kerberos)
+        TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
+        factory.addServerDefinition(KERBEROS, serviceName, hostName, props, server_callback_handler);
+
+        //create a wrap transport factory so that we could apply user credential during connections
+        TUGIAssumingTransportFactory wrapFactory = new TUGIAssumingTransportFactory(factory, subject); 
+
+        LOG.info("SASL GSSAPI transport factory will be used");
+        return wrapFactory;
+    }
+
+    public TTransport connect(TTransport transport, String serverHost) throws TTransportException, IOException {
+        // Client side: log in through the JAAS client section, then wrap 'transport' in a GSSAPI SASL transport.
+        ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf);
+
+        //login our user
+        Login login = null;
+        try {
+            //specify a configuration object to be used (installed via the static Configuration setter)
+            Configuration.setConfiguration(login_conf);
+            //now login
+            login = new Login(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler);
+        } catch (LoginException ex) {
+            LOG.error("Client failed to login in principal:" + ex, ex);
+            throw new RuntimeException(ex);
+        }
+
+        final Subject subject = login.getSubject();
+        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //error: login yielded no Kerberos ticket
+            throw new RuntimeException("Fail to verify user principal with section \""
+                        +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ login_conf);
+        }
+
+        final String principal = getPrincipal(subject); // null when the subject carries no principals
+        String serviceName = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName");
+        if (serviceName == null) {
+            serviceName = AuthUtils.SERVICE; // nothing returned for "serviceName": use the default
+        }
+        Map<String, String> props = new TreeMap<String,String>();
+        props.put(Sasl.QOP, "auth");
+        props.put(Sasl.SERVER_AUTH, "false");
+
+        LOG.debug("SASL GSSAPI client transport is being established");
+        final TTransport sasalTransport = new TSaslClientTransport(KERBEROS,
+                principal,
+                serviceName,
+                serverHost,
+                props,
+                null,
+                transport);
+
+        //open Sasl transport with the login credential
+        try {
+            Subject.doAs(subject,
+                    new PrivilegedExceptionAction<Void>() {
+                public Void run() {
+                    try {
+                        LOG.debug("do as:"+ principal);
+                        sasalTransport.open();
+                    }
+                    catch (Exception e) {
+                        LOG.error("Client failed to open SaslClientTransport to interact with a server during session initiation: " + e, e);
+                    }
+                    return null;
+                }
+            });
+        } catch (PrivilegedActionException e) {
+            throw new RuntimeException(e);
+        }
+        // NOTE(review): a failed open() above is only logged, so the transport returned here may not be open.
+        return sasalTransport;
+    }
+
+    private String getPrincipal(Subject subject) {
+        Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
+        if (principals==null || principals.size()<1) {
+            LOG.info("No principal found in login subject");
+            return null;
+        }
+        return ((Principal)(principals.toArray()[0])).getName();
+    }
+
+    /** A TransportFactory that wraps another one, but assumes a specified UGI
+     * before calling through.                                                                                                                                                      
+     *                                                                                                                                                                              
+     * This is used on the server side to assume the server's Principal when accepting                                                                                              
+     * clients.                                                                                                                                                                     
+     */
+    static class TUGIAssumingTransportFactory extends TTransportFactory {
+        private final Subject subject;
+        private final TTransportFactory wrapped;
+
+        public TUGIAssumingTransportFactory(TTransportFactory wrapped, Subject subject) {
+            this.wrapped = wrapped;
+            this.subject = subject;
+
+            Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
+            if (principals.size()>0) 
+                LOG.info("Service principal:"+ ((Principal)(principals.toArray()[0])).getName());
+        }
+
+        @Override
+        public TTransport getTransport(final TTransport trans) {
+            try {
+                return Subject.doAs(subject,
+                        new PrivilegedExceptionAction<TTransport>() {
+                    public TTransport run() {
+                        try {
+                            return wrapped.getTransport(trans);
+                        }
+                        catch (Exception e) {
+                            LOG.error("Storm server failed to open transport to interact with a client during session initiation: " + e, e);
+                            return null;
+                        }
+                    }
+                });
+            } catch (PrivilegedActionException e) {
+                LOG.error("Storm server experienced a PrivilegedActionException exception while creating a transport using a JAAS principal context:" + e, e);
+                return null;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java
new file mode 100644
index 0000000..9dc75c4
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.security.auth.kerberos;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+
+import backtype.storm.security.auth.AuthUtils;
+
+/**
+ * SASL server side callback handler
+ */
+public class ServerCallbackHandler implements CallbackHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(ServerCallbackHandler.class);
+
+    private String userName;
+
+    public ServerCallbackHandler(Configuration configuration) throws IOException {
+        if (configuration==null) return;
+
+        AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER);
+        if (configurationEntries == null) {
+            String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_SERVER+"' entry in this configuration: Server cannot start.";
+            LOG.error(errorMessage);
+            throw new IOException(errorMessage);
+        }
+    }
+
+    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+        for (Callback callback : callbacks) {
+            if (callback instanceof NameCallback) {
+                handleNameCallback((NameCallback) callback);
+            } else if (callback instanceof PasswordCallback) {
+                handlePasswordCallback((PasswordCallback) callback);
+            } else if (callback instanceof AuthorizeCallback) {
+                handleAuthorizeCallback((AuthorizeCallback) callback);
+            }
+        }
+    }
+
+    private void handleNameCallback(NameCallback nc) {
+        LOG.debug("handleNameCallback");
+        userName = nc.getDefaultName();
+        nc.setName(nc.getDefaultName());
+    }
+
+    private void handlePasswordCallback(PasswordCallback pc) {
+        LOG.warn("No password found for user: " + userName);
+    }
+
+    private void handleAuthorizeCallback(AuthorizeCallback ac) {
+        String authenticationID = ac.getAuthenticationID();
+        LOG.debug("Successfully authenticated client: authenticationID=" + authenticationID);
+        ac.setAuthorized(true);
+
+        ac.setAuthorizedID(authenticationID);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/security/auth/kerberos/jaas_kerberos_cluster.conf
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/kerberos/jaas_kerberos_cluster.conf b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/jaas_kerberos_cluster.conf
new file mode 100644
index 0000000..92a1399
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/jaas_kerberos_cluster.conf
@@ -0,0 +1,31 @@
+/* 
+This is a sample JAAS configuration for Storm servers to handle Kerberos authentication
+*/
+
+/*
+ StormServer section should contain the info about server keytab file and server principal.
+ In Storm, we have 2 thrift servers: Nimbus and DRPC. These servers could be assigned with
+ different principals.
+*/
+StormServer {
+       com.sun.security.auth.module.Krb5LoginModule required
+       useKeyTab=true
+       keyTab="/etc/storm_server.keytab"
+       storeKey=true
+       useTicketCache=false
+       principal="storm_service/carcloth.corp.acme.com@STORM.CORP.ACME.COM";
+};
+
+/*
+StormClient section should contain the info about client keytab file and client principal.
+For example, Supervisors are clients of Nimbus, and we should assign keytab/principal for supervisors.
+*/
+StormClient {
+       com.sun.security.auth.module.Krb5LoginModule required
+       useKeyTab=true
+       keyTab="/etc/storm_client.keytab"
+       storeKey=true
+       useTicketCache=false
+       serviceName="storm_service";
+};
+

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/security/auth/kerberos/jaas_kerberos_launcher.conf
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/kerberos/jaas_kerberos_launcher.conf b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/jaas_kerberos_launcher.conf
new file mode 100644
index 0000000..138e1f3
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/jaas_kerberos_launcher.conf
@@ -0,0 +1,12 @@
+/*
+ This is a sample JAAS configuration for Storm topology launcher/submitter.
+ Since launcher machines are typically accessible by many folks, we 
+ encourage you to leverage "kinit", instead of keytab.  
+*/
+StormClient {
+       com.sun.security.auth.module.Krb5LoginModule required
+       doNotPrompt=true
+       useTicketCache=true
+       serviceName="storm_service";
+};
+

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/testing/SingleUserSimpleTransport.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/SingleUserSimpleTransport.java b/storm-core/src/jvm/backtype/storm/testing/SingleUserSimpleTransport.java
new file mode 100644
index 0000000..4d25ac7
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/testing/SingleUserSimpleTransport.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.testing;
+
+import backtype.storm.security.auth.SimpleTransportPlugin;
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.util.HashSet;
+
+
+public class SingleUserSimpleTransport extends SimpleTransportPlugin {
+   @Override
+   protected Subject getDefaultSubject() {
+       HashSet<Principal> principals = new HashSet<Principal>();
+       principals.add(new Principal() {
+          public String getName() { return "user"; }
+          public String toString() { return "user"; }
+       });
+       return new Subject(true, principals, new HashSet<Object>(), new HashSet<Object>());
+   } 
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/transactional/state/TestTransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/transactional/state/TestTransactionalState.java b/storm-core/src/jvm/backtype/storm/transactional/state/TestTransactionalState.java
new file mode 100644
index 0000000..3d4a463
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/transactional/state/TestTransactionalState.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.transactional.state;
+
+import java.util.List;
+import java.util.Map;
+
+import backtype.storm.utils.ZookeeperAuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * Test helper that widens access to non-public members of
+ * {@link TransactionalState} so they can be exercised directly.
+ */
+public class TestTransactionalState extends TransactionalState {
+
+    /**
+     * Mirrors the parent constructor; needed because the parent class has
+     * no default constructor.
+     */
+    protected TestTransactionalState(Map conf, String id, Map componentConf, String subroot) {
+        super(conf, id, componentConf, subroot);
+    }
+
+    /**
+     * Public pass-through to the parent's {@code createNode}.
+     * The call is deliberately qualified with the parent class name: an
+     * unqualified call would resolve to this method and recurse.
+     */
+    public static void createNode(
+            CuratorFramework curator,
+            String rootDir,
+            byte[] data,
+            List<ACL> acls,
+            CreateMode mode) throws Exception {
+        TransactionalState.createNode(curator, rootDir, data, acls, mode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/transactional/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/transactional/state/TransactionalState.java b/storm-core/src/jvm/backtype/storm/transactional/state/TransactionalState.java
index 91fc666..5afcd0a 100644
--- a/storm-core/src/jvm/backtype/storm/transactional/state/TransactionalState.java
+++ b/storm-core/src/jvm/backtype/storm/transactional/state/TransactionalState.java
@@ -21,18 +21,25 @@ import backtype.storm.Config;
 import backtype.storm.serialization.KryoValuesDeserializer;
 import backtype.storm.serialization.KryoValuesSerializer;
 import backtype.storm.utils.Utils;
+import backtype.storm.utils.ZookeeperAuthInfo;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable;
+import org.apache.curator.framework.api.PathAndBytesable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
 
 public class TransactionalState {
     CuratorFramework _curator;
     KryoValuesSerializer _ser;
     KryoValuesDeserializer _des;
+    List<ACL> _zkAcls = null;
     
     public static TransactionalState newUserState(Map conf, String id, Map componentConf) {
         return new TransactionalState(conf, id, componentConf, "user");
@@ -51,26 +58,55 @@ public class TransactionalState {
                          componentConf
                               .get(Config.TOPOLOGY_KRYO_REGISTER));
             }
-            String rootDir = conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT) + "/" + id + "/" + subroot;
+            String transactionalRoot = (String)conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT);
+            String rootDir = transactionalRoot + "/" + id + "/" + subroot;
             List<String> servers = (List<String>) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS);
             Object port = getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT);
-            CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port);
+            ZookeeperAuthInfo auth = new ZookeeperAuthInfo(conf);
+            CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port, auth);
+            _zkAcls = Utils.getWorkerACL(conf);
             try {
-                initter.create().creatingParentsIfNeeded().forPath(rootDir);
-            } catch(KeeperException.NodeExistsException e)  {
-                
+                TransactionalState.createNode(initter, transactionalRoot, null, null, null);
+            } catch (KeeperException.NodeExistsException e) {
+            }
+            try {
+                TransactionalState.createNode(initter, rootDir, null, _zkAcls, null);
+            } catch (KeeperException.NodeExistsException e) {
             }
-            
             initter.close();
                                     
-            _curator = Utils.newCuratorStarted(conf, servers, port, rootDir);
+            _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, auth);
             _ser = new KryoValuesSerializer(conf);
             _des = new KryoValuesDeserializer(conf);
         } catch (Exception e) {
            throw new RuntimeException(e);
         }
     }
+
+    /**
+     * Terminates a Curator builder chain for {@code path}, attaching a
+     * payload only when one is supplied.
+     *
+     * @param builder the Curator builder to finish
+     * @param path    node path handed to the builder
+     * @param data    node payload, or null to use the path-only overload
+     * @return the value returned by the builder's {@code forPath} call
+     * @throws Exception propagated from the builder
+     */
+    protected static String forPath(PathAndBytesable<String> builder,
+            String path, byte[] data) throws Exception {
+        if (data != null) {
+            return builder.forPath(path, data);
+        }
+        return builder.forPath(path);
+    }
+
+    /**
+     * Creates the node at {@code path}, creating missing parents, and
+     * applies whichever of the optional settings were supplied.
+     *
+     * Both {@code acls} and {@code mode} are honored when given together;
+     * previously a non-null {@code acls} caused {@code mode} to be ignored.
+     *
+     * @param curator client used to issue the create
+     * @param path    node path to create
+     * @param data    node payload, or null for none
+     * @param acls    ACLs to set on the node, or null to leave unset
+     * @param mode    create mode, or null to leave unset
+     * @throws Exception propagated from Curator (callers in this class
+     *         catch KeeperException.NodeExistsException)
+     */
+    protected static void createNode(CuratorFramework curator, String path,
+            byte[] data, List<ACL> acls, CreateMode mode) throws Exception {
+        ProtectACLCreateModePathAndBytesable<String> builder =
+            curator.create().creatingParentsIfNeeded();
     
+        if (acls == null && mode == null) {
+            TransactionalState.forPath(builder, path, data);
+        } else if (acls == null) {
+            TransactionalState.forPath(builder.withMode(mode), path, data);
+        } else if (mode == null) {
+            TransactionalState.forPath(builder.withACL(acls), path, data);
+        } else {
+            // The mode must be set before the ACL in Curator's builder chain.
+            TransactionalState.forPath(builder.withMode(mode).withACL(acls), path, data);
+        }
+    }
+
     public void setData(String path, Object obj) {
         path = "/" + path;
         byte[] ser = _ser.serializeObject(obj);
@@ -78,10 +114,8 @@ public class TransactionalState {
             if(_curator.checkExists().forPath(path)!=null) {
                 _curator.setData().forPath(path, ser);
             } else {
-                _curator.create()
-                        .creatingParentsIfNeeded()
-                        .withMode(CreateMode.PERSISTENT)
-                        .forPath(path, ser);
+                TransactionalState.createNode(_curator, path, ser, _zkAcls,
+                        CreateMode.PERSISTENT);
             }
         } catch(Exception e) {
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/ui/InvalidRequestException.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ui/InvalidRequestException.java b/storm-core/src/jvm/backtype/storm/ui/InvalidRequestException.java
new file mode 100644
index 0000000..9d0ee92
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/ui/InvalidRequestException.java
@@ -0,0 +1,20 @@
+package backtype.storm.ui;
+
+/**
+ * Checked exception declared in the UI package. Its constructors simply
+ * forward to the matching {@link Exception} constructors.
+ * NOTE(review): presumably raised for malformed UI requests; the throw
+ * sites are not visible here, so confirm against callers.
+ */
+public class InvalidRequestException extends Exception {
+
+    /** Creates an exception with no detail message and no cause. */
+    public InvalidRequestException() {
+    }
+
+    /**
+     * @param msg detail message
+     */
+    public InvalidRequestException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * @param cause underlying cause
+     */
+    public InvalidRequestException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param msg   detail message
+     * @param cause underlying cause
+     */
+    public InvalidRequestException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java b/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
index cf38fb8..3218e49 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
@@ -17,68 +17,49 @@
  */
 package backtype.storm.utils;
 
+import backtype.storm.Config;
 import backtype.storm.generated.DRPCExecutionException;
 import backtype.storm.generated.DistributedRPC;
+import backtype.storm.generated.AuthorizationException;
 import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
+import backtype.storm.security.auth.ThriftClient;
+import backtype.storm.security.auth.ThriftConnectionType;
+import org.apache.thrift.transport.TTransportException;
 
-public class DRPCClient implements DistributedRPC.Iface {
+import java.util.Map;
+
+public class DRPCClient extends ThriftClient implements DistributedRPC.Iface {
     private TTransport conn;
     private DistributedRPC.Client client;
     private String host;
     private int port;
     private Integer timeout;
 
-    public DRPCClient(String host, int port, Integer timeout) {
-        try {
-            this.host = host;
-            this.port = port;
-            this.timeout = timeout;
-            connect();
-        } catch(TException e) {
-            throw new RuntimeException(e);
-        }
-    }
-    
-    public DRPCClient(String host, int port) {
-        this(host, port, null);
+    public DRPCClient(Map conf, String host, int port) throws TTransportException {
+        this(conf, host, port, null);
     }
-    
-    private void connect() throws TException {
-        TSocket socket = new TSocket(host, port);
-        if(timeout!=null) {
-            socket.setTimeout(timeout);
-        }
-        conn = new TFramedTransport(socket);
-        client = new DistributedRPC.Client(new TBinaryProtocol(conn));
-        conn.open();
+
+    public DRPCClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
+        super(conf, ThriftConnectionType.DRPC, host, port, timeout);
+        this.host = host;
+        this.port = port;
+        this.client = new DistributedRPC.Client(_protocol);
     }
-    
+        
     public String getHost() {
         return host;
     }
     
     public int getPort() {
         return port;
-    }   
+    }
     
-    public String execute(String func, String args) throws TException, DRPCExecutionException {
-        try {
-            if(client==null) connect();
-            return client.execute(func, args);
-        } catch(TException e) {
-            client = null;
-            throw e;
-        } catch(DRPCExecutionException e) {
-            client = null;
-            throw e;
-        }
+    public String execute(String func, String args) throws TException, DRPCExecutionException, AuthorizationException {
+        return client.execute(func, args);
     }
 
-    public void close() {
-        conn.close();
+    public DistributedRPC.Client getClient() {
+        return client;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/utils/LocalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/LocalState.java b/storm-core/src/jvm/backtype/storm/utils/LocalState.java
index 0d0ae07..f412ff3 100644
--- a/storm-core/src/jvm/backtype/storm/utils/LocalState.java
+++ b/storm-core/src/jvm/backtype/storm/utils/LocalState.java
@@ -23,16 +23,19 @@ import java.io.File;
 import java.util.Map;
 import java.util.HashMap;
 import java.io.IOException;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A simple, durable, atomic K/V database. *Very inefficient*, should only be used for occasional reads/writes.
  * Every read/write hits disk.
  */
 public class LocalState {
+    public static Logger LOG = LoggerFactory.getLogger(LocalState.class);
     private VersionedStore _vs;
     
     public LocalState(String backingDir) throws IOException {
+        LOG.debug("New Local State for {}", backingDir);
         _vs = new VersionedStore(backingDir);
     }
     
@@ -83,8 +86,14 @@ public class LocalState {
     private void persist(Map<Object, Object> val, boolean cleanup) throws IOException {
         byte[] toWrite = Utils.serialize(val);
         String newPath = _vs.createVersion();
-        FileUtils.writeByteArrayToFile(new File(newPath), toWrite);
+        File file = new File(newPath);
+        FileUtils.writeByteArrayToFile(file, toWrite);
+        if (toWrite.length != file.length()) {
+            throw new IOException("Tried to serialize " + toWrite.length + 
+                    " bytes to " + file.getCanonicalPath() + ", but " +
+                    file.length() + " bytes were written.");
+        }
         _vs.succeedVersion(newPath);
         if(cleanup) _vs.cleanup(4);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
index e93acc8..273e232 100644
--- a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
+++ b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
@@ -19,6 +19,7 @@ package backtype.storm.utils;
 
 import backtype.storm.Config;
 import backtype.storm.security.auth.ThriftClient;
+import backtype.storm.security.auth.ThriftConnectionType;
 import backtype.storm.generated.Nimbus;
 import java.util.Map;
 import org.apache.thrift.transport.TTransportException;
@@ -32,8 +33,7 @@ public class NimbusClient extends ThriftClient {
     public static NimbusClient getConfiguredClient(Map conf) {
         try {
             String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
-            int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
-            return new NimbusClient(conf, nimbusHost, nimbusPort);
+            return new NimbusClient(conf, nimbusHost);
         } catch (TTransportException ex) {
             throw new RuntimeException(ex);
         }
@@ -44,7 +44,12 @@ public class NimbusClient extends ThriftClient {
     }
 
     public NimbusClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
-        super(conf, host, port, timeout);
+        super(conf, ThriftConnectionType.NIMBUS, host, port, timeout);
+        _client = new Nimbus.Client(_protocol);
+    }
+
+    public NimbusClient(Map conf, String host) throws TTransportException {
+        super(conf, ThriftConnectionType.NIMBUS, host, null, null);
         _client = new Nimbus.Client(_protocol);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TestUtils.java b/storm-core/src/jvm/backtype/storm/utils/TestUtils.java
new file mode 100644
index 0000000..276559c
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/TestUtils.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.utils;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import java.util.Map;
+
+/**
+ * Test helper that exposes protected members of {@link Utils}.
+ */
+public class TestUtils extends Utils {
+
+    /**
+     * Public pass-through to the protected {@code Utils.setupBuilder};
+     * all arguments are forwarded unchanged.
+     */
+    public static void testSetupBuilder(
+            CuratorFrameworkFactory.Builder builder,
+            String zkStr,
+            Map conf,
+            ZookeeperAuthInfo auth) {
+        Utils.setupBuilder(builder, zkStr, conf, auth);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index a1fed96..c28d93a 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -17,6 +17,7 @@
  */
 package backtype.storm.utils;
 
+import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.FileOutputStream;
@@ -45,7 +46,12 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.commons.lang.StringUtils;
 import org.apache.thrift.TException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
 import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.constructor.SafeConstructor;
 
@@ -53,10 +59,13 @@ import backtype.storm.Config;
 import backtype.storm.generated.ComponentCommon;
 import backtype.storm.generated.ComponentObject;
 import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.AuthorizationException;
+
 import clojure.lang.IFn;
 import clojure.lang.RT;
 
 public class Utils {
+    public static Logger LOG = LoggerFactory.getLogger(Utils.class);    
     public static final String DEFAULT_STREAM_ID = "default";
 
     public static Object newInstance(String klass) {
@@ -254,7 +263,7 @@ public class Utils {
         return ret;
     }
 
-    public static void downloadFromMaster(Map conf, String file, String localFile) throws IOException, TException {
+    public static void downloadFromMaster(Map conf, String file, String localFile) throws AuthorizationException, IOException, TException {
         NimbusClient client = NimbusClient.getConfiguredClient(conf);
         String id = client.getClient().beginFileDownload(file);
         WritableByteChannel out = Channels.newChannel(new FileOutputStream(localFile));
@@ -307,6 +316,8 @@ public class Utils {
             return (Integer) o;
         } else if (o instanceof Short) {
             return ((Short) o).intValue();
+        } else if (o instanceof String) {
+            return Integer.parseInt((String) o);
         } else {
             throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
         }
@@ -316,11 +327,6 @@ public class Utils {
         return UUID.randomUUID().getLeastSignificantBits();
     }
     
-    
-    public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root) {
-        return newCurator(conf, servers, port, root, null);
-    }
-
     public static class BoundedExponentialBackoffRetry extends ExponentialBackoffRetry {
 
         protected final int maxRetryInterval;
@@ -350,32 +356,39 @@ public class Utils {
             serverPorts.add(zkServer + ":" + Utils.getInt(port));
         }
         String zkStr = StringUtils.join(serverPorts, ",") + root;
-        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
-                .connectString(zkStr)
-                .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
-                .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
-                .retryPolicy(new BoundedExponentialBackoffRetry(
+        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+
+        setupBuilder(builder, zkStr, conf, auth);
+        
+        return builder.build();
+    }
+
+    protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, String zkStr, Map conf, ZookeeperAuthInfo auth)
+    {
+        builder.connectString(zkStr)
+            .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
+            .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
+            .retryPolicy(new BoundedExponentialBackoffRetry(
                             Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)),
                             Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
                             Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING))));
-        if(auth!=null && auth.scheme!=null) {
+        if(auth!=null && auth.scheme!=null && auth.payload!=null) {
             builder = builder.authorization(auth.scheme, auth.payload);
         }
-        return builder.build();
     }
 
-    public static CuratorFramework newCurator(Map conf, List<String> servers, Object port) {
-        return newCurator(conf, servers, port, "");
+    public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) {
+        return newCurator(conf, servers, port, "", auth);
     }
 
-    public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, String root) {
-        CuratorFramework ret = newCurator(conf, servers, port, root);
+    public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
+        CuratorFramework ret = newCurator(conf, servers, port, root, auth);
         ret.start();
         return ret;
     }
 
-    public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port) {
-        CuratorFramework ret = newCurator(conf, servers, port);
+    public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) {
+        CuratorFramework ret = newCurator(conf, servers, port, auth);
         ret.start();
         return ret;
     }    
@@ -413,6 +426,18 @@ public class Utils {
         return ret;
     }
 
+    public static void readAndLogStream(String prefix, InputStream in) {
+        try {
+            BufferedReader r = new BufferedReader(new InputStreamReader(in));
+            String line = null;
+            while ((line = r.readLine())!= null) {
+                LOG.info("{}:{}", prefix, line);
+            }
+        } catch (IOException e) {
+            LOG.warn("Error whiel trying to log stream", e);
+        }
+    }
+
     public static boolean exceptionCauseIsInstanceOf(Class klass, Throwable throwable) {
         Throwable t = throwable;
         while(t != null) {
@@ -423,4 +448,67 @@ public class Utils {
         }
         return false;
     }
+
+    /**
+     * Is the cluster configured to interact with ZooKeeper in a secure way?
+     * This only works when called from within Nimbus or a Supervisor process.
+     * @param conf the storm configuration, not the topology configuration
+     * @return true if it is configured else false.
+     */
+    public static boolean isZkAuthenticationConfiguredStormServer(Map conf) {
+        return null != System.getProperty("java.security.auth.login.config")
+            || (conf != null
+                && conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME) != null
+                && ! ((String)conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME)).isEmpty());
+    }
+
+    /**
+     * Is the topology configured to have ZooKeeper authentication.
+     * @param conf the topology configuration
+     * @return true if ZK is configured else false
+     */
+    public static boolean isZkAuthenticationConfiguredTopology(Map conf) {
+        return (conf != null
+                && conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME) != null
+                && ! ((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
+    }
+
+    /**
+     * Builds the ACL list for worker-created ZooKeeper nodes: the creator
+     * gets all permissions, and so does the identity configured as the
+     * ZooKeeper super ACL (expected in the form {@code scheme:id}).
+     *
+     * @param conf the topology configuration
+     * @return the ACL list, or null when topology ZK authentication is not
+     *         configured
+     * @throws IllegalArgumentException if authentication is enabled but the
+     *         super ACL setting is missing or has no {@code :} separator
+     */
+    public static List<ACL> getWorkerACL(Map conf) {
+        // Workaround for a ZK issue: a SASL super user is not treated as super
+        // unless an open SASL ACL exists, so grant it the permissions explicitly.
+        if (!isZkAuthenticationConfiguredTopology(conf)) {
+            return null;
+        }
+        String superAcl = (String)conf.get(Config.STORM_ZOOKEEPER_SUPERACL);
+        if (superAcl == null) {
+           throw new IllegalArgumentException("Authentication is enabled but "+Config.STORM_ZOOKEEPER_SUPERACL+" is not set");
+        }
+        // Split on the first ':' only; the id part may itself contain colons.
+        int separator = superAcl.indexOf(':');
+        if (separator < 0) {
+          throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL+" does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
+        }
+        String scheme = superAcl.substring(0, separator);
+        String id = superAcl.substring(separator + 1);
+        ArrayList<ACL> workerAcls = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL);
+        workerAcls.add(new ACL(ZooDefs.Perms.ALL, new Id(scheme, id)));
+        return workerAcls;
+    }
+
+   /**
+    * Renders a text dump of the JVM's threads: for each thread its quoted
+    * name, its state, and up to 100 stack frames.
+    *
+    * Threads that exit between listing the ids and fetching their info are
+    * reported by the JVM as null entries; those are skipped.
+    *
+    * @return the formatted dump, one blank-line-separated section per thread
+    */
+   public static String threadDump() {
+       final StringBuilder dump = new StringBuilder();
+       final java.lang.management.ThreadMXBean threadMXBean =  java.lang.management.ManagementFactory.getThreadMXBean();
+       final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
+       for (java.lang.management.ThreadInfo threadInfo : threadInfos) {
+           if (threadInfo == null) {
+               // The thread died after getAllThreadIds(); nothing to report.
+               continue;
+           }
+           dump.append('"');
+           dump.append(threadInfo.getThreadName());
+           dump.append("\" ");
+           final Thread.State state = threadInfo.getThreadState();
+           dump.append("\n   java.lang.Thread.State: ");
+           dump.append(state);
+           final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
+           for (final StackTraceElement stackTraceElement : stackTraceElements) {
+               dump.append("\n        at ");
+               dump.append(stackTraceElement);
+           }
+           dump.append("\n\n");
+       }
+       return dump.toString();
+   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/utils/ZookeeperAuthInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ZookeeperAuthInfo.java b/storm-core/src/jvm/backtype/storm/utils/ZookeeperAuthInfo.java
index a5a2e9a..d972135 100644
--- a/storm-core/src/jvm/backtype/storm/utils/ZookeeperAuthInfo.java
+++ b/storm-core/src/jvm/backtype/storm/utils/ZookeeperAuthInfo.java
@@ -27,8 +27,13 @@ public class ZookeeperAuthInfo {
     public byte[] payload = null;
     
     public ZookeeperAuthInfo(Map conf) {
-        String scheme = (String) conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME);
-        String payload = (String) conf.get(Config.STORM_ZOOKEEPER_AUTH_PAYLOAD);
+        String scheme = (String) conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME);
+        String payload = (String) conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
+
+        if (scheme == null || payload == null) {
+            scheme = (String) conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME);
+            payload = (String) conf.get(Config.STORM_ZOOKEEPER_AUTH_PAYLOAD);
+        }
         if(scheme!=null) {
             this.scheme = scheme;
             if(payload != null) {

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/utils/ZookeeperServerCnxnFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ZookeeperServerCnxnFactory.java b/storm-core/src/jvm/backtype/storm/utils/ZookeeperServerCnxnFactory.java
new file mode 100644
index 0000000..08a763a
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/ZookeeperServerCnxnFactory.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.utils;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZookeeperServerCnxnFactory {
+	private static final Logger LOG = LoggerFactory.getLogger(ZookeeperServerCnxnFactory.class);
+	int _port;
+	NIOServerCnxnFactory _factory;
+	
+	public ZookeeperServerCnxnFactory(int port, int maxClientCnxns)  {
+		//port range
+		int max;
+		if (port <= 0) {
+			_port = 2000;
+			max = 65535;
+		} else {
+			_port = port;
+			max = port;
+		}
+
+		try {
+			_factory = new NIOServerCnxnFactory();
+		} catch (IOException e) {
+			_port = 0;
+			_factory = null;
+			e.printStackTrace();
+			throw new RuntimeException(e.getMessage());
+		}
+		
+		//look for available port 
+		for (; _port <= max; _port++) {
+			try {
+				_factory.configure(new InetSocketAddress(_port), maxClientCnxns);
+				LOG.debug("Zookeeper server successfully binded at port "+_port);
+				break;
+			} catch (BindException e1) {
+			} catch (IOException e2) {
+				_port = 0;
+				_factory = null;
+				e2.printStackTrace();
+				throw new RuntimeException(e2.getMessage());
+			} 
+		} 		
+
+		if (_port > max) {
+			_port = 0;
+			_factory = null;
+			LOG.error("Failed to find a port for Zookeeper");
+			throw new RuntimeException("No port is available to launch an inprocess zookeeper.");
+		}
+	}
+	
+	public int port() {
+		return _port;
+	}
+		
+	public NIOServerCnxnFactory factory() {
+		return _factory;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java b/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java
index 4ebb667..d49371a 100644
--- a/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java
+++ b/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java
@@ -20,6 +20,7 @@ package storm.trident.drpc;
 import backtype.storm.Config;
 import backtype.storm.drpc.DRPCInvocationsClient;
 import backtype.storm.generated.DistributedRPCInvocations;
+import backtype.storm.generated.AuthorizationException;
 import backtype.storm.utils.ServiceRegistry;
 import backtype.storm.utils.Utils;
 import java.util.ArrayList;
@@ -28,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
 import org.json.simple.JSONValue;
 import storm.trident.drpc.ReturnResultsReducer.ReturnResultsState;
 import storm.trident.operation.MultiReducer;
@@ -47,12 +49,13 @@ public class ReturnResultsReducer implements MultiReducer<ReturnResultsState> {
         }
     }
     boolean local;
-
+    Map conf;
     Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();
     
     
     @Override
     public void prepare(Map conf, TridentMultiReducerContext context) {
+        this.conf = conf;
         local = conf.get(Config.STORM_CLUSTER_MODE).equals("local");
     }
 
@@ -89,7 +92,11 @@ public class ReturnResultsReducer implements MultiReducer<ReturnResultsState> {
                 }};
 
                 if(!_clients.containsKey(server)) {
-                    _clients.put(server, new DRPCInvocationsClient(host, port));
+                    try {
+                        _clients.put(server, new DRPCInvocationsClient(conf, host, port));
+                    } catch (TTransportException ex) {
+                        throw new RuntimeException(ex);
+                    }
                 }
                 client = _clients.get(server);
             }
@@ -98,6 +105,8 @@ public class ReturnResultsReducer implements MultiReducer<ReturnResultsState> {
                 client.result(id, result);
             } catch(TException e) {
                 collector.reportError(e);
+            } catch (AuthorizationException aze) {
+                collector.reportError(aze);                
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/storm/trident/topology/state/TestTransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/state/TestTransactionalState.java b/storm-core/src/jvm/storm/trident/topology/state/TestTransactionalState.java
new file mode 100644
index 0000000..ff3edb6
--- /dev/null
+++ b/storm-core/src/jvm/storm/trident/topology/state/TestTransactionalState.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package storm.trident.topology.state;
+
+import java.util.List;
+import java.util.Map;
+
+import backtype.storm.utils.ZookeeperAuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * Test-support subclass that re-exposes non-public members of
+ * {@link TransactionalState} so that tests can call them.
+ */
+public class TestTransactionalState extends TransactionalState {
+
+    /**
+     * Mirrors the parent constructor, which is required because the parent
+     * class has no default constructor.
+     */
+    protected TestTransactionalState(
+            final Map conf,
+            final String id,
+            final String subroot) {
+        super(conf, id, subroot);
+    }
+
+    /**
+     * Public pass-through to the protected
+     * {@code TransactionalState.createNode}; all arguments are forwarded
+     * unchanged.
+     */
+    public static void createNode(
+            final CuratorFramework curator,
+            final String rootDir,
+            final byte[] data,
+            final List<ACL> acls,
+            final CreateMode mode) throws Exception {
+        // The explicit qualifier is required: an unqualified call would
+        // resolve to this method and recurse.
+        TransactionalState.createNode(curator, rootDir, data, acls, mode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/storm/trident/topology/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/state/TransactionalState.java b/storm-core/src/jvm/storm/trident/topology/state/TransactionalState.java
index 5325137..5fba1a2 100644
--- a/storm-core/src/jvm/storm/trident/topology/state/TransactionalState.java
+++ b/storm-core/src/jvm/storm/trident/topology/state/TransactionalState.java
@@ -20,7 +20,12 @@ package storm.trident.topology.state;
 
 import backtype.storm.Config;
 import backtype.storm.utils.Utils;
+import backtype.storm.utils.ZookeeperAuthInfo;
+
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable;
+import org.apache.curator.framework.api.PathAndBytesable;
+
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -28,10 +33,14 @@ import java.util.List;
 import java.util.Map;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
 import org.json.simple.JSONValue;
 
 public class TransactionalState {
     CuratorFramework _curator;
+    List<ACL> _zkAcls = null;
     
     public static TransactionalState newUserState(Map conf, String id) {
         return new TransactionalState(conf, id, "user");
@@ -44,24 +53,53 @@ public class TransactionalState {
     protected TransactionalState(Map conf, String id, String subroot) {
         try {
             conf = new HashMap(conf);
-            String rootDir = conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT) + "/" + id + "/" + subroot;
+            String transactionalRoot = (String)conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT);
+            String rootDir = transactionalRoot + "/" + id + "/" + subroot;
             List<String> servers = (List<String>) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS);
             Object port = getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT);
-            CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port);
+            ZookeeperAuthInfo auth = new ZookeeperAuthInfo(conf);
+            CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port, auth);
+            _zkAcls = Utils.getWorkerACL(conf);
+            try {
+                TransactionalState.createNode(initter, transactionalRoot, null, null, null);
+            } catch (KeeperException.NodeExistsException e) {
+            }
             try {
-                initter.create().creatingParentsIfNeeded().forPath(rootDir);
-            } catch(KeeperException.NodeExistsException e)  {
-                
+                TransactionalState.createNode(initter, rootDir, null, _zkAcls, null);
+            } catch (KeeperException.NodeExistsException e) {
             }
-            
             initter.close();
                                     
-            _curator = Utils.newCuratorStarted(conf, servers, port, rootDir);
+            _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, auth);
         } catch (Exception e) {
            throw new RuntimeException(e);
         }
     }
+
+    /**
+     * Finishes a builder call, sending {@code data} only when there is some.
+     *
+     * @param builder the builder to complete
+     * @param path the path handed to the builder
+     * @param data the payload, or {@code null} to use the path-only overload
+     * @return whatever the builder's {@code forPath} call returns
+     */
+    protected static String forPath(PathAndBytesable<String> builder, 
+            String path, byte[] data) throws Exception {
+        if (data != null) {
+            return builder.forPath(path, data);
+        }
+        return builder.forPath(path);
+    }
+
+    /**
+     * Creates a node, creating missing parents as needed.
+     *
+     * @param curator the client used to issue the create
+     * @param path the path of the node to create
+     * @param data the node payload, or {@code null} to create without one
+     * @param acls the ACLs to set on the builder, or {@code null} to set none
+     * @param mode the create mode to set on the builder, or {@code null} to
+     *             set none
+     * @throws Exception whatever the underlying create call throws
+     */
+    protected static void createNode(CuratorFramework curator, String path,
+            byte[] data, List<ACL> acls, CreateMode mode) throws Exception {
+        ProtectACLCreateModePathAndBytesable<String> builder =
+            curator.create().creatingParentsIfNeeded();
     
+        // Apply mode and ACLs independently. Previously a non-null ACL list
+        // caused the requested mode to be dropped.
+        if (mode == null) {
+            if (acls == null) {
+                TransactionalState.forPath(builder, path, data);
+            } else {
+                TransactionalState.forPath(builder.withACL(acls), path, data);
+            }
+        } else if (acls == null) {
+            TransactionalState.forPath(builder.withMode(mode), path, data);
+        } else {
+            TransactionalState.forPath(
+                    builder.withMode(mode).withACL(acls), path, data);
+        }
+    }
+
     public void setData(String path, Object obj) {
         path = "/" + path;
         byte[] ser;
@@ -74,10 +112,8 @@ public class TransactionalState {
             if(_curator.checkExists().forPath(path)!=null) {
                 _curator.setData().forPath(path, ser);
             } else {
-                _curator.create()
-                        .creatingParentsIfNeeded()
-                        .withMode(CreateMode.PERSISTENT)
-                        .forPath(path, ser);
+                TransactionalState.createNode(_curator, path, ser, _zkAcls,
+                        CreateMode.PERSISTENT);
             }
         } catch(Exception e) {
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/.autom4te.cfg
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/.autom4te.cfg b/storm-core/src/native/worker-launcher/.autom4te.cfg
new file mode 100644
index 0000000..1ec584f
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/.autom4te.cfg
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# autom4te configuration for worker-launcher
+#
+
+begin-language: "Autoheader-preselections"
+args: --no-cache 
+end-language: "Autoheader-preselections"
+
+begin-language: "Automake-preselections"
+args: --no-cache 
+end-language: "Automake-preselections"
+
+begin-language: "Autoreconf-preselections"
+args: --no-cache 
+end-language: "Autoreconf-preselections"
+
+begin-language: "Autoconf-without-aclocal-m4"
+args: --no-cache 
+end-language: "Autoconf-without-aclocal-m4"
+
+begin-language: "Autoconf"
+args: --no-cache 
+end-language: "Autoconf"
+

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/.deps/worker-launcher.Po
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/.deps/worker-launcher.Po b/storm-core/src/native/worker-launcher/.deps/worker-launcher.Po
new file mode 100644
index 0000000..9ce06a8
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/.deps/worker-launcher.Po
@@ -0,0 +1 @@
+# dummy

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/Makefile.am
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/Makefile.am b/storm-core/src/native/worker-launcher/Makefile.am
new file mode 100644
index 0000000..c9183c0
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/Makefile.am
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+AM_CFLAGS=-I$(srcdir)/impl -Wall -g -Werror
+
+# Define the programs that need to be built
+bin_PROGRAMS = worker-launcher
+check_PROGRAMS = test-worker-launcher
+
+TESTS = test-worker-launcher
+
+# Define the sources for the common files
+common_SOURCES = impl/configuration.c impl/worker-launcher.c
+
+# Define the sources for the real executable
+worker_launcher_SOURCES = $(common_SOURCES) impl/main.c
+
+# Define the sources for the test executable
+test_worker_launcher_SOURCES = $(common_SOURCES) test/test-worker-launcher.c

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/configure.ac
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/configure.ac b/storm-core/src/native/worker-launcher/configure.ac
new file mode 100644
index 0000000..ab1ef49
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/configure.ac
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#                                               -*- Autoconf -*-
+# Process this file with autoconf to produce a configure script.
+
+AC_PREREQ(2.59)
+AC_INIT(worker-launcher, 1.0.0, storm-user@googlegroups.com)
+AC_GNU_SOURCE
+
+AM_INIT_AUTOMAKE([subdir-objects foreign no-dist])
+
+AC_CONFIG_SRCDIR([impl/worker-launcher.c])
+AC_CONFIG_FILES([Makefile])
+
+AC_PREFIX_DEFAULT(`pwd`/../install)
+
+# Checks for programs.
+AC_PROG_CC
+AM_PROG_CC_C_O
+
+# Checks for libraries.
+
+# Checks for header files.
+AC_LANG(C)
+AC_CHECK_HEADERS([unistd.h])
+
+# Checks for typedefs, structures, and compiler characteristics.
+AC_HEADER_STDBOOL
+AC_C_CONST
+AC_TYPE_OFF_T
+AC_TYPE_SIZE_T
+AC_FUNC_STRERROR_R
+
+# Checks for library functions.
+AC_CHECK_FUNCS([mkdir uname])
+AC_OUTPUT

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/impl/configuration.c
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/impl/configuration.c b/storm-core/src/native/worker-launcher/impl/configuration.c
new file mode 100644
index 0000000..7b7a3c1
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/impl/configuration.c
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// ensure we get the posix version of dirname by including this first
+#include <libgen.h> 
+
+#include "configuration.h"
+#include "worker-launcher.h"
+
+#include <errno.h>
+#include <limits.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#define MAX_SIZE 10
+
+struct confentry {
+  const char *key;
+  const char *value;
+};
+
+struct configuration {
+  int size;
+  struct confentry **confdetails;
+};
+
+struct configuration config={.size=0, .confdetails=NULL};
+
+/**
+ * Clean up method for freeing the configuration.
+ * Frees every stored entry with its key and value, then the entry table
+ * itself, and resets the global configuration to the empty state.
+ * Safe to call when nothing was loaded or the table holds no entries.
+ */
+void free_configurations() {
+  int i = 0;
+  if (config.confdetails != NULL) {
+    for (i = 0; i < config.size; i++) {
+      struct confentry *entry = config.confdetails[i];
+      if (entry == NULL) {
+        continue;
+      }
+      // free(NULL) is a no-op, so unset keys/values need no separate check
+      free((void *)entry->key);
+      free((void *)entry->value);
+      free(entry);
+      config.confdetails[i] = NULL;
+    }
+    // release the table even when it holds no entries, otherwise it leaks
+    free(config.confdetails);
+    config.confdetails = NULL;
+  }
+  config.size = 0;
+}
+
+/**
+ * Check that a file or directory can be modified by root alone.
+ * The path must be owned by uid 0 and carry neither the group-write nor the
+ * other-write permission bit; the first failing check is reported on
+ * ERRORFILE.
+ * Returns 1 when every check passes, 0 otherwise (including a failed stat).
+ */
+static int is_only_root_writable(const char *file) {
+  struct stat info;
+  int root_only = 0;
+
+  if (stat(file, &info) != 0) {
+    fprintf(ERRORFILE, "Can't stat file %s - %s\n", file, strerror(errno));
+  } else if (info.st_uid != 0) {
+    fprintf(ERRORFILE, "File %s must be owned by root, but is owned by %d\n",
+            file, info.st_uid);
+  } else if ((info.st_mode & (S_IWGRP | S_IWOTH)) != 0) {
+    fprintf(ERRORFILE,
+            "File %s must not be world or group writable, but is %03o\n",
+            file, info.st_mode & (~S_IFMT));
+  } else {
+    root_only = 1;
+  }
+  return root_only;
+}
+
+/**
+ * Ensure that the configuration file and all of the containing directories
+ * are only writable by root. Otherwise, an attacker can change the 
+ * configuration and potentially cause damage.
+ * The path has to be absolute: walking up a relative path ends at "."
+ * instead of "/", so such a path is rejected rather than walked forever.
+ * returns 0 if permissions are ok, -1 otherwise
+ */
+int check_configuration_permissions(const char* file_name) {
+  char* buffer;
+  char* dir;
+
+  if (file_name == NULL) {
+    fprintf(ERRORFILE, "Null configuration filename passed in\n");
+    return -1;
+  }
+  // copy the input so that we can modify it with dirname
+  buffer = strndup(file_name, PATH_MAX);
+  if (buffer == NULL) {
+    fprintf(ERRORFILE,
+            "malloc failed while checking configuration permissions.\n");
+    return -1;
+  }
+  dir = buffer;
+  do {
+    if (!is_only_root_writable(dir)) {
+      free(buffer);
+      return -1;
+    }
+    // dirname may return a pointer into buffer or to static storage, which
+    // is why only buffer is ever passed to free
+    dir = dirname(dir);
+    if (strcmp(dir, ".") == 0) {
+      // dirname(".") is "." again, so the walk would never reach "/"
+      fprintf(ERRORFILE, "Configuration file path %s is not absolute\n",
+              file_name);
+      free(buffer);
+      return -1;
+    }
+  } while (strcmp(dir, "/") != 0);
+  free(buffer);
+  return 0;
+}
+
+/**
+ * Release one configuration entry together with its key and value strings.
+ * Accepts NULL so that error paths can call it unconditionally.
+ */
+static void free_confentry(struct confentry *entry) {
+  if (entry == NULL) {
+    return;
+  }
+  free((void *)entry->key);
+  free((void *)entry->value);
+  free(entry);
+}
+
+/**
+ * Function used to load the configurations present in the secure config.
+ *
+ * Each line is split on '='; the first token is the key and the second the
+ * value. Lines starting with '#', lines without a key, and entries whose
+ * value starts with '#' are skipped.
+ *
+ * Exits the process when file_name is NULL, the file cannot be opened or
+ * read, the initial allocations fail, or no entry was loaded. On a later
+ * allocation failure or a key without a value, the problem is logged to
+ * LOGFILE, everything loaded so far is discarded, and the function returns.
+ */
+void read_config(const char* file_name) {
+  FILE *conf_file;
+  char *line = NULL;
+  char *equaltok;
+  char *temp_equaltok;
+  struct confentry *entry = NULL;
+  struct confentry **grown = NULL;
+  size_t linesize = 1000;
+  ssize_t size_read = 0;
+  //number of slots currently allocated in config.confdetails
+  int capacity = MAX_SIZE;
+
+  if (file_name == NULL) {
+    fprintf(ERRORFILE, "Null configuration filename passed in\n");
+    exit(INVALID_CONFIG_FILE);
+  }
+
+  #ifdef DEBUG
+    fprintf(LOGFILE, "read_config :Conf file name is : %s \n", file_name);
+  #endif
+
+  //allocate space for ten configuration items.
+  config.confdetails = (struct confentry **) malloc(sizeof(struct confentry *)
+      * capacity);
+  if (config.confdetails == NULL) {
+      fprintf(ERRORFILE, "malloc failed while reading configuration file.\n");
+      exit(OUT_OF_MEMORY);
+  }
+  config.size = 0;
+  conf_file = fopen(file_name, "r");
+  if (conf_file == NULL) {
+    fprintf(ERRORFILE, "Invalid conf file provided : %s \n", file_name);
+    exit(INVALID_CONFIG_FILE);
+  }
+  //one line buffer is reused for the whole file; getline grows it on demand
+  line = (char *) malloc(linesize);
+  if(line == NULL) {
+    fprintf(ERRORFILE, "malloc failed while reading configuration file.\n");
+    exit(OUT_OF_MEMORY);
+  }
+  while(!feof(conf_file)) {
+    size_read = getline(&line,&linesize,conf_file);
+ 
+    //feof returns true only after we read past EOF.
+    //so a file with no new line, at last can reach this place
+    //if size_read returns negative check for eof condition
+    if (size_read == -1) {
+      if(!feof(conf_file)){
+        exit(INVALID_CONFIG_FILE);
+      }
+      break;
+    }
+    //trim the ending new line; indexing by size_read cannot go below zero
+    if(size_read > 0 && line[size_read - 1] == '\n') {
+      line[size_read - 1] = '\0';
+    }
+    //comment line
+    if(line[0] == '#') {
+      continue;
+    }
+    //tokenize first to get key and list of values.
+    //if no equals is found ignore this line, can be an empty line also
+    equaltok = strtok_r(line, "=", &temp_equaltok);
+    if(equaltok == NULL) {
+      continue;
+    }
+    //the entry is built locally and stored only once it is complete, so
+    //the table never holds a half-initialised item
+    entry = (struct confentry *) calloc(1, sizeof(struct confentry));
+    if(entry == NULL) {
+      fprintf(LOGFILE,
+          "Failed allocating memory for single configuration item\n");
+      goto cleanup;
+    }
+
+    #ifdef DEBUG
+      fprintf(LOGFILE, "read_config : Adding conf key : %s \n", equaltok);
+    #endif
+
+    entry->key = strdup(equaltok);
+    if (entry->key == NULL) {
+      fprintf(LOGFILE,
+          "Failed allocating memory for single configuration item\n");
+      goto cleanup;
+    }
+    equaltok = strtok_r(NULL, "=", &temp_equaltok);
+    if (equaltok == NULL) {
+      fprintf(LOGFILE, "configuration tokenization failed \n");
+      goto cleanup;
+    }
+    //means value is commented so don't store the key
+    if(equaltok[0] == '#') {
+      free_confentry(entry);
+      entry = NULL;
+      continue;
+    }
+
+    #ifdef DEBUG
+      fprintf(LOGFILE, "read_config : Adding conf value : %s \n", equaltok);
+    #endif
+
+    entry->value = strdup(equaltok);
+    if (entry->value == NULL) {
+      fprintf(LOGFILE,
+          "Failed allocating memory for single configuration item\n");
+      goto cleanup;
+    }
+    //grow the table before writing past its last slot; the result goes to
+    //a temporary so the existing table stays reachable if realloc fails
+    if(config.size == capacity) {
+      grown = (struct confentry **) realloc(config.confdetails,
+          sizeof(struct confentry *) * (capacity + MAX_SIZE));
+      if (grown == NULL) {
+        fprintf(LOGFILE,
+            "Failed re-allocating memory for configuration items\n");
+        goto cleanup;
+      }
+      config.confdetails = grown;
+      capacity += MAX_SIZE;
+    }
+    config.confdetails[config.size] = entry;
+    config.size++;
+    entry = NULL;
+  }
+  free(line);
+  line = NULL;
+ 
+  //close the file
+  fclose(conf_file);
+
+  if (config.size == 0) {
+    fprintf(ERRORFILE, "Invalid configuration provided in %s\n", file_name);
+    exit(INVALID_CONFIG_FILE);
+  }
+
+  return;
+  //free spaces alloced.
+  cleanup:
+  //the entry under construction is not in the table yet, free it here
+  free_confentry(entry);
+  entry = NULL;
+  free(line);
+  line = NULL;
+  fclose(conf_file);
+  free_configurations();
+  //make sure the table itself is released even when it held no entries
+  if (config.confdetails != NULL) {
+    free(config.confdetails);
+    config.confdetails = NULL;
+  }
+  return;
+}
+
+/*
+ * function used to get a configuration value.
+ * Searches the entries currently held in the global configuration for the
+ * first one whose key equals the given key.
+ * Returns a newly allocated copy (strdup) of that entry's value, which the
+ * caller must free, or NULL when key is NULL or no entry matches.
+ */
+char * get_value(const char* key) {
+  int count;
+  //strcmp must not be handed a NULL pointer
+  if (key == NULL) {
+    return NULL;
+  }
+  for (count = 0; count < config.size; count++) {
+    if (strcmp(config.confdetails[count]->key, key) == 0) {
+      return strdup(config.confdetails[count]->value);
+    }
+  }
+  return NULL;
+}
+
+/**
+ * Look up a key and split its value into an array of strings.
+ * The value found by get_value is handed directly to extract_values, which
+ * splits it on commas.
+ */
+char ** get_values(const char * key) {
+  return extract_values(get_value(key));
+}
+
+/**
+ * Extracts array of values from the comma separated list of values.
+ *
+ * The input string is modified in place by strtok_r and the returned
+ * elements point into it; they are not separate copies.
+ * Returns NULL when value is NULL or memory cannot be allocated; otherwise
+ * a NULL-terminated array, which holds only the terminator when value
+ * contains no tokens.
+ */
+char ** extract_values(char *value) {
+  char ** toPass = NULL;
+  char ** grown = NULL;
+  char *tempTok = NULL;
+  char *tempstr = NULL;
+  int size = 0;
+  int toPassSize = MAX_SIZE;
+
+  if (value == NULL) {
+    return NULL;
+  }
+  //first allocate an array of MAX_SIZE slots
+  toPass = (char **) malloc(sizeof(char *) * toPassSize);
+  if (toPass == NULL) {
+    return NULL;
+  }
+  tempTok = strtok_r(value, ",", &tempstr);
+  while (tempTok != NULL) {
+    toPass[size++] = tempTok;
+    //grow as soon as the array is full so the terminator always has a slot
+    if(size == toPassSize) {
+      toPassSize += MAX_SIZE;
+      grown = (char **) realloc(toPass,(sizeof(char *) * toPassSize));
+      if (grown == NULL) {
+        //the elements point into value, so only the array is released
+        free(toPass);
+        return NULL;
+      }
+      toPass = grown;
+    }
+    tempTok = strtok_r(NULL, ",", &tempstr);
+  }
+  //always terminate, also when no token was found
+  toPass[size] = NULL;
+  return toPass;
+}
+
+/**
+ * Free an entry set of values.
+ * Frees the first element and then the array itself; the remaining
+ * elements are not freed individually. Does nothing when values is NULL.
+ * NOTE(review): this presumably relies on all elements pointing into the
+ * single string that the first element starts - confirm against callers.
+ */
+void free_values(char** values) {
+  //check the array pointer before looking at its first element
+  if (values == NULL) {
+    return;
+  }
+  //free(NULL) is a no-op, so an empty array needs no separate check
+  free(*values);
+  *values = NULL;
+  free(values);
+}