You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/21 18:07:06 UTC

[11/14] STORM-216: Added Authentication and Authorization.

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
index 3accb82..e990921 100644
--- a/storm-core/src/jvm/backtype/storm/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
@@ -18,6 +18,8 @@
 package backtype.storm;
 import java.util.Map;
 
+import java.util.Map;
+
 /**
  * Provides functionality for validating configuration fields.
  */
@@ -35,51 +37,147 @@ public class ConfigValidation {
          */
         public void validateField(String name, Object field) throws IllegalArgumentException;
     }
+    
+    /**
+     * Declares a method for validating configuration values that is nestable.
+     */
+    public static abstract class NestableFieldValidator implements FieldValidator {
+        @Override
+        public void validateField(String name, Object field) throws IllegalArgumentException {
+            validateField("", name, field); // top level: empty prefix, a null pd would be rendered as "null"
+        }
+        
+        /**
+         * Validates the given field.
+         * @param pd describes the parent wrapping this validator (message prefix, "" at the top level).
+         * @param name the name of the field.
+         * @param field The field to be validated.
+         * @throws IllegalArgumentException if the field fails validation.
+         */
+        public abstract void validateField(String pd, String name, Object field) throws IllegalArgumentException;
+    }
 
     /**
-     * Returns a new FieldValidator for a List of the given Class.
+     * Returns a new NestableFieldValidator for a given class.
+     * @param cls the Class the field should be a type of
+     * @param nullAllowed whether or not a value of null is valid
+     * @return a NestableFieldValidator for that class
+     */
+    public static NestableFieldValidator fv(final Class cls, final boolean nullAllowed) {
+        return new NestableFieldValidator() {
+            @Override
+            public void validateField(String pd, String name, Object field)
+                    throws IllegalArgumentException {
+                if (nullAllowed && field == null) {
+                    return;
+                }
+                if (! cls.isInstance(field)) {
+                    throw new IllegalArgumentException(
+                        pd + name + " must be a " + cls.getName() + ". ("+field+")");
+                }
+            }
+        };
+    }
+    
+    /**
+     * Returns a new NestableFieldValidator for a List of the given Class.
      * @param cls the Class of elements composing the list
-     * @return a FieldValidator for a list of the given class
+     * @param nullAllowed whether or not a value of null is valid
+     * @return a NestableFieldValidator for a list of the given class
      */
-    static FieldValidator FieldListValidatorFactory(final Class cls) {
-        return new FieldValidator() {
+    public static NestableFieldValidator listFv(Class cls, boolean nullAllowed) {
+      return listFv(fv(cls, false), nullAllowed);
+    }
+    
+    /**
+     * Returns a new NestableFieldValidator for a List where each item is validated by validator.
+     * @param validator used to validate each item in the list
+     * @param nullAllowed whether or not a value of null is valid
+     * @return a NestableFieldValidator for a list with each item validated by the given validator.
+     */
+    public static NestableFieldValidator listFv(final NestableFieldValidator validator, 
+            final boolean nullAllowed) {
+        return new NestableFieldValidator() {
             @Override
-            public void validateField(String name, Object field)
+            public void validateField(String pd, String name, Object field)
                     throws IllegalArgumentException {
-                if (field == null) {
-                    // A null value is acceptable.
+                if (nullAllowed && field == null) {
                     return;
                 }
                 if (field instanceof Iterable) {
                     for (Object e : (Iterable)field) {
-                        if (! cls.isInstance(e)) {
-                            throw new IllegalArgumentException(
-                                    "Each element of the list " + name + " must be a " +
-                                    cls.getName() + ".");
-                        }
+                        validator.validateField(pd + "Each element of the list ", name, e);
                     }
                     return;
                 }
                 throw new IllegalArgumentException(
-                        "Field " + name + " must be an Iterable of " + cls.getName());
+                        "Field " + name + " must be an Iterable but was " +
+                        ((field == null) ? "null" :  ("a " + field.getClass())));
             }
         };
     }
 
     /**
+     * Returns a new NestableFieldValidator for a Map of key to val.
+     * @param key the Class of keys in the map
+     * @param val the Class of values in the map
+     * @param nullAllowed whether or not a value of null is valid
+     * @return a NestableFieldValidator for a Map of key to val
+     */
+    public static NestableFieldValidator mapFv(Class key, Class val, 
+            boolean nullAllowed) {
+        return mapFv(fv(key, false), fv(val, false), nullAllowed);
+    }
+ 
+    /**
+     * Returns a new NestableFieldValidator for a Map.
+     * @param key a validator for the keys in the map
+     * @param val a validator for the values in the map
+     * @param nullAllowed whether or not a value of null is valid
+     * @return a NestableFieldValidator for a Map
+     */   
+    public static NestableFieldValidator mapFv(final NestableFieldValidator key, 
+            final NestableFieldValidator val, final boolean nullAllowed) {
+        return new NestableFieldValidator() {
+            @SuppressWarnings("unchecked")
+            @Override
+            public void validateField(String pd, String name, Object field)
+                    throws IllegalArgumentException {
+                if (nullAllowed && field == null) {
+                    return;
+                }
+                if (field instanceof Map) {
+                    for (Map.Entry<Object, Object> entry: ((Map<Object, Object>)field).entrySet()) {
+                      key.validateField("Each key of the map ", name, entry.getKey());
+                      val.validateField("Each value in the map ", name, entry.getValue());
+                    }
+                    return;
+                }
+                throw new IllegalArgumentException(
+                        "Field " + name + " must be a Map");
+            }
+        };
+    }
+    
+    /**
      * Validates a list of Numbers.
      */
-    public static Object NumbersValidator = FieldListValidatorFactory(Number.class);
+    public static Object NumbersValidator = listFv(Number.class, true);
 
     /**
-     * Validates is a list of Strings.
+     * Validates a list of Strings.
+     */
+    public static Object StringsValidator = listFv(String.class, true);
+    
+    /**
+     * Validates a map of Strings to Numbers.
      */
-    public static Object StringsValidator = FieldListValidatorFactory(String.class);
+    public static Object MapOfStringToNumberValidator = mapFv(String.class, Number.class, true);
 
     /**
      * Validates is a list of Maps.
      */
-    public static Object MapsValidator = FieldListValidatorFactory(Map.class);
+    public static Object MapsValidator = listFv(Map.class, true);
 
     /**
      * Validates a power of 2.
@@ -105,6 +203,28 @@ public class ConfigValidation {
     };
 
     /**
+     * Validates a positive integer.
+     */
+    public static Object PositiveIntegerValidator = new FieldValidator() {
+        @Override
+        public void validateField(String name, Object o) throws IllegalArgumentException {
+            if (o == null) {
+                // A null value is acceptable.
+                return;
+            }
+            final long i;
+            if (o instanceof Number &&
+                    (i = ((Number)o).longValue()) == ((Number)o).doubleValue())
+            {
+                if (i > 0) {
+                    return;
+                }
+            }
+            throw new IllegalArgumentException("Field " + name + " must be a positive integer.");
+        }
+    };
+
+    /**
      * Validates Kryo Registration
      */
     public static Object KryoRegValidator = new FieldValidator() {
@@ -141,7 +261,7 @@ public class ConfigValidation {
      */
     public static Object StringOrStringListValidator = new FieldValidator() {
 
-        private FieldValidator fv = FieldListValidatorFactory(String.class);
+        private FieldValidator fv = listFv(String.class, false);
 
         @Override
         public void validateField(String name, Object o) throws IllegalArgumentException {

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/Constants.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Constants.java b/storm-core/src/jvm/backtype/storm/Constants.java
index 39d3ffa..35c252f 100644
--- a/storm-core/src/jvm/backtype/storm/Constants.java
+++ b/storm-core/src/jvm/backtype/storm/Constants.java
@@ -31,5 +31,6 @@ public class Constants {
     public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics";
     public static final String METRICS_STREAM_ID = "__metrics";
     public static final String METRICS_TICK_STREAM_ID = "__metrics_tick";
+    public static final String CREDENTIALS_CHANGED_STREAM_ID = "__credentials";
 }
-    
\ No newline at end of file
+    

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/ICredentialsListener.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ICredentialsListener.java b/storm-core/src/jvm/backtype/storm/ICredentialsListener.java
new file mode 100644
index 0000000..1a7bc1b
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/ICredentialsListener.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm;
+
+import java.util.Map;
+
+/**
+ * Allows a bolt or a spout to be informed when the credentials of the topology have changed.
+ */
+public interface ICredentialsListener {
+    /**
+     * Called when the credentials of a topology have changed.
+     * @param credentials the new credentials, could be null.
+     */
+    public void setCredentials(Map<String,String> credentials);
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/ILocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ILocalCluster.java b/storm-core/src/jvm/backtype/storm/ILocalCluster.java
index 818dfb0..7d5aa35 100644
--- a/storm-core/src/jvm/backtype/storm/ILocalCluster.java
+++ b/storm-core/src/jvm/backtype/storm/ILocalCluster.java
@@ -26,6 +26,7 @@ import backtype.storm.generated.NotAliveException;
 import backtype.storm.generated.RebalanceOptions;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.generated.TopologyInfo;
+import backtype.storm.generated.Credentials;
 
 import java.util.Map;
 
@@ -33,6 +34,7 @@ import java.util.Map;
 public interface ILocalCluster {
     void submitTopology(String topologyName, Map conf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException;
     void submitTopologyWithOpts(String topologyName, Map conf, StormTopology topology, SubmitOptions submitOpts) throws AlreadyAliveException, InvalidTopologyException;
+    void uploadNewCredentials(String topologyName, Credentials creds);
     void killTopology(String topologyName) throws NotAliveException;
     void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException;
     void activate(String topologyName) throws NotAliveException;

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
index 5dfb34b..d5da103 100644
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
@@ -19,6 +19,8 @@ package backtype.storm;
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -28,13 +30,9 @@ import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.TopologySummary;
+import backtype.storm.security.auth.IAutoCredentials;
+import backtype.storm.security.auth.AuthUtils;
+import backtype.storm.generated.*;
 import backtype.storm.utils.BufferFileInputStream;
 import backtype.storm.utils.NimbusClient;
 import backtype.storm.utils.Utils;
@@ -49,12 +47,99 @@ public class StormSubmitter {
 
     private static final int THRIFT_CHUNK_SIZE_BYTES = 307200;
     
-    private static Nimbus.Iface localNimbus = null;
+    private static ILocalCluster localNimbus = null;
 
-    public static void setLocalNimbus(Nimbus.Iface localNimbusHandler) {
+    public static void setLocalNimbus(ILocalCluster localNimbusHandler) {
         StormSubmitter.localNimbus = localNimbusHandler;
     }
 
+    private static String generateZookeeperDigestSecretPayload() {
+        return Utils.secureRandomLong() + ":" + Utils.secureRandomLong();
+    }
+
+    public static final Pattern zkDigestPattern = Pattern.compile("\\S+:\\S+");
+
+    public static boolean validateZKDigestPayload(String payload) {
+        if (payload != null) {
+            Matcher m = zkDigestPattern.matcher(payload);
+            return m.matches();
+        }
+        return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static Map prepareZookeeperAuthentication(Map conf) {
+        Map toRet = new HashMap();
+
+        // Is the topology ZooKeeper authentication configuration unset?
+        if (! conf.containsKey(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) ||
+                conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) == null || 
+                !  validateZKDigestPayload((String)
+                    conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD))) {
+
+            String secretPayload = generateZookeeperDigestSecretPayload();
+            toRet.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD, secretPayload);
+            LOG.info("Generated ZooKeeper secret payload for MD5-digest."); // the secret itself must never reach the log
+        }
+        
+        // This should always be set to digest.
+        toRet.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME, "digest");
+
+        return toRet;
+    }
+
+    private static Map<String,String> populateCredentials(Map conf, Map<String, String> creds) {
+        Map<String,String> ret = new HashMap<String,String>();
+        for (IAutoCredentials autoCred: AuthUtils.GetAutoCredentials(conf)) {
+            LOG.info("Running "+autoCred);
+            autoCred.populateCredentials(ret);
+        }
+        if (creds != null) {
+            ret.putAll(creds);
+        }
+        return ret;
+    }
+
+    /**
+     * Push a new set of credentials to the running topology.
+     * @param name the name of the topology to push credentials to.
+     * @param stormConf the topology-specific configuration, if desired. See {@link Config}. 
+     * @param credentials the credentials to push.
+     * @throws AuthorizationException if you are not authorized to push credentials.
+     * @throws NotAliveException if the topology is not alive
+     * @throws InvalidTopologyException if any other error happens
+     */
+    public static void pushCredentials(String name, Map stormConf, Map<String, String> credentials) 
+            throws AuthorizationException, NotAliveException, InvalidTopologyException {
+        stormConf = new HashMap(stormConf);
+        stormConf.putAll(Utils.readCommandLineOpts());
+        Map conf = Utils.readStormConfig();
+        conf.putAll(stormConf);
+        Map<String,String> fullCreds = populateCredentials(conf, credentials);
+        if (fullCreds.isEmpty()) {
+            LOG.warn("No credentials were found to push to "+name);
+            return;
+        }
+        try {
+            if(localNimbus!=null) {
+                LOG.info("Pushing Credentials to topology " + name + " in local mode");
+                localNimbus.uploadNewCredentials(name, new Credentials(fullCreds));
+            } else {
+                NimbusClient client = NimbusClient.getConfiguredClient(conf);
+                try {
+                    LOG.info("Uploading new credentials to " +  name);
+                    client.getClient().uploadNewCredentials(name, new Credentials(fullCreds));
+                } finally {
+                    client.close();
+                }
+            }
+            LOG.info("Finished pushing credentials to topology: " +  name); // nothing is submitted here, only credentials are pushed
+        } catch(TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+ 
+
     /**
      * Submits a topology to run on the cluster. A topology runs forever or until 
      * explicitly killed.
@@ -65,8 +150,10 @@ public class StormSubmitter {
      * @param topology the processing to execute.
      * @throws AlreadyAliveException if a topology with this name is already running
      * @throws InvalidTopologyException if an invalid topology was submitted
+     * @throws AuthorizationException if authorization fails
      */
-    public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
+    public static void submitTopology(String name, Map stormConf, StormTopology topology) 
+            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
         submitTopology(name, stormConf, topology, null, null);
     }    
 
@@ -82,8 +169,11 @@ public class StormSubmitter {
      * @param progressListener to track the progress of the jar upload process
      * @throws AlreadyAliveException if a topology with this name is already running
      * @throws InvalidTopologyException if an invalid topology was submitted
+     * @throws AuthorizationException if authorization fails
      */
-    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException {
+    @SuppressWarnings("unchecked")
+    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts,
+             ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
         if(!Utils.isValidConf(stormConf)) {
             throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
         }
@@ -91,24 +181,45 @@ public class StormSubmitter {
         stormConf.putAll(Utils.readCommandLineOpts());
         Map conf = Utils.readStormConfig();
         conf.putAll(stormConf);
+        stormConf.putAll(prepareZookeeperAuthentication(conf));
+
+        Map<String,String> passedCreds = new HashMap<String, String>();
+        if (opts != null) {
+            Credentials tmpCreds = opts.get_creds();
+            if (tmpCreds != null) {
+                passedCreds = tmpCreds.get_creds();
+            }
+        }
+        Map<String,String> fullCreds = populateCredentials(conf, passedCreds);
+        if (!fullCreds.isEmpty()) {
+            if (opts == null) {
+                opts = new SubmitOptions(TopologyInitialStatus.ACTIVE);
+            }
+            opts.set_creds(new Credentials(fullCreds));
+        }
         try {
-            String serConf = JSONValue.toJSONString(stormConf);
             if(localNimbus!=null) {
                 LOG.info("Submitting topology " + name + " in local mode");
-                localNimbus.submitTopology(name, null, serConf, topology);
+                if(opts!=null) {
+                    localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts);                    
+                } else {
+                    // this is for backwards compatibility
+                    localNimbus.submitTopology(name, stormConf, topology);                                            
+                }
             } else {
+                String serConf = JSONValue.toJSONString(stormConf);
                 NimbusClient client = NimbusClient.getConfiguredClient(conf);
                 if(topologyNameExists(conf, name)) {
                     throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                 }
-                submitJar(conf, progressListener);
+                String jar = submitJar(conf, progressListener);
                 try {
                     LOG.info("Submitting topology " +  name + " in distributed mode with conf " + serConf);
                     if(opts!=null) {
-                        client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
+                        client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);                    
                     } else {
                         // this is for backwards compatibility
-                        client.getClient().submitTopology(name, submittedJar, serConf, topology);
+                        client.getClient().submitTopology(name, jar, serConf, topology);                                            
                     }
                 } catch(InvalidTopologyException e) {
                     LOG.warn("Topology submission exception: "+e.get_msg());
@@ -136,9 +247,10 @@ public class StormSubmitter {
      * @param topology the processing to execute.
      * @throws AlreadyAliveException if a topology with this name is already running
      * @throws InvalidTopologyException if an invalid topology was submitted
+     * @throws AuthorizationException if authorization fails
      */
 
-    public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
+    public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
         submitTopologyWithProgressBar(name, stormConf, topology, null);
     }
 
@@ -153,9 +265,10 @@ public class StormSubmitter {
      * @param opts to manipulate the starting of the topology
      * @throws AlreadyAliveException if a topology with this name is already running
      * @throws InvalidTopologyException if an invalid topology was submitted
+     * @throws AuthorizationException if authorization fails
      */
 
-    public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
+    public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
         // show a progress bar so we know we're not stuck (especially on slow connections)
         submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {
             @Override
@@ -198,16 +311,8 @@ public class StormSubmitter {
         }
     }
 
-    private static String submittedJar = null;
-
-    private static void submitJar(Map conf, ProgressListener listener) {
-        if(submittedJar==null) {
-            LOG.info("Jar not uploaded to master yet. Submitting jar...");
-            String localJar = System.getProperty("storm.jar");
-            submittedJar = submitJar(conf, localJar, listener);
-        } else {
-            LOG.info("Jar already uploaded to master. Not submitting jar.");
-        }
+    private static String submitJar(Map conf, ProgressListener listener) {
+        return  submitJar(conf, System.getProperty("storm.jar"), listener);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java b/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
index fadebf6..987cde0 100644
--- a/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
+++ b/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
@@ -17,36 +17,33 @@
  */
 package backtype.storm.drpc;
 
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
 import backtype.storm.generated.DRPCRequest;
 import backtype.storm.generated.DistributedRPCInvocations;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.security.auth.ThriftClient;
+import backtype.storm.security.auth.ThriftConnectionType;
+import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class DRPCInvocationsClient implements DistributedRPCInvocations.Iface {
-    private TTransport conn;
-    private DistributedRPCInvocations.Client client;
+public class DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface {
+    public static Logger LOG = LoggerFactory.getLogger(DRPCInvocationsClient.class);
+    private final AtomicReference<DistributedRPCInvocations.Client> client =
+       new AtomicReference<DistributedRPCInvocations.Client>();
     private String host;
-    private int port;    
+    private int port;
 
-    public DRPCInvocationsClient(String host, int port) {
-        try {
-            this.host = host;
-            this.port = port;
-            connect();
-        } catch(TException e) {
-            throw new RuntimeException(e);
-        }
-    }
-    
-    private void connect() throws TException {
-        conn = new TFramedTransport(new TSocket(host, port));
-        client = new DistributedRPCInvocations.Client(new TBinaryProtocol(conn));
-        conn.open();
+    public DRPCInvocationsClient(Map conf, String host, int port) throws TTransportException {
+        super(conf, ThriftConnectionType.DRPC_INVOCATIONS, host, port, null);
+        this.host = host;
+        this.port = port;
+        client.set(new DistributedRPCInvocations.Client(_protocol));
     }
-    
+        
     public String getHost() {
         return host;
     }
@@ -55,37 +52,57 @@ public class DRPCInvocationsClient implements DistributedRPCInvocations.Iface {
         return port;
     }       
 
-    public void result(String id, String result) throws TException {
+    public void reconnectClient() throws TException {
+        if (client.get() == null) {
+            reconnect();
+            client.set(new DistributedRPCInvocations.Client(_protocol));
+        }
+    }
+
+    public boolean isConnected() {
+        return client.get() != null;
+    }
+
+    public void result(String id, String result) throws TException, AuthorizationException {
+        DistributedRPCInvocations.Client c = client.get();
         try {
-            if(client==null) connect();
-            client.result(id, result);
+            if (c == null) {
+                throw new TException("Client is not connected...");
+            }
+            c.result(id, result);
         } catch(TException e) {
-            client = null;
+            client.compareAndSet(c, null);
             throw e;
         }
     }
 
-    public DRPCRequest fetchRequest(String func) throws TException {
+    public DRPCRequest fetchRequest(String func) throws TException, AuthorizationException {
+        DistributedRPCInvocations.Client c = client.get();
         try {
-            if(client==null) connect();
-            return client.fetchRequest(func);
+            if (c == null) {
+                throw new TException("Client is not connected...");
+            }
+            return c.fetchRequest(func);
         } catch(TException e) {
-            client = null;
+            client.compareAndSet(c, null);
             throw e;
         }
     }    
 
-    public void failRequest(String id) throws TException {
+    public void failRequest(String id) throws TException, AuthorizationException {
+        DistributedRPCInvocations.Client c = client.get();
         try {
-            if(client==null) connect();
-            client.failRequest(id);
+            if (c == null) {
+                throw new TException("Client is not connected...");
+            }
+            c.failRequest(id);
         } catch(TException e) {
-            client = null;
+            client.compareAndSet(c, null);
             throw e;
         }
     }
 
-    public void close() {
-        conn.close();
+    public DistributedRPCInvocations.Client getClient() {
+        return client.get();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java b/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java
index 918cbc0..82fd6cd 100644
--- a/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java
+++ b/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java
@@ -21,6 +21,7 @@ import backtype.storm.Config;
 import backtype.storm.ILocalDRPC;
 import backtype.storm.generated.DRPCRequest;
 import backtype.storm.generated.DistributedRPCInvocations;
+import backtype.storm.generated.AuthorizationException;
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
@@ -31,18 +32,30 @@ import backtype.storm.utils.ServiceRegistry;
 import backtype.storm.utils.Utils;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.Callable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
 import org.json.simple.JSONValue;
 
 public class DRPCSpout extends BaseRichSpout {
+    //ANY CHANGE TO THIS CLASS MUST STAY SERIALIZATION-COMPATIBLE (THE serialVersionUID BELOW IS FIXED) OR THERE WILL BE PROBLEMS
+    static final long serialVersionUID = 2387848310969237877L;
+
     public static Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
     
     SpoutOutputCollector _collector;
     List<DRPCInvocationsClient> _clients = new ArrayList<DRPCInvocationsClient>();
+    transient LinkedList<Future<Void>> _futures = null;
+    transient ExecutorService _backround = null;
     String _function;
     String _local_drpc_id = null;
     
@@ -65,11 +78,60 @@ public class DRPCSpout extends BaseRichSpout {
         _function = function;
         _local_drpc_id = drpc.getServiceId();
     }
-    
+   
+    private class Adder implements Callable<Void> { // background task: connect to one DRPC server, then register the client in _clients
+        private final String server; // DRPC server host to connect to
+        private final int port;      // DRPC invocations port
+        private final Map conf;      // configuration map handed to the client constructor
+
+        public Adder(String server, int port, Map conf) {
+            this.server = server;
+            this.port = port;
+            this.conf = conf;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            DRPCInvocationsClient c = new DRPCInvocationsClient(conf, server, port); // a failure here is carried by the task's Future
+            synchronized (_clients) { // _clients is also read under this lock in nextTuple()
+                _clients.add(c);
+            }
+            return null;
+        }
+    }
+
+    private void reconnect(final DRPCInvocationsClient c) { // schedules a reconnect of c on the background executor
+        _futures.add(_backround.submit(new Callable<Void>() { // the Future is tracked so checkFutures() can observe the outcome
+            @Override
+            public Void call() throws Exception {
+                c.reconnectClient();
+                return null;
+            }
+        }));
+    }
+
+    /** Reaps finished background connect/reconnect tasks without blocking the caller on pending ones. */
+    private void checkFutures() {
+        for (Iterator<Future<Void>> i = _futures.iterator(); i.hasNext(); ) {
+            Future<Void> f = i.next();
+            if (f.isDone()) { // pending tasks stay queued and are re-checked on a later call
+                i.remove();
+                try {
+                    f.get(); // task already finished: returns at once, or rethrows its failure
+                } catch (Exception e) {
+                    throw new RuntimeException(e); // surface a failed (re)connect to the caller
+                }
+            }
+        }
+    }
+ 
     @Override
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
         _collector = collector;
         if(_local_drpc_id==null) {
+            _backround = Executors.newCachedThreadPool();
+            _futures = new LinkedList<Future<Void>>();
+
             int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
             int index = context.getThisTaskIndex();
 
@@ -78,13 +140,14 @@ public class DRPCSpout extends BaseRichSpout {
             if(servers == null || servers.isEmpty()) {
                 throw new RuntimeException("No DRPC servers configured for topology");   
             }
-            if(numTasks < servers.size()) {
-                for(String s: servers) {
-                    _clients.add(new DRPCInvocationsClient(s, port));
+            
+            if (numTasks < servers.size()) {
+                for (String s: servers) {
+                    _futures.add(_backround.submit(new Adder(s, port, conf)));
                 }
-            } else {
+            } else {        
                 int i = index % servers.size();
-                _clients.add(new DRPCInvocationsClient(servers.get(i), port));
+                _futures.add(_backround.submit(new Adder(servers.get(i), port, conf)));
             }
         }
         
@@ -101,8 +164,18 @@ public class DRPCSpout extends BaseRichSpout {
     public void nextTuple() {
         boolean gotRequest = false;
         if(_local_drpc_id==null) {
-            for(int i=0; i<_clients.size(); i++) {
-                DRPCInvocationsClient client = _clients.get(i);
+            int size = 0;
+            synchronized (_clients) {
+                size = _clients.size(); //This will only ever grow, so no need to worry about falling off the end
+            }
+            for(int i=0; i<size; i++) {
+                DRPCInvocationsClient client;
+                synchronized (_clients) {
+                    client = _clients.get(i);
+                }
+                if (!client.isConnected()) {
+                    continue;
+                }
                 try {
                     DRPCRequest req = client.fetchRequest(_function);
                     if(req.get_request_id().length() > 0) {
@@ -114,10 +187,17 @@ public class DRPCSpout extends BaseRichSpout {
                         _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), i));
                         break;
                     }
+                } catch (TException e) {
+                    reconnect(client);
+                    LOG.error("Failed to fetch DRPC result from DRPC server", e);
+                } catch (AuthorizationException aze) {
+                    reconnect(client);
+                    LOG.error("Not authorized to fetch DRPC result from DRPC server", aze);
                 } catch (Exception e) {
                     LOG.error("Failed to fetch DRPC result from DRPC server", e);
                 }
             }
+            checkFutures();
         } else {
             DistributedRPCInvocations.Iface drpc = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
             if(drpc!=null) { // can happen during shutdown of drpc while topology is still up
@@ -133,6 +213,8 @@ public class DRPCSpout extends BaseRichSpout {
                     }
                 } catch (TException e) {
                     throw new RuntimeException(e);
+                } catch (AuthorizationException aze) {
+                    throw new RuntimeException(aze);
                 }
             }
         }
@@ -159,6 +241,8 @@ public class DRPCSpout extends BaseRichSpout {
             client.failRequest(did.id);
         } catch (TException e) {
             LOG.error("Failed to fail request", e);
+        } catch (AuthorizationException aze) {
+            LOG.error("Not authorized to failREquest from DRPC server", aze);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java b/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java
index 34cca98..3d50679 100644
--- a/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java
+++ b/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java
@@ -19,6 +19,7 @@ package backtype.storm.drpc;
 
 import backtype.storm.Config;
 import backtype.storm.generated.DistributedRPCInvocations;
+import backtype.storm.generated.AuthorizationException;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
@@ -33,18 +34,23 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
 import org.json.simple.JSONValue;
 
 
 public class ReturnResults extends BaseRichBolt {
+    //ANY CHANGE TO THIS CLASS MUST STAY SERIALIZATION-COMPATIBLE (THE serialVersionUID BELOW IS FIXED) OR THERE WILL BE PROBLEMS
+    static final long serialVersionUID = -774882142710631591L;
+
     public static final Logger LOG = LoggerFactory.getLogger(ReturnResults.class);
     OutputCollector _collector;
     boolean local;
-
+    Map _conf; 
     Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();
 
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        _conf = stormConf;
         _collector = collector;
         local = stormConf.get(Config.STORM_CLUSTER_MODE).equals("local");
     }
@@ -68,17 +74,40 @@ public class ReturnResults extends BaseRichBolt {
                 }};
             
                 if(!_clients.containsKey(server)) {
-                    _clients.put(server, new DRPCInvocationsClient(host, port));
+                    try {
+                        _clients.put(server, new DRPCInvocationsClient(_conf, host, port));
+                    } catch (TTransportException ex) {
+                        throw new RuntimeException(ex);
+                    }
                 }
                 client = _clients.get(server);
             }
-                
+ 
             try {
                 client.result(id, result);
                 _collector.ack(input);
             } catch(TException e) {
                 LOG.error("Failed to return results to DRPC server", e);
                 _collector.fail(input);
+                if (client instanceof DRPCInvocationsClient) {
+                    try {
+                        LOG.info("reconnecting... ");
+                        ((DRPCInvocationsClient)client).reconnectClient(); //Blocking call
+                    } catch (TException e2) {
+                        throw new RuntimeException(e2);
+                    }
+                }
+            } catch (AuthorizationException aze) {
+                LOG.error("Not authorized to return results to DRPC server", aze);
+                _collector.fail(input);
+                if (client instanceof DRPCInvocationsClient) {
+                    try {
+                        LOG.info("reconnecting... ");
+                        ((DRPCInvocationsClient)client).reconnectClient(); //Blocking call
+                    } catch (TException e2) {
+                        throw new RuntimeException(e2);
+                    }
+                }
             }
         }
     }    

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/generated/AuthorizationException.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/AuthorizationException.java b/storm-core/src/jvm/backtype/storm/generated/AuthorizationException.java
new file mode 100644
index 0000000..9efc9da
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/AuthorizationException.java
@@ -0,0 +1,328 @@
+/**
+ * Autogenerated by Thrift Compiler (0.7.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package backtype.storm.generated;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AuthorizationException extends Exception implements org.apache.thrift.TBase<AuthorizationException, AuthorizationException._Fields>, java.io.Serializable, Cloneable {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AuthorizationException");
+
+  private static final org.apache.thrift.protocol.TField MSG_FIELD_DESC = new org.apache.thrift.protocol.TField("msg", org.apache.thrift.protocol.TType.STRING, (short)1);
+
+  private String msg; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    MSG((short)1, "msg");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // MSG
+          return MSG;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.MSG, new org.apache.thrift.meta_data.FieldMetaData("msg", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(AuthorizationException.class, metaDataMap);
+  }
+
+  public AuthorizationException() {
+  }
+
+  public AuthorizationException(
+    String msg)
+  {
+    this();
+    this.msg = msg;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public AuthorizationException(AuthorizationException other) {
+    if (other.is_set_msg()) {
+      this.msg = other.msg;
+    }
+  }
+
+  public AuthorizationException deepCopy() {
+    return new AuthorizationException(this);
+  }
+
+  @Override
+  public void clear() {
+    this.msg = null;
+  }
+
+  public String get_msg() {
+    return this.msg;
+  }
+
+  public void set_msg(String msg) {
+    this.msg = msg;
+  }
+
+  public void unset_msg() {
+    this.msg = null;
+  }
+
+  /** Returns true if field msg is set (has been assigned a value) and false otherwise */
+  public boolean is_set_msg() {
+    return this.msg != null;
+  }
+
+  public void set_msg_isSet(boolean value) {
+    if (!value) {
+      this.msg = null;
+    }
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case MSG:
+      if (value == null) {
+        unset_msg();
+      } else {
+        set_msg((String)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case MSG:
+      return get_msg();
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case MSG:
+      return is_set_msg();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof AuthorizationException)
+      return this.equals((AuthorizationException)that);
+    return false;
+  }
+
+  public boolean equals(AuthorizationException that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_msg = true && this.is_set_msg();
+    boolean that_present_msg = true && that.is_set_msg();
+    if (this_present_msg || that_present_msg) {
+      if (!(this_present_msg && that_present_msg))
+        return false;
+      if (!this.msg.equals(that.msg))
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder builder = new HashCodeBuilder();
+
+    boolean present_msg = true && (is_set_msg());
+    builder.append(present_msg);
+    if (present_msg)
+      builder.append(msg);
+
+    return builder.toHashCode();
+  }
+
+  public int compareTo(AuthorizationException other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+    AuthorizationException typedOther = (AuthorizationException)other;
+
+    lastComparison = Boolean.valueOf(is_set_msg()).compareTo(typedOther.is_set_msg());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_msg()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.msg, typedOther.msg);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    org.apache.thrift.protocol.TField field;
+    iprot.readStructBegin();
+    while (true)
+    {
+      field = iprot.readFieldBegin();
+      if (field.type == org.apache.thrift.protocol.TType.STOP) { 
+        break;
+      }
+      switch (field.id) {
+        case 1: // MSG
+          if (field.type == org.apache.thrift.protocol.TType.STRING) {
+            this.msg = iprot.readString();
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        default:
+          org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+      }
+      iprot.readFieldEnd();
+    }
+    iprot.readStructEnd();
+    validate();
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    validate();
+
+    oprot.writeStructBegin(STRUCT_DESC);
+    if (this.msg != null) {
+      oprot.writeFieldBegin(MSG_FIELD_DESC);
+      oprot.writeString(this.msg);
+      oprot.writeFieldEnd();
+    }
+    oprot.writeFieldStop();
+    oprot.writeStructEnd();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("AuthorizationException(");
+    boolean first = true;
+
+    sb.append("msg:");
+    if (this.msg == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.msg);
+    }
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    if (!is_set_msg()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'msg' is unset! Struct:" + toString());
+    }
+
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/generated/Credentials.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/Credentials.java b/storm-core/src/jvm/backtype/storm/generated/Credentials.java
new file mode 100644
index 0000000..105cec1
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/Credentials.java
@@ -0,0 +1,373 @@
+/**
+ * Autogenerated by Thrift Compiler (0.7.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package backtype.storm.generated;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Credentials implements org.apache.thrift.TBase<Credentials, Credentials._Fields>, java.io.Serializable, Cloneable {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Credentials");
+
+  private static final org.apache.thrift.protocol.TField CREDS_FIELD_DESC = new org.apache.thrift.protocol.TField("creds", org.apache.thrift.protocol.TType.MAP, (short)1);
+
+  private Map<String,String> creds; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    CREDS((short)1, "creds");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // CREDS
+          return CREDS;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.CREDS, new org.apache.thrift.meta_data.FieldMetaData("creds", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, 
+            new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), 
+            new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(Credentials.class, metaDataMap);
+  }
+
+  public Credentials() {
+  }
+
+  public Credentials(
+    Map<String,String> creds)
+  {
+    this();
+    this.creds = creds;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public Credentials(Credentials other) {
+    if (other.is_set_creds()) {
+      Map<String,String> __this__creds = new HashMap<String,String>();
+      for (Map.Entry<String, String> other_element : other.creds.entrySet()) {
+
+        String other_element_key = other_element.getKey();
+        String other_element_value = other_element.getValue();
+
+        String __this__creds_copy_key = other_element_key;
+
+        String __this__creds_copy_value = other_element_value;
+
+        __this__creds.put(__this__creds_copy_key, __this__creds_copy_value);
+      }
+      this.creds = __this__creds;
+    }
+  }
+
+  public Credentials deepCopy() {
+    return new Credentials(this);
+  }
+
+  @Override
+  public void clear() {
+    this.creds = null;
+  }
+
+  public int get_creds_size() {
+    return (this.creds == null) ? 0 : this.creds.size();
+  }
+
+  public void put_to_creds(String key, String val) {
+    if (this.creds == null) {
+      this.creds = new HashMap<String,String>();
+    }
+    this.creds.put(key, val);
+  }
+
+  public Map<String,String> get_creds() {
+    return this.creds;
+  }
+
+  public void set_creds(Map<String,String> creds) {
+    this.creds = creds;
+  }
+
+  public void unset_creds() {
+    this.creds = null;
+  }
+
+  /** Returns true if field creds is set (has been assigned a value) and false otherwise */
+  public boolean is_set_creds() {
+    return this.creds != null;
+  }
+
+  public void set_creds_isSet(boolean value) {
+    if (!value) {
+      this.creds = null;
+    }
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case CREDS:
+      if (value == null) {
+        unset_creds();
+      } else {
+        set_creds((Map<String,String>)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case CREDS:
+      return get_creds();
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case CREDS:
+      return is_set_creds();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof Credentials)
+      return this.equals((Credentials)that);
+    return false;
+  }
+
+  public boolean equals(Credentials that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_creds = true && this.is_set_creds();
+    boolean that_present_creds = true && that.is_set_creds();
+    if (this_present_creds || that_present_creds) {
+      if (!(this_present_creds && that_present_creds))
+        return false;
+      if (!this.creds.equals(that.creds))
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder builder = new HashCodeBuilder();
+
+    boolean present_creds = true && (is_set_creds());
+    builder.append(present_creds);
+    if (present_creds)
+      builder.append(creds);
+
+    return builder.toHashCode();
+  }
+
+  public int compareTo(Credentials other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+    Credentials typedOther = (Credentials)other;
+
+    lastComparison = Boolean.valueOf(is_set_creds()).compareTo(typedOther.is_set_creds());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_creds()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.creds, typedOther.creds);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    org.apache.thrift.protocol.TField field;
+    iprot.readStructBegin();
+    while (true)
+    {
+      field = iprot.readFieldBegin();
+      if (field.type == org.apache.thrift.protocol.TType.STOP) { 
+        break;
+      }
+      switch (field.id) {
+        case 1: // CREDS
+          if (field.type == org.apache.thrift.protocol.TType.MAP) {
+            {
+              org.apache.thrift.protocol.TMap _map163 = iprot.readMapBegin();
+              this.creds = new HashMap<String,String>(2*_map163.size);
+              for (int _i164 = 0; _i164 < _map163.size; ++_i164)
+              {
+                String _key165; // required
+                String _val166; // required
+                _key165 = iprot.readString();
+                _val166 = iprot.readString();
+                this.creds.put(_key165, _val166);
+              }
+              iprot.readMapEnd();
+            }
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        default:
+          org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+      }
+      iprot.readFieldEnd();
+    }
+    iprot.readStructEnd();
+    validate();
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    validate();
+
+    oprot.writeStructBegin(STRUCT_DESC);
+    if (this.creds != null) {
+      oprot.writeFieldBegin(CREDS_FIELD_DESC);
+      {
+        oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, this.creds.size()));
+        for (Map.Entry<String, String> _iter167 : this.creds.entrySet())
+        {
+          oprot.writeString(_iter167.getKey());
+          oprot.writeString(_iter167.getValue());
+        }
+        oprot.writeMapEnd();
+      }
+      oprot.writeFieldEnd();
+    }
+    oprot.writeFieldStop();
+    oprot.writeStructEnd();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("Credentials(");
+    boolean first = true;
+
+    sb.append("creds:");
+    if (this.creds == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.creds);
+    }
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    if (!is_set_creds()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'creds' is unset! Struct:" + toString());
+    }
+
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/generated/DistributedRPC.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/DistributedRPC.java b/storm-core/src/jvm/backtype/storm/generated/DistributedRPC.java
index 7922340..06c4f5c 100644
--- a/storm-core/src/jvm/backtype/storm/generated/DistributedRPC.java
+++ b/storm-core/src/jvm/backtype/storm/generated/DistributedRPC.java
@@ -42,7 +42,7 @@ public class DistributedRPC {
 
   public interface Iface {
 
-    public String execute(String functionName, String funcArgs) throws DRPCExecutionException, org.apache.thrift.TException;
+    public String execute(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException, org.apache.thrift.TException; // this change adds AuthorizationException to the declared exceptions
 
   }
 
@@ -72,7 +72,7 @@ public class DistributedRPC {
       super(iprot, oprot);
     }
 
-    public String execute(String functionName, String funcArgs) throws DRPCExecutionException, org.apache.thrift.TException
+    public String execute(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException, org.apache.thrift.TException
     {
       send_execute(functionName, funcArgs);
       return recv_execute();
@@ -86,7 +86,7 @@ public class DistributedRPC {
       sendBase("execute", args);
     }
 
-    public String recv_execute() throws DRPCExecutionException, org.apache.thrift.TException
+    public String recv_execute() throws DRPCExecutionException, AuthorizationException, org.apache.thrift.TException
     {
       execute_result result = new execute_result();
       receiveBase(result, "execute");
@@ -96,6 +96,9 @@ public class DistributedRPC {
       if (result.e != null) {
         throw result.e;
       }
+      if (result.aze != null) {
+        throw result.aze;
+      }
       throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "execute failed: unknown result");
     }
 
@@ -142,7 +145,7 @@ public class DistributedRPC {
         prot.writeMessageEnd();
       }
 
-      public String getResult() throws DRPCExecutionException, org.apache.thrift.TException {
+      public String getResult() throws DRPCExecutionException, AuthorizationException, org.apache.thrift.TException {
         if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
           throw new IllegalStateException("Method call not finished!");
         }
@@ -184,6 +187,8 @@ public class DistributedRPC {
           result.success = iface.execute(args.functionName, args.funcArgs);
         } catch (DRPCExecutionException e) {
           result.e = e;
+        } catch (AuthorizationException aze) {
+          result.aze = aze;
         }
         return result;
       }
@@ -590,14 +595,17 @@ public class DistributedRPC {
 
     private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.STRING, (short)0);
     private static final org.apache.thrift.protocol.TField E_FIELD_DESC = new org.apache.thrift.protocol.TField("e", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+    private static final org.apache.thrift.protocol.TField AZE_FIELD_DESC = new org.apache.thrift.protocol.TField("aze", org.apache.thrift.protocol.TType.STRUCT, (short)2); // wire descriptor for the new field: name "aze", STRUCT type, field id 2
 
     private String success; // required
     private DRPCExecutionException e; // required
+    private AuthorizationException aze; // required (label as emitted; the metadata map registers this field with TFieldRequirementType.DEFAULT)
 
     /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
     public enum _Fields implements org.apache.thrift.TFieldIdEnum {
       SUCCESS((short)0, "success"),
-      E((short)1, "e");
+      E((short)1, "e"),
+      AZE((short)2, "aze");
 
       private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -616,6 +624,8 @@ public class DistributedRPC {
             return SUCCESS;
           case 1: // E
             return E;
+          case 2: // AZE
+            return AZE;
           default:
             return null;
         }
@@ -664,6 +674,8 @@ public class DistributedRPC {
           new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
       tmpMap.put(_Fields.E, new org.apache.thrift.meta_data.FieldMetaData("e", org.apache.thrift.TFieldRequirementType.DEFAULT, 
           new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
+      tmpMap.put(_Fields.AZE, new org.apache.thrift.meta_data.FieldMetaData("aze", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
       metaDataMap = Collections.unmodifiableMap(tmpMap);
       org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(execute_result.class, metaDataMap);
     }
@@ -673,11 +685,13 @@ public class DistributedRPC {
 
     public execute_result(
       String success,
-      DRPCExecutionException e)
+      DRPCExecutionException e,
+      AuthorizationException aze) // third argument introduced by this change
     {
       this();
       this.success = success;
       this.e = e;
+      this.aze = aze; // reference is stored as given; no copy is made here
     }
 
     /**
@@ -690,6 +704,9 @@ public class DistributedRPC {
       if (other.is_set_e()) {
         this.e = new DRPCExecutionException(other.e);
       }
+      if (other.is_set_aze()) {
+        this.aze = new AuthorizationException(other.aze);
+      }
     }
 
     public execute_result deepCopy() {
@@ -700,6 +717,7 @@ public class DistributedRPC {
     public void clear() {
       this.success = null;
       this.e = null;
+      this.aze = null; // the new field is reset together with success and e
     }
 
     public String get_success() {
@@ -748,6 +766,29 @@ public class DistributedRPC {
       }
     }
 
+    public AuthorizationException get_aze() { // Returns the current aze reference; null when the field is unset.
+      return this.aze;
+    }
+
+    public void set_aze(AuthorizationException aze) { // Stores the given reference as-is; passing null leaves the field in the same state as unset_aze().
+      this.aze = aze;
+    }
+
+    public void unset_aze() { // Marks the field as unset by nulling the reference.
+      this.aze = null;
+    }
+
+    /** Returns true if field aze is set (has been assigned a value) and false otherwise */
+    public boolean is_set_aze() {
+      return this.aze != null; // "set" here is purely a non-null test on the reference
+    }
+
+    public void set_aze_isSet(boolean value) { // Only value == false has an effect: it nulls the field. Passing true leaves the field untouched.
+      if (!value) {
+        this.aze = null;
+      }
+    }
+
     public void setFieldValue(_Fields field, Object value) {
       switch (field) {
       case SUCCESS:
@@ -766,6 +807,14 @@ public class DistributedRPC {
         }
         break;
 
+      case AZE:
+        if (value == null) {
+          unset_aze();
+        } else {
+          set_aze((AuthorizationException)value);
+        }
+        break;
+
       }
     }
 
@@ -777,6 +826,9 @@ public class DistributedRPC {
       case E:
         return get_e();
 
+      case AZE:
+        return get_aze();
+
       }
       throw new IllegalStateException();
     }
@@ -792,6 +844,8 @@ public class DistributedRPC {
         return is_set_success();
       case E:
         return is_set_e();
+      case AZE:
+        return is_set_aze();
       }
       throw new IllegalStateException();
     }
@@ -827,6 +881,15 @@ public class DistributedRPC {
           return false;
       }
 
+      boolean this_present_aze = true && this.is_set_aze();
+      boolean that_present_aze = true && that.is_set_aze();
+      if (this_present_aze || that_present_aze) {
+        if (!(this_present_aze && that_present_aze))
+          return false;
+        if (!this.aze.equals(that.aze))
+          return false;
+      }
+
       return true;
     }
 
@@ -844,6 +907,11 @@ public class DistributedRPC {
       if (present_e)
         builder.append(e);
 
+      boolean present_aze = true && (is_set_aze());
+      builder.append(present_aze);
+      if (present_aze)
+        builder.append(aze);
+
       return builder.toHashCode();
     }
 
@@ -875,6 +943,16 @@ public class DistributedRPC {
           return lastComparison;
         }
       }
+      lastComparison = Boolean.valueOf(is_set_aze()).compareTo(typedOther.is_set_aze());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (is_set_aze()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.aze, typedOther.aze);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
       return 0;
     }
 
@@ -907,6 +985,14 @@ public class DistributedRPC {
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
             }
             break;
+          case 2: // AZE
+            if (field.type == org.apache.thrift.protocol.TType.STRUCT) {
+              this.aze = new AuthorizationException();
+              this.aze.read(iprot);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
         }
@@ -927,6 +1013,10 @@ public class DistributedRPC {
         oprot.writeFieldBegin(E_FIELD_DESC);
         this.e.write(oprot);
         oprot.writeFieldEnd();
+      } else if (this.is_set_aze()) {
+        oprot.writeFieldBegin(AZE_FIELD_DESC);
+        this.aze.write(oprot);
+        oprot.writeFieldEnd();
       }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
@@ -952,6 +1042,14 @@ public class DistributedRPC {
         sb.append(this.e);
       }
       first = false;
+      if (!first) sb.append(", ");
+      sb.append("aze:");
+      if (this.aze == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.aze);
+      }
+      first = false;
       sb.append(")");
       return sb.toString();
     }