Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/20 08:56:34 UTC

[1/8] storm git commit: STORM-2190: reduce contention between submission and scheduling

Repository: storm
Updated Branches:
  refs/heads/1.x-branch b515356c1 -> 4154490f7


STORM-2190: reduce contention between submission and scheduling


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cea8beaa
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cea8beaa
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cea8beaa

Branch: refs/heads/1.x-branch
Commit: cea8beaaa50b432277db4302d3c7ac5c1970bdc2
Parents: d06c8d5
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Nov 7 17:08:07 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Dec 5 06:06:14 2016 -0600

----------------------------------------------------------------------
 .../apache/storm/starter/WordCountTopology.java |  4 +-
 .../org/apache/storm/command/kill_topology.clj  |  8 ++--
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 49 ++++++++++----------
 3 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/cea8beaa/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
index e4a5711..b47981d 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
@@ -90,7 +90,9 @@ public class WordCountTopology {
     if (args != null && args.length > 0) {
       conf.setNumWorkers(3);
 
-      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+      for (String name: args) {
+        StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
+      }
     }
     else {
       conf.setMaxTaskParallelism(3);

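With this change one run of the starter submits a separate topology per command-line argument, which makes it easy to drive many concurrent submissions when exercising the Nimbus locking changes below. An illustrative invocation (the jar name is a placeholder for whatever storm-starter build is at hand):

    storm jar storm-starter.jar org.apache.storm.starter.WordCountTopology wc-1 wc-2 wc-3

Each name gets its own copy of the word-count topology, all sharing the same three-worker Config.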
http://git-wip-us.apache.org/repos/asf/storm/blob/cea8beaa/storm-core/src/clj/org/apache/storm/command/kill_topology.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/kill_topology.clj b/storm-core/src/clj/org/apache/storm/command/kill_topology.clj
index 84e0a64..9fbdc6c 100644
--- a/storm-core/src/clj/org/apache/storm/command/kill_topology.clj
+++ b/storm-core/src/clj/org/apache/storm/command/kill_topology.clj
@@ -20,10 +20,10 @@
   (:gen-class))
 
 (defn -main [& args]
-  (let [[{wait :wait} [name] _] (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)])
+  (let [[{wait :wait} names] (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)])
         opts (KillOptions.)]
     (if wait (.set_wait_secs opts wait))
     (with-configured-nimbus-connection nimbus
-      (.killTopologyWithOpts nimbus name opts)
-      (log-message "Killed topology: " name)
-      )))
+      (doseq [name names]
+        (.killTopologyWithOpts nimbus name opts)
+        (log-message "Killed topology: " name)))))

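The kill command now accepts any number of topology names and applies the same wait option to each, e.g. "storm kill -w 10 topo-a topo-b". The equivalent loop against the Nimbus Thrift client, in Java, looks roughly like this (a minimal sketch; "client" is an already-open Nimbus.Client, and the topology names are illustrative):

    // Kill several topologies with one shared KillOptions, mirroring the
    // Clojure command above. Throws NotAliveException for an unknown name.
    KillOptions opts = new KillOptions();
    opts.set_wait_secs(10);
    for (String name : new String[] {"topo-a", "topo-b"}) {
      client.killTopologyWithOpts(name, opts);
      System.out.println("Killed topology: " + name);
    }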
http://git-wip-us.apache.org/repos/asf/storm/blob/cea8beaa/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 46ca121..3682ecf 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -193,6 +193,7 @@
                                                                        NIMBUS-ZK-ACLS)
                                                           :context (ClusterStateContext. DaemonType/NIMBUS))
      :submit-lock (Object.)
+     :sched-lock (Object.)
      :cred-update-lock (Object.)
      :log-update-lock (Object.)
      :heartbeats-cache (atom {})
@@ -963,9 +964,8 @@
         storm-cluster-state (:storm-cluster-state nimbus)
         ^INimbus inimbus (:inimbus nimbus)
         ;; read all the topologies
-        topology-ids (.active-storms storm-cluster-state)
-        topologies (into {} (for [tid topology-ids]
-                              {tid (read-topology-details nimbus tid)}))
+        topologies (locking (:submit-lock nimbus) (into {} (for [tid (.active-storms storm-cluster-state)]
+                              {tid (read-topology-details nimbus tid)})))
         topologies (Topologies. topologies)
         ;; read all the assignments
         assigned-topology-ids (.assignments storm-cluster-state nil)
@@ -976,11 +976,11 @@
                                         (when (or (nil? scratch-topology-id) (not= tid scratch-topology-id))
                                           {tid (.assignment-info storm-cluster-state tid nil)})))
         ;; make the new assignments for topologies
-        new-scheduler-assignments (compute-new-scheduler-assignments
+        new-scheduler-assignments (locking (:sched-lock nimbus) (compute-new-scheduler-assignments
                                        nimbus
                                        existing-assignments
                                        topologies
-                                       scratch-topology-id)
+                                       scratch-topology-id))
         topology->executor->node+port (compute-new-topology->executor->node+port new-scheduler-assignments existing-assignments)
 
         topology->executor->node+port (merge (into {} (for [id assigned-topology-ids] {id nil})) topology->executor->node+port)
@@ -1020,22 +1020,23 @@
       (reset! (:id->worker-resources nimbus) {}))
     ;; tasks figure out what tasks to talk to by looking at topology at runtime
     ;; only log/set when there's been a change to the assignment
-    (doseq [[topology-id assignment] new-assignments
-            :let [existing-assignment (get existing-assignments topology-id)
-                  topology-details (.getById topologies topology-id)]]
-      (if (= existing-assignment assignment)
-        (log-debug "Assignment for " topology-id " hasn't changed")
-        (do
-          (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
-          (.set-assignment! storm-cluster-state topology-id assignment)
-          )))
-    (->> new-assignments
-          (map (fn [[topology-id assignment]]
-            (let [existing-assignment (get existing-assignments topology-id)]
-              [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))]
-              )))
-          (into {})
-          (.assignSlots inimbus topologies)))
+    (locking (:sched-lock nimbus)
+      (doseq [[topology-id assignment] new-assignments
+              :let [existing-assignment (get existing-assignments topology-id)
+                    topology-details (.getById topologies topology-id)]]
+        (if (= existing-assignment assignment)
+          (log-debug "Assignment for " topology-id " hasn't changed")
+          (do
+            (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
+            (.set-assignment! storm-cluster-state topology-id assignment)
+            )))
+      (->> new-assignments
+            (map (fn [[topology-id assignment]]
+              (let [existing-assignment (get existing-assignments topology-id)]
+                [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))]
+                )))
+            (into {})
+            (.assignSlots inimbus topologies))))
     (log-message "not a leader, skipping assignments")))
 
 (defn notify-topology-action-listener [nimbus storm-id action]
@@ -1726,8 +1727,7 @@
                          )]
           (transition-name! nimbus storm-name [:kill wait-amt] true)
           (notify-topology-action-listener nimbus storm-name operation))
-        (add-topology-to-history-log (get-storm-id (:storm-cluster-state nimbus) storm-name)
-                                     nimbus topology-conf)))
+        (add-topology-to-history-log storm-id nimbus topology-conf)))
 
     (^void rebalance [this ^String storm-name ^RebalanceOptions options]
       (mark! nimbus:num-rebalance-calls)
@@ -2402,8 +2402,7 @@
                         (conf NIMBUS-MONITOR-FREQ-SECS)
                         (fn []
                           (when-not (conf NIMBUS-DO-NOT-REASSIGN)
-                            (locking (:submit-lock nimbus)
-                              (mk-assignments nimbus)))
+                            (mk-assignments nimbus))
                           (do-cleanup nimbus)))
     ;; Schedule Nimbus inbox cleaner
     (schedule-recurring (:timer nimbus)

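The heart of the patch is the new :sched-lock. Previously the monitor thread held :submit-lock for the entire mk-assignments pass, so every topology submission blocked behind a full scheduling run. Now :submit-lock is held only long enough to snapshot the active topologies, and :sched-lock serializes the scheduling computation and the assignment writes. A rough Java rendering of the resulting locking (all names here are illustrative, not Storm's actual API):

    private final Object submitLock = new Object(); // guards the topology set
    private final Object schedLock  = new Object(); // guards scheduler state

    void mkAssignments() {
        Map<String, TopologyDetails> topologies;
        synchronized (submitLock) {            // short hold: snapshot only
            topologies = readTopologyDetails();
        }                                      // submissions may proceed now
        NewAssignments computed;
        synchronized (schedLock) {             // schedule without blocking submits
            computed = computeNewSchedulerAssignments(topologies);
        }
        Map<String, Assignment> finals = finalizeAssignments(computed); // unlocked
        synchronized (schedLock) {             // publish under the scheduler lock
            commitAssignments(finals);         // set-assignment! / assignSlots
        }
    }

Note the gap between the two schedLock sections; the next commit closes it.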

[2/8] storm git commit: Fixed Race

Posted by ka...@apache.org.
Fixed Race

Conflicts:
	storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/93c1ea37
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/93c1ea37
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/93c1ea37

Branch: refs/heads/1.x-branch
Commit: 93c1ea37a0f38c147727d40e79de900f49e2e605
Parents: cea8bea
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Dec 2 08:46:38 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Dec 5 06:26:12 2016 -0600

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 114 +++++++++----------
 1 file changed, 57 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/93c1ea37/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 3682ecf..e9d35a8 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -974,69 +974,69 @@
                                         ;; we exclude its assignment, meaning that all the slots occupied by its assignment
                                         ;; will be treated as free slot in the scheduler code.
                                         (when (or (nil? scratch-topology-id) (not= tid scratch-topology-id))
-                                          {tid (.assignment-info storm-cluster-state tid nil)})))
-        ;; make the new assignments for topologies
-        new-scheduler-assignments (locking (:sched-lock nimbus) (compute-new-scheduler-assignments
-                                       nimbus
-                                       existing-assignments
-                                       topologies
-                                       scratch-topology-id))
-        topology->executor->node+port (compute-new-topology->executor->node+port new-scheduler-assignments existing-assignments)
-
-        topology->executor->node+port (merge (into {} (for [id assigned-topology-ids] {id nil})) topology->executor->node+port)
-        new-assigned-worker->resources (convert-assignments-to-worker->resources new-scheduler-assignments)
-        now-secs (current-time-secs)
-
-        basic-supervisor-details-map (basic-supervisor-details-map storm-cluster-state)
-
-        ;; construct the final Assignments by adding start-times etc into it
-        new-assignments (into {} (for [[topology-id executor->node+port] topology->executor->node+port
-                                        :let [existing-assignment (get existing-assignments topology-id)
-                                              all-nodes (->> executor->node+port vals (map first) set)
-                                              node->host (->> all-nodes
-                                                              (mapcat (fn [node]
-                                                                        (if-let [host (.getHostName inimbus basic-supervisor-details-map node)]
-                                                                          [[node host]]
+                                          {tid (.assignment-info storm-cluster-state tid nil)})))]
+      ;; make the new assignments for topologies
+      (locking (:sched-lock nimbus) (let [
+          new-scheduler-assignments (compute-new-scheduler-assignments
+                                         nimbus
+                                         existing-assignments
+                                         topologies
+                                         scratch-topology-id)
+          topology->executor->node+port (compute-new-topology->executor->node+port new-scheduler-assignments existing-assignments)
+
+          topology->executor->node+port (merge (into {} (for [id assigned-topology-ids] {id nil})) topology->executor->node+port)
+          new-assigned-worker->resources (convert-assignments-to-worker->resources new-scheduler-assignments)
+          now-secs (current-time-secs)
+
+          basic-supervisor-details-map (basic-supervisor-details-map storm-cluster-state)
+
+          ;; construct the final Assignments by adding start-times etc into it
+          new-assignments (into {} (for [[topology-id executor->node+port] topology->executor->node+port
+                                          :let [existing-assignment (get existing-assignments topology-id)
+                                                all-nodes (->> executor->node+port vals (map first) set)
+                                                node->host (->> all-nodes
+                                                                (mapcat (fn [node]
+                                                                          (if-let [host (.getHostName inimbus basic-supervisor-details-map node)]
+                                                                            [[node host]]
+                                                                            )))
+                                                                (into {}))
+                                                all-node->host (merge (:node->host existing-assignment) node->host)
+                                                reassign-executors (changed-executors (:executor->node+port existing-assignment) executor->node+port)
+                                                start-times (merge (:executor->start-time-secs existing-assignment)
+                                                                  (into {}
+                                                                        (for [id reassign-executors]
+                                                                          [id now-secs]
                                                                           )))
-                                                              (into {}))
-                                              all-node->host (merge (:node->host existing-assignment) node->host)
-                                              reassign-executors (changed-executors (:executor->node+port existing-assignment) executor->node+port)
-                                              start-times (merge (:executor->start-time-secs existing-assignment)
-                                                                (into {}
-                                                                      (for [id reassign-executors]
-                                                                        [id now-secs]
-                                                                        )))
-                                              worker->resources (get new-assigned-worker->resources topology-id)]]
-                                   {topology-id (Assignment.
-                                                 (conf STORM-LOCAL-DIR)
-                                                 (select-keys all-node->host all-nodes)
-                                                 executor->node+port
-                                                 start-times
-                                                 worker->resources)}))]
-
-    (when (not= new-assignments existing-assignments)
-      (log-debug "RESETTING id->resources and id->worker-resources cache!")
-      (reset! (:id->resources nimbus) {})
-      (reset! (:id->worker-resources nimbus) {}))
-    ;; tasks figure out what tasks to talk to by looking at topology at runtime
-    ;; only log/set when there's been a change to the assignment
-    (locking (:sched-lock nimbus)
-      (doseq [[topology-id assignment] new-assignments
-              :let [existing-assignment (get existing-assignments topology-id)
-                    topology-details (.getById topologies topology-id)]]
-        (if (= existing-assignment assignment)
-          (log-debug "Assignment for " topology-id " hasn't changed")
-          (do
-            (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
-            (.set-assignment! storm-cluster-state topology-id assignment)
-            )))
-      (->> new-assignments
+                                                worker->resources (get new-assigned-worker->resources topology-id)]]
+                                     {topology-id (Assignment.
+                                                   (conf STORM-LOCAL-DIR)
+                                                   (select-keys all-node->host all-nodes)
+                                                   executor->node+port
+                                                   start-times
+                                                   worker->resources)}))]
+
+        (when (not= new-assignments existing-assignments)
+          (log-debug "RESETTING id->resources and id->worker-resources cache!")
+          (reset! (:id->resources nimbus) {})
+          (reset! (:id->worker-resources nimbus) {}))
+        ;; tasks figure out what tasks to talk to by looking at topology at runtime
+        ;; only log/set when there's been a change to the assignment
+        (doseq [[topology-id assignment] new-assignments
+                :let [existing-assignment (get existing-assignments topology-id)
+                      topology-details (.getById topologies topology-id)]]
+          (if (= existing-assignment assignment)
+            (log-debug "Assignment for " topology-id " hasn't changed")
+            (do
+              (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
+              (.set-assignment! storm-cluster-state topology-id assignment)
+              )))
+        (->> new-assignments
             (map (fn [[topology-id assignment]]
               (let [existing-assignment (get existing-assignments topology-id)]
                 [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))]
                 )))
             (into {})
-            (.assignSlots inimbus topologies))))
+            (.assignSlots inimbus topologies)))))
     (log-message "not a leader, skipping assignments")))
 
 (defn notify-topology-action-listener [nimbus storm-id action]

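The race fixed here is the gap left by the previous commit: between releasing :sched-lock after compute-new-scheduler-assignments and re-acquiring it to write the assignments, another scheduling pass could run and commit, so the assignments derived in between were stale by the time they were written. The fix moves everything derived from the computation inside one continuous hold of :sched-lock. Continuing the illustrative Java sketch from the previous commit:

    void mkAssignments() {
        Map<String, TopologyDetails> topologies;
        synchronized (submitLock) {
            topologies = readTopologyDetails();     // snapshot, as before
        }
        synchronized (schedLock) {                  // one critical section
            NewAssignments computed = computeNewSchedulerAssignments(topologies);
            Map<String, Assignment> finals = finalizeAssignments(computed);
            commitAssignments(finals);              // no window for a stale write
        }
    }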

[4/8] storm git commit: Added in some optimizations for better topology submission performance

Posted by ka...@apache.org.
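This commit regenerates the Thrift bindings to add two lightweight read-side RPCs, getLeader and isTopologyNameAllowed, so a client can locate the leader Nimbus and validate a topology name before uploading a potentially large topology jar; the generated Java follows below. A hedged client-side sketch (host, port, and transport choices here are assumptions for illustration; real clients normally go through Storm's configured NimbusClient helper, which handles security transports):

    // Ask Nimbus whether a topology name is usable before uploading anything.
    TTransport transport = new TFramedTransport(new TSocket("nimbus-host", 6627));
    transport.open();
    Nimbus.Client client = new Nimbus.Client(new TBinaryProtocol(transport));
    NimbusSummary leader = client.getLeader();      // which Nimbus is leader
    if (!client.isTopologyNameAllowed("word-count")) {
        throw new IllegalArgumentException("topology name in use or invalid");
    }
    transport.close();
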
http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/jvm/org/apache/storm/generated/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/Nimbus.java b/storm-core/src/jvm/org/apache/storm/generated/Nimbus.java
index ba9e66a..22a7205 100644
--- a/storm-core/src/jvm/org/apache/storm/generated/Nimbus.java
+++ b/storm-core/src/jvm/org/apache/storm/generated/Nimbus.java
@@ -136,6 +136,10 @@ public class Nimbus {
 
     public ClusterSummary getClusterInfo() throws AuthorizationException, org.apache.thrift.TException;
 
+    public NimbusSummary getLeader() throws AuthorizationException, org.apache.thrift.TException;
+
+    public boolean isTopologyNameAllowed(String name) throws AuthorizationException, org.apache.thrift.TException;
+
     public TopologyInfo getTopologyInfo(String id) throws NotAliveException, AuthorizationException, org.apache.thrift.TException;
 
     public TopologyInfo getTopologyInfoWithOpts(String id, GetInfoOptions options) throws NotAliveException, AuthorizationException, org.apache.thrift.TException;
@@ -236,6 +240,10 @@ public class Nimbus {
 
     public void getClusterInfo(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
 
+    public void getLeader(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+
+    public void isTopologyNameAllowed(String name, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+
     public void getTopologyInfo(String id, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
 
     public void getTopologyInfoWithOpts(String id, GetInfoOptions options, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
@@ -1167,6 +1175,57 @@ public class Nimbus {
       throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getClusterInfo failed: unknown result");
     }
 
+    public NimbusSummary getLeader() throws AuthorizationException, org.apache.thrift.TException
+    {
+      send_getLeader();
+      return recv_getLeader();
+    }
+
+    public void send_getLeader() throws org.apache.thrift.TException
+    {
+      getLeader_args args = new getLeader_args();
+      sendBase("getLeader", args);
+    }
+
+    public NimbusSummary recv_getLeader() throws AuthorizationException, org.apache.thrift.TException
+    {
+      getLeader_result result = new getLeader_result();
+      receiveBase(result, "getLeader");
+      if (result.is_set_success()) {
+        return result.success;
+      }
+      if (result.aze != null) {
+        throw result.aze;
+      }
+      throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getLeader failed: unknown result");
+    }
+
+    public boolean isTopologyNameAllowed(String name) throws AuthorizationException, org.apache.thrift.TException
+    {
+      send_isTopologyNameAllowed(name);
+      return recv_isTopologyNameAllowed();
+    }
+
+    public void send_isTopologyNameAllowed(String name) throws org.apache.thrift.TException
+    {
+      isTopologyNameAllowed_args args = new isTopologyNameAllowed_args();
+      args.set_name(name);
+      sendBase("isTopologyNameAllowed", args);
+    }
+
+    public boolean recv_isTopologyNameAllowed() throws AuthorizationException, org.apache.thrift.TException
+    {
+      isTopologyNameAllowed_result result = new isTopologyNameAllowed_result();
+      receiveBase(result, "isTopologyNameAllowed");
+      if (result.is_set_success()) {
+        return result.success;
+      }
+      if (result.aze != null) {
+        throw result.aze;
+      }
+      throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "isTopologyNameAllowed failed: unknown result");
+    }
+
     public TopologyInfo getTopologyInfo(String id) throws NotAliveException, AuthorizationException, org.apache.thrift.TException
     {
       send_getTopologyInfo(id);
@@ -2596,6 +2655,67 @@ public class Nimbus {
       }
     }
 
+    public void getLeader(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+      checkReady();
+      getLeader_call method_call = new getLeader_call(resultHandler, this, ___protocolFactory, ___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
+    }
+
+    public static class getLeader_call extends org.apache.thrift.async.TAsyncMethodCall {
+      public getLeader_call(org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+      }
+
+      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("getLeader", org.apache.thrift.protocol.TMessageType.CALL, 0));
+        getLeader_args args = new getLeader_args();
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      public NimbusSummary getResult() throws AuthorizationException, org.apache.thrift.TException {
+        if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+          throw new IllegalStateException("Method call not finished!");
+        }
+        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+        return (new Client(prot)).recv_getLeader();
+      }
+    }
+
+    public void isTopologyNameAllowed(String name, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+      checkReady();
+      isTopologyNameAllowed_call method_call = new isTopologyNameAllowed_call(name, resultHandler, this, ___protocolFactory, ___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
+    }
+
+    public static class isTopologyNameAllowed_call extends org.apache.thrift.async.TAsyncMethodCall {
+      private String name;
+      public isTopologyNameAllowed_call(String name, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+        this.name = name;
+      }
+
+      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("isTopologyNameAllowed", org.apache.thrift.protocol.TMessageType.CALL, 0));
+        isTopologyNameAllowed_args args = new isTopologyNameAllowed_args();
+        args.set_name(name);
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      public boolean getResult() throws AuthorizationException, org.apache.thrift.TException {
+        if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+          throw new IllegalStateException("Method call not finished!");
+        }
+        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+        return (new Client(prot)).recv_isTopologyNameAllowed();
+      }
+    }
+
     public void getTopologyInfo(String id, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
       checkReady();
       getTopologyInfo_call method_call = new getTopologyInfo_call(id, resultHandler, this, ___protocolFactory, ___transport);
@@ -2955,6 +3075,8 @@ public class Nimbus {
       processMap.put("downloadChunk", new downloadChunk());
       processMap.put("getNimbusConf", new getNimbusConf());
       processMap.put("getClusterInfo", new getClusterInfo());
+      processMap.put("getLeader", new getLeader());
+      processMap.put("isTopologyNameAllowed", new isTopologyNameAllowed());
       processMap.put("getTopologyInfo", new getTopologyInfo());
       processMap.put("getTopologyInfoWithOpts", new getTopologyInfoWithOpts());
       processMap.put("getTopologyPageInfo", new getTopologyPageInfo());
@@ -3803,6 +3925,55 @@ public class Nimbus {
       }
     }
 
+    public static class getLeader<I extends Iface> extends org.apache.thrift.ProcessFunction<I, getLeader_args> {
+      public getLeader() {
+        super("getLeader");
+      }
+
+      public getLeader_args getEmptyArgsInstance() {
+        return new getLeader_args();
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public getLeader_result getResult(I iface, getLeader_args args) throws org.apache.thrift.TException {
+        getLeader_result result = new getLeader_result();
+        try {
+          result.success = iface.getLeader();
+        } catch (AuthorizationException aze) {
+          result.aze = aze;
+        }
+        return result;
+      }
+    }
+
+    public static class isTopologyNameAllowed<I extends Iface> extends org.apache.thrift.ProcessFunction<I, isTopologyNameAllowed_args> {
+      public isTopologyNameAllowed() {
+        super("isTopologyNameAllowed");
+      }
+
+      public isTopologyNameAllowed_args getEmptyArgsInstance() {
+        return new isTopologyNameAllowed_args();
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public isTopologyNameAllowed_result getResult(I iface, isTopologyNameAllowed_args args) throws org.apache.thrift.TException {
+        isTopologyNameAllowed_result result = new isTopologyNameAllowed_result();
+        try {
+          result.success = iface.isTopologyNameAllowed(args.name);
+          result.set_success_isSet(true);
+        } catch (AuthorizationException aze) {
+          result.aze = aze;
+        }
+        return result;
+      }
+    }
+
     public static class getTopologyInfo<I extends Iface> extends org.apache.thrift.ProcessFunction<I, getTopologyInfo_args> {
       public getTopologyInfo() {
         super("getTopologyInfo");
@@ -4082,6 +4253,8 @@ public class Nimbus {
       processMap.put("downloadChunk", new downloadChunk());
       processMap.put("getNimbusConf", new getNimbusConf());
       processMap.put("getClusterInfo", new getClusterInfo());
+      processMap.put("getLeader", new getLeader());
+      processMap.put("isTopologyNameAllowed", new isTopologyNameAllowed());
       processMap.put("getTopologyInfo", new getTopologyInfo());
       processMap.put("getTopologyInfoWithOpts", new getTopologyInfoWithOpts());
       processMap.put("getTopologyPageInfo", new getTopologyPageInfo());
@@ -6084,20 +6257,20 @@ public class Nimbus {
       }
     }
 
-    public static class getTopologyInfo<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getTopologyInfo_args, TopologyInfo> {
-      public getTopologyInfo() {
-        super("getTopologyInfo");
+    public static class getLeader<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getLeader_args, NimbusSummary> {
+      public getLeader() {
+        super("getLeader");
       }
 
-      public getTopologyInfo_args getEmptyArgsInstance() {
-        return new getTopologyInfo_args();
+      public getLeader_args getEmptyArgsInstance() {
+        return new getLeader_args();
       }
 
-      public AsyncMethodCallback<TopologyInfo> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+      public AsyncMethodCallback<NimbusSummary> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
         final org.apache.thrift.AsyncProcessFunction fcall = this;
-        return new AsyncMethodCallback<TopologyInfo>() { 
-          public void onComplete(TopologyInfo o) {
-            getTopologyInfo_result result = new getTopologyInfo_result();
+        return new AsyncMethodCallback<NimbusSummary>() { 
+          public void onComplete(NimbusSummary o) {
+            getLeader_result result = new getLeader_result();
             result.success = o;
             try {
               fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
@@ -6110,13 +6283,66 @@ public class Nimbus {
           public void onError(Exception e) {
             byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
             org.apache.thrift.TBase msg;
-            getTopologyInfo_result result = new getTopologyInfo_result();
-            if (e instanceof NotAliveException) {
-                        result.e = (NotAliveException) e;
-                        result.set_e_isSet(true);
+            getLeader_result result = new getLeader_result();
+            if (e instanceof AuthorizationException) {
+                        result.aze = (AuthorizationException) e;
+                        result.set_aze_isSet(true);
                         msg = result;
             }
-            else             if (e instanceof AuthorizationException) {
+             else 
+            {
+              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
+            }
+            try {
+              fcall.sendResponse(fb,msg,msgType,seqid);
+              return;
+            } catch (Exception ex) {
+              LOGGER.error("Exception writing to internal frame buffer", ex);
+            }
+            fb.close();
+          }
+        };
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public void start(I iface, getLeader_args args, org.apache.thrift.async.AsyncMethodCallback<NimbusSummary> resultHandler) throws TException {
+        iface.getLeader(resultHandler);
+      }
+    }
+
+    public static class isTopologyNameAllowed<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, isTopologyNameAllowed_args, Boolean> {
+      public isTopologyNameAllowed() {
+        super("isTopologyNameAllowed");
+      }
+
+      public isTopologyNameAllowed_args getEmptyArgsInstance() {
+        return new isTopologyNameAllowed_args();
+      }
+
+      public AsyncMethodCallback<Boolean> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+        final org.apache.thrift.AsyncProcessFunction fcall = this;
+        return new AsyncMethodCallback<Boolean>() { 
+          public void onComplete(Boolean o) {
+            isTopologyNameAllowed_result result = new isTopologyNameAllowed_result();
+            result.success = o;
+            result.set_success_isSet(true);
+            try {
+              fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+              return;
+            } catch (Exception e) {
+              LOGGER.error("Exception writing to internal frame buffer", e);
+            }
+            fb.close();
+          }
+          public void onError(Exception e) {
+            byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+            org.apache.thrift.TBase msg;
+            isTopologyNameAllowed_result result = new isTopologyNameAllowed_result();
+            if (e instanceof AuthorizationException) {
                         result.aze = (AuthorizationException) e;
                         result.set_aze_isSet(true);
                         msg = result;
@@ -6141,25 +6367,25 @@ public class Nimbus {
         return false;
       }
 
-      public void start(I iface, getTopologyInfo_args args, org.apache.thrift.async.AsyncMethodCallback<TopologyInfo> resultHandler) throws TException {
-        iface.getTopologyInfo(args.id,resultHandler);
+      public void start(I iface, isTopologyNameAllowed_args args, org.apache.thrift.async.AsyncMethodCallback<Boolean> resultHandler) throws TException {
+        iface.isTopologyNameAllowed(args.name,resultHandler);
       }
     }
 
-    public static class getTopologyInfoWithOpts<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getTopologyInfoWithOpts_args, TopologyInfo> {
-      public getTopologyInfoWithOpts() {
-        super("getTopologyInfoWithOpts");
+    public static class getTopologyInfo<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getTopologyInfo_args, TopologyInfo> {
+      public getTopologyInfo() {
+        super("getTopologyInfo");
       }
 
-      public getTopologyInfoWithOpts_args getEmptyArgsInstance() {
-        return new getTopologyInfoWithOpts_args();
+      public getTopologyInfo_args getEmptyArgsInstance() {
+        return new getTopologyInfo_args();
       }
 
       public AsyncMethodCallback<TopologyInfo> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
         final org.apache.thrift.AsyncProcessFunction fcall = this;
         return new AsyncMethodCallback<TopologyInfo>() { 
           public void onComplete(TopologyInfo o) {
-            getTopologyInfoWithOpts_result result = new getTopologyInfoWithOpts_result();
+            getTopologyInfo_result result = new getTopologyInfo_result();
             result.success = o;
             try {
               fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
@@ -6172,7 +6398,7 @@ public class Nimbus {
           public void onError(Exception e) {
             byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
             org.apache.thrift.TBase msg;
-            getTopologyInfoWithOpts_result result = new getTopologyInfoWithOpts_result();
+            getTopologyInfo_result result = new getTopologyInfo_result();
             if (e instanceof NotAliveException) {
                         result.e = (NotAliveException) e;
                         result.set_e_isSet(true);
@@ -6203,25 +6429,25 @@ public class Nimbus {
         return false;
       }
 
-      public void start(I iface, getTopologyInfoWithOpts_args args, org.apache.thrift.async.AsyncMethodCallback<TopologyInfo> resultHandler) throws TException {
-        iface.getTopologyInfoWithOpts(args.id, args.options,resultHandler);
+      public void start(I iface, getTopologyInfo_args args, org.apache.thrift.async.AsyncMethodCallback<TopologyInfo> resultHandler) throws TException {
+        iface.getTopologyInfo(args.id,resultHandler);
       }
     }
 
-    public static class getTopologyPageInfo<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getTopologyPageInfo_args, TopologyPageInfo> {
-      public getTopologyPageInfo() {
-        super("getTopologyPageInfo");
+    public static class getTopologyInfoWithOpts<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getTopologyInfoWithOpts_args, TopologyInfo> {
+      public getTopologyInfoWithOpts() {
+        super("getTopologyInfoWithOpts");
       }
 
-      public getTopologyPageInfo_args getEmptyArgsInstance() {
-        return new getTopologyPageInfo_args();
+      public getTopologyInfoWithOpts_args getEmptyArgsInstance() {
+        return new getTopologyInfoWithOpts_args();
       }
 
-      public AsyncMethodCallback<TopologyPageInfo> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+      public AsyncMethodCallback<TopologyInfo> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
         final org.apache.thrift.AsyncProcessFunction fcall = this;
-        return new AsyncMethodCallback<TopologyPageInfo>() { 
-          public void onComplete(TopologyPageInfo o) {
-            getTopologyPageInfo_result result = new getTopologyPageInfo_result();
+        return new AsyncMethodCallback<TopologyInfo>() { 
+          public void onComplete(TopologyInfo o) {
+            getTopologyInfoWithOpts_result result = new getTopologyInfoWithOpts_result();
             result.success = o;
             try {
               fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
@@ -6234,7 +6460,7 @@ public class Nimbus {
           public void onError(Exception e) {
             byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
             org.apache.thrift.TBase msg;
-            getTopologyPageInfo_result result = new getTopologyPageInfo_result();
+            getTopologyInfoWithOpts_result result = new getTopologyInfoWithOpts_result();
             if (e instanceof NotAliveException) {
                         result.e = (NotAliveException) e;
                         result.set_e_isSet(true);
@@ -6265,25 +6491,25 @@ public class Nimbus {
         return false;
       }
 
-      public void start(I iface, getTopologyPageInfo_args args, org.apache.thrift.async.AsyncMethodCallback<TopologyPageInfo> resultHandler) throws TException {
-        iface.getTopologyPageInfo(args.id, args.window, args.is_include_sys,resultHandler);
+      public void start(I iface, getTopologyInfoWithOpts_args args, org.apache.thrift.async.AsyncMethodCallback<TopologyInfo> resultHandler) throws TException {
+        iface.getTopologyInfoWithOpts(args.id, args.options,resultHandler);
       }
     }
 
-    public static class getSupervisorPageInfo<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getSupervisorPageInfo_args, SupervisorPageInfo> {
-      public getSupervisorPageInfo() {
-        super("getSupervisorPageInfo");
+    public static class getTopologyPageInfo<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getTopologyPageInfo_args, TopologyPageInfo> {
+      public getTopologyPageInfo() {
+        super("getTopologyPageInfo");
       }
 
-      public getSupervisorPageInfo_args getEmptyArgsInstance() {
-        return new getSupervisorPageInfo_args();
+      public getTopologyPageInfo_args getEmptyArgsInstance() {
+        return new getTopologyPageInfo_args();
       }
 
-      public AsyncMethodCallback<SupervisorPageInfo> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+      public AsyncMethodCallback<TopologyPageInfo> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
         final org.apache.thrift.AsyncProcessFunction fcall = this;
-        return new AsyncMethodCallback<SupervisorPageInfo>() { 
-          public void onComplete(SupervisorPageInfo o) {
-            getSupervisorPageInfo_result result = new getSupervisorPageInfo_result();
+        return new AsyncMethodCallback<TopologyPageInfo>() { 
+          public void onComplete(TopologyPageInfo o) {
+            getTopologyPageInfo_result result = new getTopologyPageInfo_result();
             result.success = o;
             try {
               fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
@@ -6296,7 +6522,7 @@ public class Nimbus {
           public void onError(Exception e) {
             byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
             org.apache.thrift.TBase msg;
-            getSupervisorPageInfo_result result = new getSupervisorPageInfo_result();
+            getTopologyPageInfo_result result = new getTopologyPageInfo_result();
             if (e instanceof NotAliveException) {
                         result.e = (NotAliveException) e;
                         result.set_e_isSet(true);
@@ -6327,25 +6553,25 @@ public class Nimbus {
         return false;
       }
 
-      public void start(I iface, getSupervisorPageInfo_args args, org.apache.thrift.async.AsyncMethodCallback<SupervisorPageInfo> resultHandler) throws TException {
-        iface.getSupervisorPageInfo(args.id, args.host, args.is_include_sys,resultHandler);
+      public void start(I iface, getTopologyPageInfo_args args, org.apache.thrift.async.AsyncMethodCallback<TopologyPageInfo> resultHandler) throws TException {
+        iface.getTopologyPageInfo(args.id, args.window, args.is_include_sys,resultHandler);
       }
     }
 
-    public static class getComponentPageInfo<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getComponentPageInfo_args, ComponentPageInfo> {
-      public getComponentPageInfo() {
-        super("getComponentPageInfo");
+    public static class getSupervisorPageInfo<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getSupervisorPageInfo_args, SupervisorPageInfo> {
+      public getSupervisorPageInfo() {
+        super("getSupervisorPageInfo");
       }
 
-      public getComponentPageInfo_args getEmptyArgsInstance() {
-        return new getComponentPageInfo_args();
+      public getSupervisorPageInfo_args getEmptyArgsInstance() {
+        return new getSupervisorPageInfo_args();
       }
 
-      public AsyncMethodCallback<ComponentPageInfo> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+      public AsyncMethodCallback<SupervisorPageInfo> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
         final org.apache.thrift.AsyncProcessFunction fcall = this;
-        return new AsyncMethodCallback<ComponentPageInfo>() { 
-          public void onComplete(ComponentPageInfo o) {
-            getComponentPageInfo_result result = new getComponentPageInfo_result();
+        return new AsyncMethodCallback<SupervisorPageInfo>() { 
+          public void onComplete(SupervisorPageInfo o) {
+            getSupervisorPageInfo_result result = new getSupervisorPageInfo_result();
             result.success = o;
             try {
               fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
@@ -6358,7 +6584,69 @@ public class Nimbus {
           public void onError(Exception e) {
             byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
             org.apache.thrift.TBase msg;
-            getComponentPageInfo_result result = new getComponentPageInfo_result();
+            getSupervisorPageInfo_result result = new getSupervisorPageInfo_result();
+            if (e instanceof NotAliveException) {
+                        result.e = (NotAliveException) e;
+                        result.set_e_isSet(true);
+                        msg = result;
+            }
+            else             if (e instanceof AuthorizationException) {
+                        result.aze = (AuthorizationException) e;
+                        result.set_aze_isSet(true);
+                        msg = result;
+            }
+             else 
+            {
+              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
+            }
+            try {
+              fcall.sendResponse(fb,msg,msgType,seqid);
+              return;
+            } catch (Exception ex) {
+              LOGGER.error("Exception writing to internal frame buffer", ex);
+            }
+            fb.close();
+          }
+        };
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public void start(I iface, getSupervisorPageInfo_args args, org.apache.thrift.async.AsyncMethodCallback<SupervisorPageInfo> resultHandler) throws TException {
+        iface.getSupervisorPageInfo(args.id, args.host, args.is_include_sys,resultHandler);
+      }
+    }
+
+    public static class getComponentPageInfo<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getComponentPageInfo_args, ComponentPageInfo> {
+      public getComponentPageInfo() {
+        super("getComponentPageInfo");
+      }
+
+      public getComponentPageInfo_args getEmptyArgsInstance() {
+        return new getComponentPageInfo_args();
+      }
+
+      public AsyncMethodCallback<ComponentPageInfo> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+        final org.apache.thrift.AsyncProcessFunction fcall = this;
+        return new AsyncMethodCallback<ComponentPageInfo>() { 
+          public void onComplete(ComponentPageInfo o) {
+            getComponentPageInfo_result result = new getComponentPageInfo_result();
+            result.success = o;
+            try {
+              fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+              return;
+            } catch (Exception e) {
+              LOGGER.error("Exception writing to internal frame buffer", e);
+            }
+            fb.close();
+          }
+          public void onError(Exception e) {
+            byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+            org.apache.thrift.TBase msg;
+            getComponentPageInfo_result result = new getComponentPageInfo_result();
             if (e instanceof NotAliveException) {
                         result.e = (NotAliveException) e;
                         result.set_e_isSet(true);
@@ -31990,22 +32278,1481 @@ public class Nimbus {
 
     @Override
     public String toString() {
-      StringBuilder sb = new StringBuilder("uploadChunk_args(");
+      StringBuilder sb = new StringBuilder("uploadChunk_args(");
+      boolean first = true;
+
+      sb.append("location:");
+      if (this.location == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.location);
+      }
+      first = false;
+      if (!first) sb.append(", ");
+      sb.append("chunk:");
+      if (this.chunk == null) {
+        sb.append("null");
+      } else {
+        org.apache.thrift.TBaseHelper.toString(this.chunk, sb);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class uploadChunk_argsStandardSchemeFactory implements SchemeFactory {
+      public uploadChunk_argsStandardScheme getScheme() {
+        return new uploadChunk_argsStandardScheme();
+      }
+    }
+
+    private static class uploadChunk_argsStandardScheme extends StandardScheme<uploadChunk_args> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, uploadChunk_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 1: // LOCATION
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+                struct.location = iprot.readString();
+                struct.set_location_isSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            case 2: // CHUNK
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+                struct.chunk = iprot.readBinary();
+                struct.set_chunk_isSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, uploadChunk_args struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.location != null) {
+          oprot.writeFieldBegin(LOCATION_FIELD_DESC);
+          oprot.writeString(struct.location);
+          oprot.writeFieldEnd();
+        }
+        if (struct.chunk != null) {
+          oprot.writeFieldBegin(CHUNK_FIELD_DESC);
+          oprot.writeBinary(struct.chunk);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class uploadChunk_argsTupleSchemeFactory implements SchemeFactory {
+      public uploadChunk_argsTupleScheme getScheme() {
+        return new uploadChunk_argsTupleScheme();
+      }
+    }
+
+    private static class uploadChunk_argsTupleScheme extends TupleScheme<uploadChunk_args> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, uploadChunk_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+        BitSet optionals = new BitSet();
+        if (struct.is_set_location()) {
+          optionals.set(0);
+        }
+        if (struct.is_set_chunk()) {
+          optionals.set(1);
+        }
+        oprot.writeBitSet(optionals, 2);
+        if (struct.is_set_location()) {
+          oprot.writeString(struct.location);
+        }
+        if (struct.is_set_chunk()) {
+          oprot.writeBinary(struct.chunk);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, uploadChunk_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+        BitSet incoming = iprot.readBitSet(2);
+        if (incoming.get(0)) {
+          struct.location = iprot.readString();
+          struct.set_location_isSet(true);
+        }
+        if (incoming.get(1)) {
+          struct.chunk = iprot.readBinary();
+          struct.set_chunk_isSet(true);
+        }
+      }
+    }
+
+  }
+
+  public static class uploadChunk_result implements org.apache.thrift.TBase<uploadChunk_result, uploadChunk_result._Fields>, java.io.Serializable, Cloneable, Comparable<uploadChunk_result>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("uploadChunk_result");
+
+    private static final org.apache.thrift.protocol.TField AZE_FIELD_DESC = new org.apache.thrift.protocol.TField("aze", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new uploadChunk_resultStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new uploadChunk_resultTupleSchemeFactory());
+    }
+
+    private AuthorizationException aze; // required
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      AZE((short)1, "aze");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 1: // AZE
+            return AZE;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.AZE, new org.apache.thrift.meta_data.FieldMetaData("aze", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(uploadChunk_result.class, metaDataMap);
+    }
+
+    public uploadChunk_result() {
+    }
+
+    public uploadChunk_result(
+      AuthorizationException aze)
+    {
+      this();
+      this.aze = aze;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public uploadChunk_result(uploadChunk_result other) {
+      if (other.is_set_aze()) {
+        this.aze = new AuthorizationException(other.aze);
+      }
+    }
+
+    public uploadChunk_result deepCopy() {
+      return new uploadChunk_result(this);
+    }
+
+    @Override
+    public void clear() {
+      this.aze = null;
+    }
+
+    public AuthorizationException get_aze() {
+      return this.aze;
+    }
+
+    public void set_aze(AuthorizationException aze) {
+      this.aze = aze;
+    }
+
+    public void unset_aze() {
+      this.aze = null;
+    }
+
+    /** Returns true if field aze is set (has been assigned a value) and false otherwise */
+    public boolean is_set_aze() {
+      return this.aze != null;
+    }
+
+    public void set_aze_isSet(boolean value) {
+      if (!value) {
+        this.aze = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case AZE:
+        if (value == null) {
+          unset_aze();
+        } else {
+          set_aze((AuthorizationException)value);
+        }
+        break;
+
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case AZE:
+        return get_aze();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      case AZE:
+        return is_set_aze();
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof uploadChunk_result)
+        return this.equals((uploadChunk_result)that);
+      return false;
+    }
+
+    public boolean equals(uploadChunk_result that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_aze = true && this.is_set_aze();
+      boolean that_present_aze = true && that.is_set_aze();
+      if (this_present_aze || that_present_aze) {
+        if (!(this_present_aze && that_present_aze))
+          return false;
+        if (!this.aze.equals(that.aze))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      List<Object> list = new ArrayList<Object>();
+
+      boolean present_aze = true && (is_set_aze());
+      list.add(present_aze);
+      if (present_aze)
+        list.add(aze);
+
+      return list.hashCode();
+    }
+
+    @Override
+    public int compareTo(uploadChunk_result other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      lastComparison = Boolean.valueOf(is_set_aze()).compareTo(other.is_set_aze());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (is_set_aze()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.aze, other.aze);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("uploadChunk_result(");
+      boolean first = true;
+
+      sb.append("aze:");
+      if (this.aze == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.aze);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class uploadChunk_resultStandardSchemeFactory implements SchemeFactory {
+      public uploadChunk_resultStandardScheme getScheme() {
+        return new uploadChunk_resultStandardScheme();
+      }
+    }
+
+    private static class uploadChunk_resultStandardScheme extends StandardScheme<uploadChunk_result> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, uploadChunk_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 1: // AZE
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+                struct.aze = new AuthorizationException();
+                struct.aze.read(iprot);
+                struct.set_aze_isSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, uploadChunk_result struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.aze != null) {
+          oprot.writeFieldBegin(AZE_FIELD_DESC);
+          struct.aze.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class uploadChunk_resultTupleSchemeFactory implements SchemeFactory {
+      public uploadChunk_resultTupleScheme getScheme() {
+        return new uploadChunk_resultTupleScheme();
+      }
+    }
+
+    private static class uploadChunk_resultTupleScheme extends TupleScheme<uploadChunk_result> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, uploadChunk_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+        BitSet optionals = new BitSet();
+        if (struct.is_set_aze()) {
+          optionals.set(0);
+        }
+        oprot.writeBitSet(optionals, 1);
+        if (struct.is_set_aze()) {
+          struct.aze.write(oprot);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, uploadChunk_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+        BitSet incoming = iprot.readBitSet(1);
+        if (incoming.get(0)) {
+          struct.aze = new AuthorizationException();
+          struct.aze.read(iprot);
+          struct.set_aze_isSet(true);
+        }
+      }
+    }
+
+  }
+
+  public static class finishFileUpload_args implements org.apache.thrift.TBase<finishFileUpload_args, finishFileUpload_args._Fields>, java.io.Serializable, Cloneable, Comparable<finishFileUpload_args>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("finishFileUpload_args");
+
+    private static final org.apache.thrift.protocol.TField LOCATION_FIELD_DESC = new org.apache.thrift.protocol.TField("location", org.apache.thrift.protocol.TType.STRING, (short)1);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new finishFileUpload_argsStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new finishFileUpload_argsTupleSchemeFactory());
+    }
+
+    private String location; // required
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      LOCATION((short)1, "location");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if it's not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 1: // LOCATION
+            return LOCATION;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if it's not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.LOCATION, new org.apache.thrift.meta_data.FieldMetaData("location", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(finishFileUpload_args.class, metaDataMap);
+    }
+
+    public finishFileUpload_args() {
+    }
+
+    public finishFileUpload_args(
+      String location)
+    {
+      this();
+      this.location = location;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public finishFileUpload_args(finishFileUpload_args other) {
+      if (other.is_set_location()) {
+        this.location = other.location;
+      }
+    }
+
+    public finishFileUpload_args deepCopy() {
+      return new finishFileUpload_args(this);
+    }
+
+    @Override
+    public void clear() {
+      this.location = null;
+    }
+
+    public String get_location() {
+      return this.location;
+    }
+
+    public void set_location(String location) {
+      this.location = location;
+    }
+
+    public void unset_location() {
+      this.location = null;
+    }
+
+    /** Returns true if field location is set (has been assigned a value) and false otherwise */
+    public boolean is_set_location() {
+      return this.location != null;
+    }
+
+    public void set_location_isSet(boolean value) {
+      if (!value) {
+        this.location = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case LOCATION:
+        if (value == null) {
+          unset_location();
+        } else {
+          set_location((String)value);
+        }
+        break;
+
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case LOCATION:
+        return get_location();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      case LOCATION:
+        return is_set_location();
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof finishFileUpload_args)
+        return this.equals((finishFileUpload_args)that);
+      return false;
+    }
+
+    public boolean equals(finishFileUpload_args that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_location = true && this.is_set_location();
+      boolean that_present_location = true && that.is_set_location();
+      if (this_present_location || that_present_location) {
+        if (!(this_present_location && that_present_location))
+          return false;
+        if (!this.location.equals(that.location))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      List<Object> list = new ArrayList<Object>();
+
+      boolean present_location = true && (is_set_location());
+      list.add(present_location);
+      if (present_location)
+        list.add(location);
+
+      return list.hashCode();
+    }
+
+    @Override
+    public int compareTo(finishFileUpload_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      lastComparison = Boolean.valueOf(is_set_location()).compareTo(other.is_set_location());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (is_set_location()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.location, other.location);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("finishFileUpload_args(");
+      boolean first = true;
+
+      sb.append("location:");
+      if (this.location == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.location);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class finishFileUpload_argsStandardSchemeFactory implements SchemeFactory {
+      public finishFileUpload_argsStandardScheme getScheme() {
+        return new finishFileUpload_argsStandardScheme();
+      }
+    }
+
+    private static class finishFileUpload_argsStandardScheme extends StandardScheme<finishFileUpload_args> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, finishFileUpload_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 1: // LOCATION
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+                struct.location = iprot.readString();
+                struct.set_location_isSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, finishFileUpload_args struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.location != null) {
+          oprot.writeFieldBegin(LOCATION_FIELD_DESC);
+          oprot.writeString(struct.location);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class finishFileUpload_argsTupleSchemeFactory implements SchemeFactory {
+      public finishFileUpload_argsTupleScheme getScheme() {
+        return new finishFileUpload_argsTupleScheme();
+      }
+    }
+
+    private static class finishFileUpload_argsTupleScheme extends TupleScheme<finishFileUpload_args> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, finishFileUpload_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+        BitSet optionals = new BitSet();
+        if (struct.is_set_location()) {
+          optionals.set(0);
+        }
+        oprot.writeBitSet(optionals, 1);
+        if (struct.is_set_location()) {
+          oprot.writeString(struct.location);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, finishFileUpload_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+        BitSet incoming = iprot.readBitSet(1);
+        if (incoming.get(0)) {
+          struct.location = iprot.readString();
+          struct.set_location_isSet(true);
+        }
+      }
+    }
+
+  }
+
+  public static class finishFileUpload_result implements org.apache.thrift.TBase<finishFileUpload_result, finishFileUpload_result._Fields>, java.io.Serializable, Cloneable, Comparable<finishFileUpload_result>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("finishFileUpload_result");
+
+    private static final org.apache.thrift.protocol.TField AZE_FIELD_DESC = new org.apache.thrift.protocol.TField("aze", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new finishFileUpload_resultStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new finishFileUpload_resultTupleSchemeFactory());
+    }
+
+    private AuthorizationException aze; // required
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      AZE((short)1, "aze");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if it's not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 1: // AZE
+            return AZE;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if it's not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.AZE, new org.apache.thrift.meta_data.FieldMetaData("aze", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(finishFileUpload_result.class, metaDataMap);
+    }
+
+    public finishFileUpload_result() {
+    }
+
+    public finishFileUpload_result(
+      AuthorizationException aze)
+    {
+      this();
+      this.aze = aze;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public finishFileUpload_result(finishFileUpload_result other) {
+      if (other.is_set_aze()) {
+        this.aze = new AuthorizationException(other.aze);
+      }
+    }
+
+    public finishFileUpload_result deepCopy() {
+      return new finishFileUpload_result(this);
+    }
+
+    @Override
+    public void clear() {
+      this.aze = null;
+    }
+
+    public AuthorizationException get_aze() {
+      return this.aze;
+    }
+
+    public void set_aze(AuthorizationException aze) {
+      this.aze = aze;
+    }
+
+    public void unset_aze() {
+      this.aze = null;
+    }
+
+    /** Returns true if field aze is set (has been assigned a value) and false otherwise */
+    public boolean is_set_aze() {
+      return this.aze != null;
+    }
+
+    public void set_aze_isSet(boolean value) {
+      if (!value) {
+        this.aze = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case AZE:
+        if (value == null) {
+          unset_aze();
+        } else {
+          set_aze((AuthorizationException)value);
+        }
+        break;
+
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case AZE:
+        return get_aze();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      case AZE:
+        return is_set_aze();
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof finishFileUpload_result)
+        return this.equals((finishFileUpload_result)that);
+      return false;
+    }
+
+    public boolean equals(finishFileUpload_result that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_aze = true && this.is_set_aze();
+      boolean that_present_aze = true && that.is_set_aze();
+      if (this_present_aze || that_present_aze) {
+        if (!(this_present_aze && that_present_aze))
+          return false;
+        if (!this.aze.equals(that.aze))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      List<Object> list = new ArrayList<Object>();
+
+      boolean present_aze = true && (is_set_aze());
+      list.add(present_aze);
+      if (present_aze)
+        list.add(aze);
+
+      return list.hashCode();
+    }
+
+    @Override
+    public int compareTo(finishFileUpload_result other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      lastComparison = Boolean.valueOf(is_set_aze()).compareTo(other.is_set_aze());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (is_set_aze()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.aze, other.aze);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("finishFileUpload_result(");
+      boolean first = true;
+
+      sb.append("aze:");
+      if (this.aze == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.aze);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class finishFileUpload_resultStandardSchemeFactory implements SchemeFactory {
+      public finishFileUpload_resultStandardScheme getScheme() {
+        return new finishFileUpload_resultStandardScheme();
+      }
+    }
+
+    private static class finishFileUpload_resultStandardScheme extends StandardScheme<finishFileUpload_result> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, finishFileUpload_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 1: // AZE
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+                struct.aze = new AuthorizationException();
+                struct.aze.read(iprot);
+                struct.set_aze_isSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, finishFileUpload_result struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.aze != null) {
+          oprot.writeFieldBegin(AZE_FIELD_DESC);
+          struct.aze.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class finishFileUpload_resultTupleSchemeFactory implements SchemeFactory {
+      public finishFileUpload_resultTupleScheme getScheme() {
+        return new finishFileUpload_resultTupleScheme();
+      }
+    }
+
+    private static class finishFileUpload_resultTupleScheme extends TupleScheme<finishFileUpload_result> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, finishFileUpload_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+        BitSet optionals = new BitSet();
+        if (struct.is_set_aze()) {
+          optionals.set(0);
+        }
+        oprot.writeBitSet(optionals, 1);
+        if (struct.is_set_aze()) {
+          struct.aze.write(oprot);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, finishFileUpload_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+        BitSet incoming = iprot.readBitSet(1);
+        if (incoming.get(0)) {
+          struct.aze = new AuthorizationException();
+          struct.aze.read(iprot);
+          struct.set_aze_isSet(true);
+        }
+      }
+    }
+
+  }
+
+  public static class beginFileDownload_args implements org.apache.thrift.TBase<beginFileDownload_args, beginFileDownload_args._Fields>, java.io.Serializable, Cloneable, Comparable<beginFileDownload_args>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("beginFileDownload_args");
+
+    private static final org.apache.thrift.protocol.TField FILE_FIELD_DESC = new org.apache.thrift.protocol.TField("file", org.apache.thrift.protocol.TType.STRING, (short)1);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new beginFileDownload_argsStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new beginFileDownload_argsTupleSchemeFactory());
+    }
+
+    private String file; // required
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      FILE((short)1, "file");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if it's not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 1: // FILE
+            return FILE;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if it's not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.FILE, new org.apache.thrift.meta_data.FieldMetaData("file", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(beginFileDownload_args.class, metaDataMap);
+    }
+
+    public beginFileDownload_args() {
+    }
+
+    public beginFileDownload_args(
+      String file)
+    {
+      this();
+      this.file = file;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public beginFileDownload_args(beginFileDownload_args other) {
+      if (other.is_set_file()) {
+        this.file = other.file;
+      }
+    }
+
+    public beginFileDownload_args deepCopy() {
+      return new beginFileDownload_args(this);
+    }
+
+    @Override
+    public void clear() {
+      this.file = null;
+    }
+
+    public String get_file() {
+      return this.file;
+    }
+
+    public void set_file(String file) {
+      this.file = file;
+    }
+
+    public void unset_file() {
+      this.file = null;
+    }
+
+    /** Returns true if field file is set (has been assigned a value) and false otherwise */
+    public boolean is_set_file() {
+      return this.file != null;
+    }
+
+    public void set_file_isSet(boolean value) {
+      if (!value) {
+        this.file = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case FILE:
+        if (value == null) {
+          unset_file();
+        } else {
+          set_file((String)value);
+        }
+        break;
+
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case FILE:
+        return get_file();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      case FILE:
+        return is_set_file();
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof beginFileDownload_args)
+        return this.equals((beginFileDownload_args)that);
+      return false;
+    }
+
+    public boolean equals(beginFileDownload_args that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_file = true && this.is_set_file();
+      boolean that_present_file = true && that.is_set_file();
+      if (this_present_file || that_present_file) {
+        if (!(this_present_file && that_present_file))
+          return false;
+        if (!this.file.equals(that.file))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      List<Object> list = new ArrayList<Object>();
+
+      boolean present_file = true && (is_set_file());
+      list.add(present_file);
+      if (present_file)
+        list.add(file);
+
+      return list.hashCode();
+    }
+
+    @Override
+    public int compareTo(beginFileDownload_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      lastComparison = Boolean.valueOf(is_set_file()).compareTo(other.is_set_file());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (is_set_file()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.file, other.file);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("beginFileDownload_args(");
       boolean first = true;
 
-      sb.append("location:");
-      if (this.location == null) {
-        sb.append("null");
-      } else {
-        sb.append(this.location);
-      }
-      first = false;
-      if (!first) sb.append(", ");
-      sb.append("chunk:");
-      if (this.chunk == null) {
+      sb.append("file:");
+      if (this.file == null) {
         sb.append("null");
       } else {
-        org.apache.thrift.TBaseHelper.toString(this.chunk, sb);
+        sb.append(this.file);
       }
       first = false;
       sb.append(")");
@@ -32033,15 +33780,15 @@ public class Nimbus {
       }
     }
 
-    private static class uploadChunk_argsStandardSchemeFactory implements SchemeFactory {
-      public uploadChunk_argsStandardScheme getScheme() {
-        return new uploadChunk_argsStandardScheme();
+    private static class beginFileDownload_argsStandardSchemeFactory implements SchemeFactory {
+      public beginFileDownload_argsStandardScheme getScheme() {
+        return new beginFileDownload_argsStandardScheme();
       }
     }
 
-    private static class uploadChunk_argsStandardScheme extends StandardScheme<uploadChunk_args> {
+    private static class beginFileDownload_argsStandardScheme extends StandardScheme<beginFileDownload_args> {
 
-      public void read(org.apache.thrift.protocol.TProtocol iprot, uploadChunk_args struct) throws org.apache.thrift.TException {
+      public void read(org.apache.thrift.protocol.TProtocol iprot, beginFileDownload_args struct) throws org.apache.thrift.TException {
         org.apache.thrift.protocol.TField schemeField;
         iprot.readStructBegin();
         while (true)
@@ -32051,18 +33798,10 @@ public class Nimbus {
             break;
           }
           switch (schemeField.id) {
-            case 1: // LOCATION
-              if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
-                struct.location = iprot.readString();
-                struct.set_location_isSet(true);
-              } else { 
-                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
-              }
-              break;
-            case 2: // CHUNK
+            case 1: // FILE
               if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
-                struct.chunk = iprot.readBinary();
-                struct.set_chunk_isSet(true);
+                struct.file = iprot.readString();
+                struct.set_file_isSet(true);
               } else { 
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
@@ -32076,18 +33815,13 @@ public class Nimbus {
         struct.validate();
       }
 
-      public void write(org.apache.thrift.protocol.TProtocol oprot, uploadChunk_args struct) throws org.apache.thrift.TException {
+      public void write(org.apache.thrift.protocol.TProtocol oprot, beginFileDownload_args struct) throws org.apache.thrift.TException {
         struct.validate();
 
         oprot.writeStructBegin(STRUCT_DESC);
-        if (struct.location != null) {
-          oprot.writeFieldBegin(LOCATION_FIELD_DESC);
-          oprot.writeString(struct.location);
-          oprot.writeFieldEnd();
-        }
-        if (struct.chunk != null) {
-          oprot.writeFieldBegin(CHUNK_FIELD_DESC);
-          oprot.writeBinary(struct.chunk);
+        if (struct.file != null) {
+          oprot.writeFieldBegin(FILE_FIELD_DESC);
+          oprot.writeString(struct.file);
           oprot.writeFieldEnd();
         }
         oprot.writeFieldStop();
@@ -32096,65 +33830,58 @@ public class Nimbus {
 
     }
 
-    private static class uploadChunk_argsTupleSchemeFactory implements SchemeFactory {
-      public uploadChunk_argsTupleScheme getScheme() {
-        return new uploadChunk_argsTupleScheme();
+    private static class beginFileDownload_argsTupleSchemeFactory implements SchemeFactory {
+      public beginFileDownload_argsTupleScheme getScheme() {
+        return new beginFileDownload_argsTupleScheme();
       }
     }
 
-    private static class uploadChunk_argsTupleScheme extends TupleScheme<uploadChunk_args> {
+    private static class beginFileDownload_argsTupleScheme extends TupleScheme<beginFileDownload_args> {
 
       @Override
-      public void write(org.apache.thrift.protocol.TProtocol prot, uploadChunk_args struct) throws org.apache.thrift.TException {
+      public void write(org.apache.thrift.protocol.TProtocol prot, beginFileDownload_args struct) throws org.apache.thrift.TException {
         TTupleProtocol oprot = (TTupleProtocol) prot;
         BitSet optionals = new BitSet();
-        if (struct.is_set_location()) {
+        if (struct.is_set_file()) {
           optionals.set(0);
         }
-        if (struct.is_set_chunk()) {
-          optionals.set(1);
-        }
-        oprot.writeBitSet(optionals, 2);
-        if (struct.is_set_location()) {
-          oprot.writeString(struct.location);
-        }
-        if (struct.is_set_chunk()) {
-          oprot.writeBinary(struct.chunk);
+        oprot.writeBitSet(optionals, 1);
+        if (struct.is_set_file()) {
+          oprot.writeString(struct.file);
         }
       }
 
       @Override
-      public void read(org.apache.thrift.protocol.TProtocol prot, uploadChunk_args struct) throws org.apache.thrift.TException {
+      public void read(org.apache.thrift.protocol.TProtocol prot, beginFileDownload_args struct) throws org.apache.thrift.TException {
         TTupleProtocol iprot = (TTupleProtocol) prot;
-        BitSet incoming = iprot.readBitSet(2);
+        BitSet incoming = iprot.readBitSet(1);
         if (incoming.get(0)) {
-          struct.location = iprot.readString();
-          struct.set_location_isSet(true);
-        }
-        if (incoming.get(1)) {
-          struct.chunk = iprot.readBinary();
-          struct.set_chunk_isSet(true);
+          struct.file = iprot.readString();
+          struct.set_file_isSet(true);
         }
       }
     }
 
   }
 
-  public static class uploadChunk_result implements org.apache.thrift.TBase<uploadChunk_result, uploadChunk_result._Fields>, java.io.Serializable, Cloneable, Comparable<uploadChunk_result>   {
-    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("uploadChunk_result");
+  public static class beginFileDownload_result implements org.apache.thrift.TBase<beginFileDownload_result, beginFileDownload_result._Fields>, java.io.Serializable, Cloneable, Comparable<beginFileDownload_result>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("beginFileDownload_result");
 
+    private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.STRING, (short)0);
     private static final org.apache.thrift.protocol.TField AZE_FIELD_DESC = new org.apache.thrift.protocol.TField("aze", org.apache.thrift.protocol.TType.STRUCT, (short)1);
 
     private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
     static {
-      schemes.put(StandardScheme.class, new uploadChunk_resultStandardSchemeFactory());
-      schemes.put(TupleScheme.class, new uploadChunk_resultTupleSchemeFactory());
+      schemes.put(StandardScheme.class, new beginFileDownload_resultStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new beginFileDownload_resultTupleSchemeFactory());
     }
 
+    private String success; // required
     private AuthorizationException aze; // required
 
     /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
     public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      SUCCESS((short)0, "success"),
       AZE((short)1, "aze");
 
       private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -32170,6 +33897,8 @@ public class Nimbus {
        */
       public static _Fields findByThriftId(int fieldId) {
         switch(fieldId) {
+          case 0: // SUCCESS
+            return SUCCESS;
           case 1: // AZE
             return AZE;
           default:
@@ -32215,40 +33944,71 @@ public class Nimbus {
     public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
     static {
       Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
       tmpMap.put(_Fields.AZE, new org.apache.thrift.meta_data.FieldMetaData("aze", org.apache.thrift.TFieldRequirementType.DEFAULT, 
           new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
       metaDataMap = Collections.unmodifiableMap(tmpMap);
-      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(uploadChunk_result.class, metaDataMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(beginFileDownload_result.class, metaDataMap);
     }
 
-    public uploadChunk_result() {
+    public beginFileDownload_result() {
     }
 
-    public uploadChunk_result(
+    public beginFileDownload_result(
+      String success,
       AuthorizationException aze)
     {
       this();
+      this.success = success;
       this.aze = aze;
     }
 
     /**
      * Performs a deep copy on <i>other</i>.
      */
-    public uploadChunk_result(uploadChunk_result other) {
+    public beginFileDownload_result(beginFileDownload_result other) {
+      if (other.is_set_success()) {
+        this.success = other.success;
+      }
       if (other.is_set_aze()) {
         this.aze = new AuthorizationException(other.aze);
       }
     }
 
-    public uploadChunk_result deepCopy() {
-      return new uploadChunk_result(this);
+    public beginFileDownload_result deepCopy() {
+      return new beginFileDownload_result(this);
     }
 
     @Override
     public void clear() {
+      this.success = null;
       this.aze = null;
     }
 
+    public String get_success() {
+      return this.success;
+    }
+
+    public void set_success(String success) {
+      this.success = success;
+    }
+
+    public void unset_success() {
+      this.success = null;
+    }
+
+    /** Returns true if field success is set (has been assigned a value) and false otherwise */
+    public boolean is_set_success() {
+      return this.success != null;
+    }
+
+    public void set_success_isSet(boolean value) {
+      if (!value) {
+        this.success = null;
+      }
+    }
+
     public AuthorizationException get_aze() {
       return this.aze;
     }
@@ -32274,6 +34034,14 @@ public class Nimbus {
 
     public void setFieldValue(_Fields field, Object value) {
       switch (field) {
+      case SUCCESS:
+        if (value == null) {
+          unset_success();
+        } else {
+          set_success((String)value);
+        }
+        break;
+
       case AZE:
         if (value == null) {
           unset_aze();
@@ -32287,6 +34055,9 @@ public class Nimbus {
 
     public Object getFieldValue(_Fields field) {
       switch (field) {
+      case SUCCESS:
+        return get_success();
+
       case AZE:
         return get_aze();
 
@@ -32301,6 +34072,8 @@ public class Nimbus {
       }
 
       switch (field) {
+   

<TRUNCATED>
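
The generated structs in the (truncated) diff above all follow the same
Thrift pattern: a StandardScheme that writes field-tagged wire data and a
TupleScheme that packs field presence into a BitSet before writing only the
set fields. As an illustration only -- not part of the commit -- a round trip
through the compact protocol might look like the sketch below; the
Nimbus.uploadChunk_args(String, ByteBuffer) constructor is an assumption,
following the all-fields constructor pattern shown for the other structs.

    import java.nio.ByteBuffer;
    import org.apache.storm.generated.Nimbus;
    import org.apache.thrift.TDeserializer;
    import org.apache.thrift.TException;
    import org.apache.thrift.TSerializer;
    import org.apache.thrift.protocol.TCompactProtocol;

    public class UploadChunkRoundTrip {
        public static void main(String[] args) throws TException {
            // Build the request struct with the generated all-fields constructor.
            Nimbus.uploadChunk_args req = new Nimbus.uploadChunk_args(
                "upload-loc-1", ByteBuffer.wrap(new byte[]{1, 2, 3}));

            // Serialize, then deserialize into a fresh struct; the generated
            // scheme classes above perform the actual field encoding.
            byte[] bytes = new TSerializer(new TCompactProtocol.Factory()).serialize(req);
            Nimbus.uploadChunk_args copy = new Nimbus.uploadChunk_args();
            new TDeserializer(new TCompactProtocol.Factory()).deserialize(copy, bytes);

            System.out.println(copy.is_set_location() && copy.is_set_chunk()); // true
        }
    }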

[6/8] storm git commit: addressed review comments

Posted by ka...@apache.org.
addressed review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0f9dbf40
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0f9dbf40
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0f9dbf40

Branch: refs/heads/1.x-branch
Commit: 0f9dbf401dc1c2adab4f856067350a370b03e27e
Parents: 897a6d2
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Dec 2 15:29:38 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Dec 5 07:48:57 2016 -0600

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/starter/WordCountTopology.java      | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0f9dbf40/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
index b47981d..e4a5711 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
@@ -90,9 +90,7 @@ public class WordCountTopology {
     if (args != null && args.length > 0) {
       conf.setNumWorkers(3);
 
-      for (String name: args) {
-        StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
-      }
+      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
     }
     else {
       conf.setMaxTaskParallelism(3);


[3/8] storm git commit: Added in some optimizations for better topology submission performance

Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java b/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
index e8cef09..85209d6 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
@@ -18,7 +18,6 @@
 package org.apache.storm.utils;
 
 import org.apache.storm.Config;
-import org.apache.storm.generated.ClusterSummary;
 import org.apache.storm.generated.Nimbus;
 import org.apache.storm.generated.NimbusSummary;
 import org.apache.storm.security.auth.ReqContext;
@@ -78,32 +77,36 @@ public class NimbusClient extends ThriftClient {
 
         for (String host : seeds) {
             int port = Integer.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT).toString());
-            ClusterSummary clusterInfo;
-            try (NimbusClient client = new NimbusClient(conf, host, port, null, asUser)) {
-                clusterInfo = client.getClient().getClusterInfo();
+            NimbusSummary nimbusSummary;
+            NimbusClient client = null;
+            try {
+                client = new NimbusClient(conf, host, port, null, asUser);
+                nimbusSummary = client.getClient().getLeader();
+                if (nimbusSummary != null) {
+                    String leaderNimbus = nimbusSummary.get_host() + ":" + nimbusSummary.get_port();
+                    LOG.info("Found leader nimbus : {}", leaderNimbus);
+                    if (nimbusSummary.get_host().equals(host) && nimbusSummary.get_port() == port) {
+                        NimbusClient ret = client;
+                        client = null;
+                        return ret;
+                    }
+                    try {
+                        return new NimbusClient(conf, nimbusSummary.get_host(), nimbusSummary.get_port(), null, asUser);
+                    } catch (TTransportException e) {
+                        throw new RuntimeException("Failed to create a nimbus client for the leader " + leaderNimbus, e);
+                    }
+                }
             } catch (Exception e) {
                 LOG.warn("Ignoring exception while trying to get leader nimbus info from " + host
                         + ". will retry with a different seed host.", e);
                 continue;
-            }
-            List<NimbusSummary> nimbuses = clusterInfo.get_nimbuses();
-            if (nimbuses != null) {
-                for (NimbusSummary nimbusSummary : nimbuses) {
-                    if (nimbusSummary.is_isLeader()) {
-                        String leaderNimbus = nimbusSummary.get_host() + ":" + nimbusSummary.get_port();
-                        LOG.info("Found leader nimbus : {}", leaderNimbus);
-
-                        try {
-                            return new NimbusClient(conf, nimbusSummary.get_host(), nimbusSummary.get_port(), null, asUser);
-                        } catch (TTransportException e) {
-                            throw new RuntimeException("Failed to create a nimbus client for the leader " + leaderNimbus, e);
-                        }
-                    }
+            } finally {
+                if (client != null) {
+                    client.close();
                 }
-                throw new NimbusLeaderNotFoundException(
-                        "Found nimbuses " + nimbuses + " none of which is elected as leader, please try " +
-                        "again after some time.");
             }
+            throw new NimbusLeaderNotFoundException("Could not find a nimbus leader, please try " +
+                    "again after some time.");
         }
         throw new NimbusLeaderNotFoundException(
                 "Could not find leader nimbus from seed hosts " + seeds + ". " +

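The rewritten loop above leans on a small ownership-transfer idiom: the finally block closes the probe client on every exit path except the one where the probe itself turns out to be the leader, in which case nulling the local variable hands the still-open connection to the caller. A minimal self-contained sketch of the same idiom, using a hypothetical Resource type rather than the Storm API:

    public class OwnershipTransferSketch {
        // Hypothetical stand-in for NimbusClient; not part of the Storm API.
        static class Resource implements AutoCloseable {
            final boolean isLeader;
            Resource(boolean isLeader) { this.isLeader = isLeader; }
            @Override
            public void close() { System.out.println("closed"); }
        }

        static Resource findLeader(boolean probeIsLeader) {
            Resource probe = null;
            try {
                probe = new Resource(probeIsLeader);
                if (probe.isLeader) {
                    Resource ret = probe;
                    probe = null;              // finally now skips close(); the caller owns it
                    return ret;
                }
                return new Resource(true);     // fresh connection to the real leader
            } finally {
                if (probe != null) {
                    probe.close();             // the probe was only a stepping stone
                }
            }
        }

        public static void main(String[] args) {
            try (Resource leader = findLeader(true)) {
                // probe was the leader: it reaches here still open
            }
            try (Resource leader = findLeader(false)) {
                // probe was closed inside findLeader; this is the replacement
            }
        }
    }
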
http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java b/storm-core/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
index 7528221..33c2c1b 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
@@ -66,11 +66,14 @@ public class StormBoundedExponentialBackoffRetry extends BoundedExponentialBacko
             int exp = 1 << retryCount;
             int jitter = random.nextInt(exp);
             int sleepTimeMs = super.getBaseSleepTimeMs() + exp + jitter;
+            LOG.warn("WILL SLEEP FOR {}ms (NOT MAX)", sleepTimeMs);
             return sleepTimeMs;
         } else {
             int stepJitter = random.nextInt(stepSize);
-            return Math.min(super.getMaxSleepTimeMs(), (linearBaseSleepMs +
+            int sleepTimeMs = Math.min(super.getMaxSleepTimeMs(), (linearBaseSleepMs +
                     (stepSize * (retryCount - expRetriesThreshold)) + stepJitter));
+            LOG.warn("WILL SLEEP FOR {}ms (MAX)", sleepTimeMs);
+            return sleepTimeMs;
         }
     }
 }

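For context, the two branches patched above implement a bounded exponential-then-linear backoff. A standalone sketch of the arithmetic, with illustrative constants (the real values come from the retry policy's configuration, not from these numbers):

    import java.util.Random;

    // Illustrative sketch of the getSleepTimeMs branches above; BASE_MS, MAX_MS,
    // EXP_THRESHOLD and STEP_MS are assumed values, not Storm's defaults.
    public class BackoffSketch {
        static final int BASE_MS = 100;        // base sleep before any backoff
        static final int MAX_MS = 5000;        // hard cap on any single sleep
        static final int EXP_THRESHOLD = 5;    // retries served by the exponential branch
        static final int STEP_MS = 500;        // linear step size past the threshold
        static final Random RANDOM = new Random();

        static int sleepTimeMs(int retryCount) {
            if (retryCount < EXP_THRESHOLD) {
                int exp = 1 << retryCount;            // 1, 2, 4, 8, 16 ...
                int jitter = RANDOM.nextInt(exp);     // spread out simultaneous retriers
                return BASE_MS + exp + jitter;
            }
            int stepJitter = RANDOM.nextInt(STEP_MS);
            // grow linearly per retry beyond the threshold, but never past the cap
            return Math.min(MAX_MS, BASE_MS + STEP_MS * (retryCount - EXP_THRESHOLD) + stepJitter);
        }

        public static void main(String[] args) {
            for (int retry = 0; retry < 12; retry++) {
                System.out.println("retry " + retry + " -> sleep " + sleepTimeMs(retry) + "ms");
            }
        }
    }
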
http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index ed5c552..75ffd23 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -1142,12 +1142,14 @@ public class Utils {
 
     public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
         CuratorFramework ret = newCurator(conf, servers, port, root, auth);
+        LOG.info("Starting Utils Curator...");
         ret.start();
         return ret;
     }
 
     public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) {
         CuratorFramework ret = newCurator(conf, servers, port, auth);
+        LOG.info("Starting Utils Curator (2)...");
         ret.start();
         return ret;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index 0580f41..ef35307 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -122,6 +122,7 @@ public class Zookeeper {
                 }
             }
         });
+        LOG.info("Starting ZK Curator");
         fk.start();
         return fk;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/py/storm/Nimbus-remote
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus-remote b/storm-core/src/py/storm/Nimbus-remote
index b39050e..0d643c3 100644
--- a/storm-core/src/py/storm/Nimbus-remote
+++ b/storm-core/src/py/storm/Nimbus-remote
@@ -76,6 +76,8 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help':
   print('  string downloadChunk(string id)')
   print('  string getNimbusConf()')
   print('  ClusterSummary getClusterInfo()')
+  print('  NimbusSummary getLeader()')
+  print('  bool isTopologyNameAllowed(string name)')
   print('  TopologyInfo getTopologyInfo(string id)')
   print('  TopologyInfo getTopologyInfoWithOpts(string id, GetInfoOptions options)')
   print('  TopologyPageInfo getTopologyPageInfo(string id, string window, bool is_include_sys)')
@@ -345,6 +347,18 @@ elif cmd == 'getClusterInfo':
     sys.exit(1)
   pp.pprint(client.getClusterInfo())
 
+elif cmd == 'getLeader':
+  if len(args) != 0:
+    print('getLeader requires 0 args')
+    sys.exit(1)
+  pp.pprint(client.getLeader())
+
+elif cmd == 'isTopologyNameAllowed':
+  if len(args) != 1:
+    print('isTopologyNameAllowed requires 1 args')
+    sys.exit(1)
+  pp.pprint(client.isTopologyNameAllowed(args[0],))
+
 elif cmd == 'getTopologyInfo':
   if len(args) != 1:
     print('getTopologyInfo requires 1 args')

http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/py/storm/Nimbus.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus.py b/storm-core/src/py/storm/Nimbus.py
index 7e5470a..1c5e86e 100644
--- a/storm-core/src/py/storm/Nimbus.py
+++ b/storm-core/src/py/storm/Nimbus.py
@@ -291,6 +291,16 @@ class Iface:
   def getClusterInfo(self):
     pass
 
+  def getLeader(self):
+    pass
+
+  def isTopologyNameAllowed(self, name):
+    """
+    Parameters:
+     - name
+    """
+    pass
+
   def getTopologyInfo(self, id):
     """
     Parameters:
@@ -1523,6 +1533,67 @@ class Client(Iface):
       raise result.aze
     raise TApplicationException(TApplicationException.MISSING_RESULT, "getClusterInfo failed: unknown result")
 
+  def getLeader(self):
+    self.send_getLeader()
+    return self.recv_getLeader()
+
+  def send_getLeader(self):
+    self._oprot.writeMessageBegin('getLeader', TMessageType.CALL, self._seqid)
+    args = getLeader_args()
+    args.write(self._oprot)
+    self._oprot.writeMessageEnd()
+    self._oprot.trans.flush()
+
+  def recv_getLeader(self):
+    iprot = self._iprot
+    (fname, mtype, rseqid) = iprot.readMessageBegin()
+    if mtype == TMessageType.EXCEPTION:
+      x = TApplicationException()
+      x.read(iprot)
+      iprot.readMessageEnd()
+      raise x
+    result = getLeader_result()
+    result.read(iprot)
+    iprot.readMessageEnd()
+    if result.success is not None:
+      return result.success
+    if result.aze is not None:
+      raise result.aze
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "getLeader failed: unknown result")
+
+  def isTopologyNameAllowed(self, name):
+    """
+    Parameters:
+     - name
+    """
+    self.send_isTopologyNameAllowed(name)
+    return self.recv_isTopologyNameAllowed()
+
+  def send_isTopologyNameAllowed(self, name):
+    self._oprot.writeMessageBegin('isTopologyNameAllowed', TMessageType.CALL, self._seqid)
+    args = isTopologyNameAllowed_args()
+    args.name = name
+    args.write(self._oprot)
+    self._oprot.writeMessageEnd()
+    self._oprot.trans.flush()
+
+  def recv_isTopologyNameAllowed(self):
+    iprot = self._iprot
+    (fname, mtype, rseqid) = iprot.readMessageBegin()
+    if mtype == TMessageType.EXCEPTION:
+      x = TApplicationException()
+      x.read(iprot)
+      iprot.readMessageEnd()
+      raise x
+    result = isTopologyNameAllowed_result()
+    result.read(iprot)
+    iprot.readMessageEnd()
+    if result.success is not None:
+      return result.success
+    if result.aze is not None:
+      raise result.aze
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "isTopologyNameAllowed failed: unknown result")
+
   def getTopologyInfo(self, id):
     """
     Parameters:
@@ -1895,6 +1966,8 @@ class Processor(Iface, TProcessor):
     self._processMap["downloadChunk"] = Processor.process_downloadChunk
     self._processMap["getNimbusConf"] = Processor.process_getNimbusConf
     self._processMap["getClusterInfo"] = Processor.process_getClusterInfo
+    self._processMap["getLeader"] = Processor.process_getLeader
+    self._processMap["isTopologyNameAllowed"] = Processor.process_isTopologyNameAllowed
     self._processMap["getTopologyInfo"] = Processor.process_getTopologyInfo
     self._processMap["getTopologyInfoWithOpts"] = Processor.process_getTopologyInfoWithOpts
     self._processMap["getTopologyPageInfo"] = Processor.process_getTopologyPageInfo
@@ -2713,6 +2786,50 @@ class Processor(Iface, TProcessor):
     oprot.writeMessageEnd()
     oprot.trans.flush()
 
+  def process_getLeader(self, seqid, iprot, oprot):
+    args = getLeader_args()
+    args.read(iprot)
+    iprot.readMessageEnd()
+    result = getLeader_result()
+    try:
+      result.success = self._handler.getLeader()
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
+      result.aze = aze
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("getLeader", msg_type, seqid)
+    result.write(oprot)
+    oprot.writeMessageEnd()
+    oprot.trans.flush()
+
+  def process_isTopologyNameAllowed(self, seqid, iprot, oprot):
+    args = isTopologyNameAllowed_args()
+    args.read(iprot)
+    iprot.readMessageEnd()
+    result = isTopologyNameAllowed_result()
+    try:
+      result.success = self._handler.isTopologyNameAllowed(args.name)
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
+      result.aze = aze
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("isTopologyNameAllowed", msg_type, seqid)
+    result.write(oprot)
+    oprot.writeMessageEnd()
+    oprot.trans.flush()
+
   def process_getTopologyInfo(self, seqid, iprot, oprot):
     args = getTopologyInfo_args()
     args.read(iprot)
@@ -8017,6 +8134,274 @@ class getClusterInfo_result:
   def __ne__(self, other):
     return not (self == other)
 
+class getLeader_args:
+
+  thrift_spec = (
+  )
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('getLeader_args')
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class getLeader_result:
+  """
+  Attributes:
+   - success
+   - aze
+  """
+
+  thrift_spec = (
+    (0, TType.STRUCT, 'success', (NimbusSummary, NimbusSummary.thrift_spec), None, ), # 0
+    (1, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 1
+  )
+
+  def __init__(self, success=None, aze=None,):
+    self.success = success
+    self.aze = aze
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 0:
+        if ftype == TType.STRUCT:
+          self.success = NimbusSummary()
+          self.success.read(iprot)
+        else:
+          iprot.skip(ftype)
+      elif fid == 1:
+        if ftype == TType.STRUCT:
+          self.aze = AuthorizationException()
+          self.aze.read(iprot)
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('getLeader_result')
+    if self.success is not None:
+      oprot.writeFieldBegin('success', TType.STRUCT, 0)
+      self.success.write(oprot)
+      oprot.writeFieldEnd()
+    if self.aze is not None:
+      oprot.writeFieldBegin('aze', TType.STRUCT, 1)
+      self.aze.write(oprot)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.success)
+    value = (value * 31) ^ hash(self.aze)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class isTopologyNameAllowed_args:
+  """
+  Attributes:
+   - name
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'name', None, None, ), # 1
+  )
+
+  def __init__(self, name=None,):
+    self.name = name
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.name = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('isTopologyNameAllowed_args')
+    if self.name is not None:
+      oprot.writeFieldBegin('name', TType.STRING, 1)
+      oprot.writeString(self.name.encode('utf-8'))
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.name)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class isTopologyNameAllowed_result:
+  """
+  Attributes:
+   - success
+   - aze
+  """
+
+  thrift_spec = (
+    (0, TType.BOOL, 'success', None, None, ), # 0
+    (1, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 1
+  )
+
+  def __init__(self, success=None, aze=None,):
+    self.success = success
+    self.aze = aze
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 0:
+        if ftype == TType.BOOL:
+          self.success = iprot.readBool()
+        else:
+          iprot.skip(ftype)
+      elif fid == 1:
+        if ftype == TType.STRUCT:
+          self.aze = AuthorizationException()
+          self.aze.read(iprot)
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('isTopologyNameAllowed_result')
+    if self.success is not None:
+      oprot.writeFieldBegin('success', TType.BOOL, 0)
+      oprot.writeBool(self.success)
+      oprot.writeFieldEnd()
+    if self.aze is not None:
+      oprot.writeFieldBegin('aze', TType.STRUCT, 1)
+      self.aze.write(oprot)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.success)
+    value = (value * 31) ^ hash(self.aze)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
 class getTopologyInfo_args:
   """
   Attributes:

http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift
index 8e80d2a..700e5a0 100644
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@ -667,6 +667,8 @@ service Nimbus {
   string getNimbusConf() throws (1: AuthorizationException aze);
   // stats functions
   ClusterSummary getClusterInfo() throws (1: AuthorizationException aze);
+  NimbusSummary getLeader() throws (1: AuthorizationException aze);
+  bool isTopologyNameAllowed(1: string name) throws (1: AuthorizationException aze);
   TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e, 2: AuthorizationException aze);
   TopologyInfo getTopologyInfoWithOpts(1: string id, 2: GetInfoOptions options) throws (1: NotAliveException e, 2: AuthorizationException aze);
   TopologyPageInfo getTopologyPageInfo(1: string id, 2: string window, 3: bool is_include_sys) throws (1: NotAliveException e, 2: AuthorizationException aze);

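The two new RPCs give clients a direct leader lookup and a cheap pre-submission name check, replacing the old pattern of pulling a full ClusterSummary for either purpose. A minimal sketch of how a Java client might call them (the topology name and config contents are illustrative):

    import java.util.Map;

    import org.apache.storm.generated.NimbusSummary;
    import org.apache.storm.utils.NimbusClient;
    import org.apache.storm.utils.Utils;

    public class LeaderCheckSketch {
        public static void main(String[] args) throws Exception {
            Map conf = Utils.readStormConfig();
            // getConfiguredClient resolves the leader through the seed hosts
            // (see the NimbusClient change above)
            try (NimbusClient client = NimbusClient.getConfiguredClient(conf)) {
                NimbusSummary leader = client.getClient().getLeader();
                System.out.println("leader: " + leader.get_host() + ":" + leader.get_port());

                // true when the name is valid and no topology by that name is active
                boolean allowed = client.getClient().isTopologyNameAllowed("wordcount");
                System.out.println("wordcount allowed: " + allowed);
            }
        }
    }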

[7/8] storm git commit: Merge branch 'STORM-2190-1.x' of https://github.com/revans2/incubator-storm into STORM-2190-1.x-merge

Posted by ka...@apache.org.
Merge branch 'STORM-2190-1.x' of https://github.com/revans2/incubator-storm into STORM-2190-1.x-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/af9b7c8c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/af9b7c8c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/af9b7c8c

Branch: refs/heads/1.x-branch
Commit: af9b7c8c241922f7688c3da18577684b742e08af
Parents: b515356 0f9dbf4
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Dec 20 17:36:57 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Dec 20 17:36:57 2016 +0900

----------------------------------------------------------------------
 .../org/apache/storm/command/kill_topology.clj  |    8 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  161 +-
 .../jvm/org/apache/storm/StormSubmitter.java    |  121 +-
 .../storm/dependency/DependencyUploader.java    |   22 +-
 .../jvm/org/apache/storm/generated/Nimbus.java  | 3377 ++++++++++++++----
 .../org/apache/storm/utils/NimbusClient.java    |   45 +-
 .../StormBoundedExponentialBackoffRetry.java    |    5 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |    2 +
 .../org/apache/storm/zookeeper/Zookeeper.java   |    1 +
 storm-core/src/py/storm/Nimbus-remote           |   14 +
 storm-core/src/py/storm/Nimbus.py               |  385 ++
 storm-core/src/storm.thrift                     |    2 +
 12 files changed, 3207 insertions(+), 936 deletions(-)
----------------------------------------------------------------------



[8/8] storm git commit: STORM-2190: CHANGELOG

Posted by ka...@apache.org.
STORM-2190: CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4154490f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4154490f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4154490f

Branch: refs/heads/1.x-branch
Commit: 4154490f71b188996b51bd4cd9dbb47dc3871f21
Parents: af9b7c8
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Dec 20 17:56:13 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Dec 20 17:56:13 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4154490f/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b965cf4..63a5777 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.0
+ * STORM-2190: reduce contention between submission and scheduling
  * STORM-2239: Handle InterruptException in new Kafka spout
  * STORM-2087: Storm-kafka-client: Failed tuples are not always replayed
  * STORM-2238: Add Timestamp extractor for windowed bolt


[5/8] storm git commit: Added in some optimizations for better topology submission performance

Posted by ka...@apache.org.
Added in some optimizations for better topology submission performance

Conflicts:
	storm-core/src/jvm/org/apache/storm/StormSubmitter.java
	storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/897a6d2b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/897a6d2b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/897a6d2b

Branch: refs/heads/1.x-branch
Commit: 897a6d2b9c2e34d06ba949c227d84f63904f5b5e
Parents: 93c1ea3
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Dec 2 15:28:17 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Dec 5 07:48:44 2016 -0600

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |   28 +
 .../jvm/org/apache/storm/StormSubmitter.java    |  121 +-
 .../storm/dependency/DependencyUploader.java    |   22 +-
 .../jvm/org/apache/storm/generated/Nimbus.java  | 3377 ++++++++++++++----
 .../org/apache/storm/utils/NimbusClient.java    |   45 +-
 .../StormBoundedExponentialBackoffRetry.java    |    5 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |    2 +
 .../org/apache/storm/zookeeper/Zookeeper.java   |    1 +
 storm-core/src/py/storm/Nimbus-remote           |   14 +
 storm-core/src/py/storm/Nimbus.py               |  385 ++
 storm-core/src/storm.thrift                     |    2 +
 11 files changed, 3137 insertions(+), 865 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index e9d35a8..112d00b 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -94,6 +94,8 @@
 (defmeter nimbus:num-getTopology-calls)
 (defmeter nimbus:num-getUserTopology-calls)
 (defmeter nimbus:num-getClusterInfo-calls)
+(defmeter nimbus:num-getLeader-calls)
+(defmeter nimbus:num-isTopologyNameAllowed-calls)
 (defmeter nimbus:num-getTopologyInfoWithOpts-calls)
 (defmeter nimbus:num-getTopologyInfo-calls)
 (defmeter nimbus:num-getTopologyPageInfo-calls)
@@ -2353,6 +2355,32 @@
             topo-history-list (read-topology-history nimbus user admin-users)]
         (TopologyHistoryInfo. (distinct (concat active-ids-for-user topo-history-list)))))
 
+    (^NimbusSummary getLeader [this]
+      (mark! nimbus:num-getLeader-calls)
+      (check-authorization! nimbus nil nil "getClusterInfo")
+      (let [storm-cluster-state (:storm-cluster-state nimbus)
+            nimbuses (.nimbuses storm-cluster-state)
+            leader-elector (:leader-elector nimbus)
+            leader (.getLeader leader-elector)
+            leader-host (.getHost leader)
+            leader-port (.getPort leader)
+            leader-summary (first (filter
+               (fn [nimbus-summary] (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))
+               nimbuses))]
+        (.set_uptime_secs leader-summary (time-delta (.get_uptime_secs leader-summary)))
+        (.set_isLeader leader-summary true)
+        leader-summary))
+        
+    (^boolean isTopologyNameAllowed [this ^String name]
+      (mark! nimbus:num-isTopologyNameAllowed-calls)
+      (check-authorization! nimbus nil nil "getClusterInfo")
+      (try
+        (validate-topology-name! name)
+        (check-storm-active! nimbus name false)
+        true
+        (catch InvalidTopologyException e false)
+        (catch AlreadyAliveException e false)))
+ 
     Shutdownable
     (shutdown [this]
       (mark! nimbus:num-shutdown-calls)

http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
index effeed4..e414d60 100644
--- a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
@@ -128,19 +128,19 @@ public class StormSubmitter {
             return;
         }
         try {
-            if(localNimbus!=null) {
-                LOG.info("Pushing Credentials to topology " + name + " in local mode");
+            if (localNimbus!=null) {
+                LOG.info("Pushing Credentials to topology {} in local mode", name);
                 localNimbus.uploadNewCredentials(name, new Credentials(fullCreds));
             } else {
                 NimbusClient client = NimbusClient.getConfiguredClient(conf);
                 try {
-                    LOG.info("Uploading new credentials to " +  name);
+                    LOG.info("Uploading new credentials to {}", name);
                     client.getClient().uploadNewCredentials(name, new Credentials(fullCreds));
                 } finally {
                     client.close();
                 }
             }
-            LOG.info("Finished submitting topology: " +  name);
+            LOG.info("Finished pushing creds to topology: {}", name);
         } catch(TException e) {
             throw new RuntimeException(e);
         }
@@ -227,9 +227,9 @@ public class StormSubmitter {
             opts.set_creds(new Credentials(fullCreds));
         }
         try {
-            if(localNimbus!=null) {
+            if (localNimbus!=null) {
                 LOG.info("Submitting topology " + name + " in local mode");
-                if(opts!=null) {
+                if (opts!=null) {
                     localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts);
                 } else {
                     // this is for backwards compatibility
@@ -238,37 +238,41 @@ public class StormSubmitter {
                 LOG.info("Finished submitting topology: " +  name);
             } else {
                 String serConf = JSONValue.toJSONString(stormConf);
-                if(topologyNameExists(conf, name, asUser)) {
-                    throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
-                }
-
-                // Dependency uploading only makes sense for distributed mode
-                List<String> jarsBlobKeys = Collections.emptyList();
-                List<String> artifactsBlobKeys;
-
-                DependencyUploader uploader = new DependencyUploader();
-                try {
-                    uploader.init();
-                    jarsBlobKeys = uploadDependencyJarsToBlobStore(uploader);
-                    artifactsBlobKeys = uploadDependencyArtifactsToBlobStore(uploader);
-                } catch (Throwable e) {
-                    // remove uploaded jars blobs, not artifacts since they're shared across the cluster
-                    uploader.deleteBlobs(jarsBlobKeys);
-                    uploader.shutdown();
-                    throw e;
-                }
-
-                try {
-                    setDependencyBlobsToTopology(topology, jarsBlobKeys, artifactsBlobKeys);
-                    submitTopologyInDistributeMode(name, topology, opts, progressListener, asUser, conf, serConf);
-                } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
-                    // remove uploaded jars blobs, not artifacts since they're shared across the cluster
-                    // Note that we don't handle TException to delete jars blobs
-                    // because it's safer to leave some blobs instead of topology not running
-                    uploader.deleteBlobs(jarsBlobKeys);
-                    throw e;
-                } finally {
-                    uploader.shutdown();
+                try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
+                    if (topologyNameExists(name, client)) {
+                        throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
+                    }
+
+                    // Dependency uploading only makes sense for distributed mode
+                    List<String> jarsBlobKeys = Collections.emptyList();
+                    List<String> artifactsBlobKeys;
+
+                    DependencyUploader uploader = new DependencyUploader();
+                    try {
+                        uploader.init();
+
+                        jarsBlobKeys = uploadDependencyJarsToBlobStore(uploader);
+
+                        artifactsBlobKeys = uploadDependencyArtifactsToBlobStore(uploader);
+                    } catch (Throwable e) {
+                        // remove uploaded jars blobs, not artifacts since they're shared across the cluster
+                        uploader.deleteBlobs(jarsBlobKeys);
+                        uploader.shutdown();
+                        throw e;
+                    }
+
+                    try {
+                        setDependencyBlobsToTopology(topology, jarsBlobKeys, artifactsBlobKeys);
+                        submitTopologyInDistributeMode(name, topology, opts, progressListener, asUser, conf, serConf, client);
+                    } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
+                        // remove uploaded jars blobs, not artifacts since they're shared across the cluster
+                        // Note that we don't handle TException to delete jars blobs
+                        // because it's safer to leave some blobs instead of topology not running
+                        uploader.deleteBlobs(jarsBlobKeys);
+                        throw e;
+                    } finally {
+                        uploader.shutdown();
+                    }
                 }
             }
         } catch(TException e) {
@@ -316,19 +320,20 @@ public class StormSubmitter {
 
     private static void submitTopologyInDistributeMode(String name, StormTopology topology, SubmitOptions opts,
                                                        ProgressListener progressListener, String asUser, Map conf,
-                                                       String serConf) throws TException {
-        String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, asUser);
-        try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)){
-            LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
+                                                       String serConf, NimbusClient client) throws TException {
+        try {
+            String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, client);
+            LOG.info("Submitting topology {} in distributed mode with conf {}", name, serConf);
+
             if (opts != null) {
                 client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
             } else {
                 // this is for backwards compatibility
                 client.getClient().submitTopology(name, jar, serConf, topology);
             }
-            LOG.info("Finished submitting topology: " + name);
+            LOG.info("Finished submitting topology: {}", name);
         } catch (InvalidTopologyException e) {
-            LOG.warn("Topology submission exception: " + e.get_msg());
+            LOG.warn("Topology submission exception: {}", e.get_msg());
             throw e;
         } catch (AlreadyAliveException e) {
             LOG.warn("Topology already alive exception", e);
@@ -444,16 +449,9 @@ public class StormSubmitter {
         });
     }
 
-    private static boolean topologyNameExists(Map conf, String name, String asUser) {
-        try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
-            ClusterSummary summary = client.getClient().getClusterInfo();
-            for(TopologySummary s : summary.get_topologies()) {
-                if(s.get_name().equals(name)) {
-                    return true;
-                }
-            }
-            return false;
-
+    private static boolean topologyNameExists(String name, NimbusClient client) {
+        try {
+            return !client.getClient().isTopologyNameAllowed(name);
         } catch(Exception e) {
             throw new RuntimeException(e);
         }
@@ -473,13 +471,12 @@ public class StormSubmitter {
         return submitJar(conf, localJar, null);
     }
 
-
-    public static String submitJarAs(Map conf, String localJar, ProgressListener listener, String asUser) {
+    public static String submitJarAs(Map conf, String localJar, ProgressListener listener, NimbusClient client) {
         if (localJar == null) {
             throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
         }
 
-        try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
+        try {
             String uploadLocation = client.getClient().beginFileUpload();
             LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
             BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES);
@@ -512,6 +509,16 @@ public class StormSubmitter {
             throw new RuntimeException(e);
         }
     }
+    
+    public static String submitJarAs(Map conf, String localJar, ProgressListener listener, String asUser) {
+        if (localJar == null) {
+            throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
+        }
+
+        try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
+            return submitJarAs(conf, localJar, listener, client);
+        }
+    }
 
     /**
      * Submit jar file
@@ -521,7 +528,7 @@ public class StormSubmitter {
      * @return the remote location of the submitted jar
      */
     public static String submitJar(Map conf, String localJar, ProgressListener listener) {
-        return submitJarAs(conf,localJar, listener, null);
+        return submitJarAs(conf,localJar, listener, (String)null);
     }
 
     /**

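The net effect of the StormSubmitter changes above is that one NimbusClient now carries the whole submission: the name check, the jar upload, and the submit call all share a single connection instead of each opening its own. A rough sketch of the resulting flow from a caller's perspective (the topology name and the elided submit arguments are illustrative):

    import java.util.Map;

    import org.apache.storm.StormSubmitter;
    import org.apache.storm.utils.NimbusClient;
    import org.apache.storm.utils.Utils;

    public class SharedClientSubmitSketch {
        public static void main(String[] args) throws Exception {
            Map conf = Utils.readStormConfig();
            try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, null)) {
                // one round trip instead of a full getClusterInfo() scan
                if (!client.getClient().isTopologyNameAllowed("wordcount")) {
                    throw new RuntimeException("Topology with name `wordcount` already exists on cluster");
                }
                // the new overload reuses the open client rather than reconnecting
                String jar = StormSubmitter.submitJarAs(
                    conf, System.getProperty("storm.jar"), null, client);
                System.out.println("jar uploaded to: " + jar);
                // client.getClient().submitTopology("wordcount", jar, serConf, topology);
            }
        }
    }
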
http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/jvm/org/apache/storm/dependency/DependencyUploader.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/dependency/DependencyUploader.java b/storm-core/src/jvm/org/apache/storm/dependency/DependencyUploader.java
index 4f71f67..636f454 100644
--- a/storm-core/src/jvm/org/apache/storm/dependency/DependencyUploader.java
+++ b/storm-core/src/jvm/org/apache/storm/dependency/DependencyUploader.java
@@ -18,7 +18,6 @@
 package org.apache.storm.dependency;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.StringUtils;
 import org.apache.storm.blobstore.AtomicOutputStream;
 import org.apache.storm.blobstore.BlobStoreUtils;
 import org.apache.storm.blobstore.ClientBlobStore;
@@ -38,7 +37,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 
 public class DependencyUploader {
     public static final Logger LOG = LoggerFactory.getLogger(DependencyUploader.class);
@@ -51,9 +49,7 @@ public class DependencyUploader {
     }
 
     public void init() {
-        if (blobStore == null) {
-            blobStore = Utils.getClientBlobStore(conf);
-        }
+        //NOOP
     }
 
     public void shutdown() {
@@ -67,7 +63,13 @@ public class DependencyUploader {
         this.blobStore = blobStore;
     }
 
-    @SuppressWarnings("unchecked")
+    private synchronized ClientBlobStore getBlobStore() {
+        if (blobStore == null) {
+            blobStore = Utils.getClientBlobStore(conf);
+        }
+        return blobStore;
+    }
+
     public List<String> uploadFiles(List<File> dependencies, boolean cleanupIfFails) throws IOException, AuthorizationException {
         checkFilesExist(dependencies);
 
@@ -87,7 +89,7 @@ public class DependencyUploader {
                 keys.add(key);
             }
         } catch (Throwable e) {
-            if (blobStore != null && cleanupIfFails) {
+            if (getBlobStore() != null && cleanupIfFails) {
                 deleteBlobs(keys);
             }
             throw new RuntimeException(e);
@@ -124,7 +126,7 @@ public class DependencyUploader {
     public void deleteBlobs(List<String> keys) {
         for (String key : keys) {
             try {
-                blobStore.deleteBlob(key);
+                getBlobStore().deleteBlob(key);
             } catch (Throwable e) {
                 LOG.warn("blob delete failed - key: {} continue...", key);
             }
@@ -142,10 +144,10 @@ public class DependencyUploader {
         try {
             // FIXME: we can filter by listKeys() with local blobstore when STORM-1986 is going to be resolved
             // as a workaround, we call getBlobMeta() for all keys
-            blobStore.getBlobMeta(key);
+            getBlobStore().getBlobMeta(key);
         } catch (KeyNotFoundException e) {
             // TODO: do we want to add ACL here?
-            AtomicOutputStream blob = blobStore
+            AtomicOutputStream blob = getBlobStore()
                     .createBlob(key, new SettableBlobMeta(new ArrayList<AccessControl>()));
             Files.copy(dependency.toPath(), blob);
             blob.close();