You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/20 08:56:34 UTC
[1/8] storm git commit: STORM-2190: reduce contention between submission and scheduling
Repository: storm
Updated Branches:
refs/heads/1.x-branch b515356c1 -> 4154490f7
STORM-2190: reduce contention between submission and scheduling
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cea8beaa
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cea8beaa
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cea8beaa
Branch: refs/heads/1.x-branch
Commit: cea8beaaa50b432277db4302d3c7ac5c1970bdc2
Parents: d06c8d5
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Nov 7 17:08:07 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Dec 5 06:06:14 2016 -0600
----------------------------------------------------------------------
.../apache/storm/starter/WordCountTopology.java | 4 +-
.../org/apache/storm/command/kill_topology.clj | 8 ++--
.../src/clj/org/apache/storm/daemon/nimbus.clj | 49 ++++++++++----------
3 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/cea8beaa/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
index e4a5711..b47981d 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
@@ -90,7 +90,9 @@ public class WordCountTopology {
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+ for (String name: args) {
+ StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
+ }
}
else {
conf.setMaxTaskParallelism(3);
http://git-wip-us.apache.org/repos/asf/storm/blob/cea8beaa/storm-core/src/clj/org/apache/storm/command/kill_topology.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/kill_topology.clj b/storm-core/src/clj/org/apache/storm/command/kill_topology.clj
index 84e0a64..9fbdc6c 100644
--- a/storm-core/src/clj/org/apache/storm/command/kill_topology.clj
+++ b/storm-core/src/clj/org/apache/storm/command/kill_topology.clj
@@ -20,10 +20,10 @@
(:gen-class))
(defn -main [& args]
- (let [[{wait :wait} [name] _] (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)])
+ (let [[{wait :wait} names] (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)])
opts (KillOptions.)]
(if wait (.set_wait_secs opts wait))
(with-configured-nimbus-connection nimbus
- (.killTopologyWithOpts nimbus name opts)
- (log-message "Killed topology: " name)
- )))
+ (doseq [name names]
+ (.killTopologyWithOpts nimbus name opts)
+ (log-message "Killed topology: " name)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/cea8beaa/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 46ca121..3682ecf 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -193,6 +193,7 @@
NIMBUS-ZK-ACLS)
:context (ClusterStateContext. DaemonType/NIMBUS))
:submit-lock (Object.)
+ :sched-lock (Object.)
:cred-update-lock (Object.)
:log-update-lock (Object.)
:heartbeats-cache (atom {})
@@ -963,9 +964,8 @@
storm-cluster-state (:storm-cluster-state nimbus)
^INimbus inimbus (:inimbus nimbus)
;; read all the topologies
- topology-ids (.active-storms storm-cluster-state)
- topologies (into {} (for [tid topology-ids]
- {tid (read-topology-details nimbus tid)}))
+ topologies (locking (:submit-lock nimbus) (into {} (for [tid (.active-storms storm-cluster-state)]
+ {tid (read-topology-details nimbus tid)})))
topologies (Topologies. topologies)
;; read all the assignments
assigned-topology-ids (.assignments storm-cluster-state nil)
@@ -976,11 +976,11 @@
(when (or (nil? scratch-topology-id) (not= tid scratch-topology-id))
{tid (.assignment-info storm-cluster-state tid nil)})))
;; make the new assignments for topologies
- new-scheduler-assignments (compute-new-scheduler-assignments
+ new-scheduler-assignments (locking (:sched-lock nimbus) (compute-new-scheduler-assignments
nimbus
existing-assignments
topologies
- scratch-topology-id)
+ scratch-topology-id))
topology->executor->node+port (compute-new-topology->executor->node+port new-scheduler-assignments existing-assignments)
topology->executor->node+port (merge (into {} (for [id assigned-topology-ids] {id nil})) topology->executor->node+port)
@@ -1020,22 +1020,23 @@
(reset! (:id->worker-resources nimbus) {}))
;; tasks figure out what tasks to talk to by looking at topology at runtime
;; only log/set when there's been a change to the assignment
- (doseq [[topology-id assignment] new-assignments
- :let [existing-assignment (get existing-assignments topology-id)
- topology-details (.getById topologies topology-id)]]
- (if (= existing-assignment assignment)
- (log-debug "Assignment for " topology-id " hasn't changed")
- (do
- (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
- (.set-assignment! storm-cluster-state topology-id assignment)
- )))
- (->> new-assignments
- (map (fn [[topology-id assignment]]
- (let [existing-assignment (get existing-assignments topology-id)]
- [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))]
- )))
- (into {})
- (.assignSlots inimbus topologies)))
+ (locking (:sched-lock nimbus)
+ (doseq [[topology-id assignment] new-assignments
+ :let [existing-assignment (get existing-assignments topology-id)
+ topology-details (.getById topologies topology-id)]]
+ (if (= existing-assignment assignment)
+ (log-debug "Assignment for " topology-id " hasn't changed")
+ (do
+ (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
+ (.set-assignment! storm-cluster-state topology-id assignment)
+ )))
+ (->> new-assignments
+ (map (fn [[topology-id assignment]]
+ (let [existing-assignment (get existing-assignments topology-id)]
+ [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))]
+ )))
+ (into {})
+ (.assignSlots inimbus topologies))))
(log-message "not a leader, skipping assignments")))
(defn notify-topology-action-listener [nimbus storm-id action]
@@ -1726,8 +1727,7 @@
)]
(transition-name! nimbus storm-name [:kill wait-amt] true)
(notify-topology-action-listener nimbus storm-name operation))
- (add-topology-to-history-log (get-storm-id (:storm-cluster-state nimbus) storm-name)
- nimbus topology-conf)))
+ (add-topology-to-history-log storm-id nimbus topology-conf)))
(^void rebalance [this ^String storm-name ^RebalanceOptions options]
(mark! nimbus:num-rebalance-calls)
@@ -2402,8 +2402,7 @@
(conf NIMBUS-MONITOR-FREQ-SECS)
(fn []
(when-not (conf NIMBUS-DO-NOT-REASSIGN)
- (locking (:submit-lock nimbus)
- (mk-assignments nimbus)))
+ (mk-assignments nimbus))
(do-cleanup nimbus)))
;; Schedule Nimbus inbox cleaner
(schedule-recurring (:timer nimbus)
[2/8] storm git commit: Fixed Race
Posted by ka...@apache.org.
Fixed Race
Conflicts:
storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/93c1ea37
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/93c1ea37
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/93c1ea37
Branch: refs/heads/1.x-branch
Commit: 93c1ea37a0f38c147727d40e79de900f49e2e605
Parents: cea8bea
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Dec 2 08:46:38 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Dec 5 06:26:12 2016 -0600
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/nimbus.clj | 114 +++++++++----------
1 file changed, 57 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/93c1ea37/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 3682ecf..e9d35a8 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -974,69 +974,69 @@
;; we exclude its assignment, meaning that all the slots occupied by its assignment
;; will be treated as free slot in the scheduler code.
(when (or (nil? scratch-topology-id) (not= tid scratch-topology-id))
- {tid (.assignment-info storm-cluster-state tid nil)})))
- ;; make the new assignments for topologies
- new-scheduler-assignments (locking (:sched-lock nimbus) (compute-new-scheduler-assignments
- nimbus
- existing-assignments
- topologies
- scratch-topology-id))
- topology->executor->node+port (compute-new-topology->executor->node+port new-scheduler-assignments existing-assignments)
-
- topology->executor->node+port (merge (into {} (for [id assigned-topology-ids] {id nil})) topology->executor->node+port)
- new-assigned-worker->resources (convert-assignments-to-worker->resources new-scheduler-assignments)
- now-secs (current-time-secs)
-
- basic-supervisor-details-map (basic-supervisor-details-map storm-cluster-state)
-
- ;; construct the final Assignments by adding start-times etc into it
- new-assignments (into {} (for [[topology-id executor->node+port] topology->executor->node+port
- :let [existing-assignment (get existing-assignments topology-id)
- all-nodes (->> executor->node+port vals (map first) set)
- node->host (->> all-nodes
- (mapcat (fn [node]
- (if-let [host (.getHostName inimbus basic-supervisor-details-map node)]
- [[node host]]
+ {tid (.assignment-info storm-cluster-state tid nil)})))]
+ ;; make the new assignments for topologies
+ (locking (:sched-lock nimbus) (let [
+ new-scheduler-assignments (compute-new-scheduler-assignments
+ nimbus
+ existing-assignments
+ topologies
+ scratch-topology-id)
+ topology->executor->node+port (compute-new-topology->executor->node+port new-scheduler-assignments existing-assignments)
+
+ topology->executor->node+port (merge (into {} (for [id assigned-topology-ids] {id nil})) topology->executor->node+port)
+ new-assigned-worker->resources (convert-assignments-to-worker->resources new-scheduler-assignments)
+ now-secs (current-time-secs)
+
+ basic-supervisor-details-map (basic-supervisor-details-map storm-cluster-state)
+
+ ;; construct the final Assignments by adding start-times etc into it
+ new-assignments (into {} (for [[topology-id executor->node+port] topology->executor->node+port
+ :let [existing-assignment (get existing-assignments topology-id)
+ all-nodes (->> executor->node+port vals (map first) set)
+ node->host (->> all-nodes
+ (mapcat (fn [node]
+ (if-let [host (.getHostName inimbus basic-supervisor-details-map node)]
+ [[node host]]
+ )))
+ (into {}))
+ all-node->host (merge (:node->host existing-assignment) node->host)
+ reassign-executors (changed-executors (:executor->node+port existing-assignment) executor->node+port)
+ start-times (merge (:executor->start-time-secs existing-assignment)
+ (into {}
+ (for [id reassign-executors]
+ [id now-secs]
)))
- (into {}))
- all-node->host (merge (:node->host existing-assignment) node->host)
- reassign-executors (changed-executors (:executor->node+port existing-assignment) executor->node+port)
- start-times (merge (:executor->start-time-secs existing-assignment)
- (into {}
- (for [id reassign-executors]
- [id now-secs]
- )))
- worker->resources (get new-assigned-worker->resources topology-id)]]
- {topology-id (Assignment.
- (conf STORM-LOCAL-DIR)
- (select-keys all-node->host all-nodes)
- executor->node+port
- start-times
- worker->resources)}))]
-
- (when (not= new-assignments existing-assignments)
- (log-debug "RESETTING id->resources and id->worker-resources cache!")
- (reset! (:id->resources nimbus) {})
- (reset! (:id->worker-resources nimbus) {}))
- ;; tasks figure out what tasks to talk to by looking at topology at runtime
- ;; only log/set when there's been a change to the assignment
- (locking (:sched-lock nimbus)
- (doseq [[topology-id assignment] new-assignments
- :let [existing-assignment (get existing-assignments topology-id)
- topology-details (.getById topologies topology-id)]]
- (if (= existing-assignment assignment)
- (log-debug "Assignment for " topology-id " hasn't changed")
- (do
- (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
- (.set-assignment! storm-cluster-state topology-id assignment)
- )))
- (->> new-assignments
+ worker->resources (get new-assigned-worker->resources topology-id)]]
+ {topology-id (Assignment.
+ (conf STORM-LOCAL-DIR)
+ (select-keys all-node->host all-nodes)
+ executor->node+port
+ start-times
+ worker->resources)}))]
+
+ (when (not= new-assignments existing-assignments)
+ (log-debug "RESETTING id->resources and id->worker-resources cache!")
+ (reset! (:id->resources nimbus) {})
+ (reset! (:id->worker-resources nimbus) {}))
+ ;; tasks figure out what tasks to talk to by looking at topology at runtime
+ ;; only log/set when there's been a change to the assignment
+ (doseq [[topology-id assignment] new-assignments
+ :let [existing-assignment (get existing-assignments topology-id)
+ topology-details (.getById topologies topology-id)]]
+ (if (= existing-assignment assignment)
+ (log-debug "Assignment for " topology-id " hasn't changed")
+ (do
+ (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
+ (.set-assignment! storm-cluster-state topology-id assignment)
+ )))
+ (->> new-assignments
(map (fn [[topology-id assignment]]
(let [existing-assignment (get existing-assignments topology-id)]
[topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))]
)))
(into {})
- (.assignSlots inimbus topologies))))
+ (.assignSlots inimbus topologies)))))
(log-message "not a leader, skipping assignments")))
(defn notify-topology-action-listener [nimbus storm-id action]
[4/8] storm git commit: Added in some optimizations for better topology submission performance
Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/jvm/org/apache/storm/generated/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/Nimbus.java b/storm-core/src/jvm/org/apache/storm/generated/Nimbus.java
index ba9e66a..22a7205 100644
--- a/storm-core/src/jvm/org/apache/storm/generated/Nimbus.java
+++ b/storm-core/src/jvm/org/apache/storm/generated/Nimbus.java
@@ -136,6 +136,10 @@ public class Nimbus {
public ClusterSummary getClusterInfo() throws AuthorizationException, org.apache.thrift.TException;
+ public NimbusSummary getLeader() throws AuthorizationException, org.apache.thrift.TException;
+
+ public boolean isTopologyNameAllowed(String name) throws AuthorizationException, org.apache.thrift.TException;
+
public TopologyInfo getTopologyInfo(String id) throws NotAliveException, AuthorizationException, org.apache.thrift.TException;
public TopologyInfo getTopologyInfoWithOpts(String id, GetInfoOptions options) throws NotAliveException, AuthorizationException, org.apache.thrift.TException;
@@ -236,6 +240,10 @@ public class Nimbus {
public void getClusterInfo(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+ public void getLeader(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+
+ public void isTopologyNameAllowed(String name, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+
public void getTopologyInfo(String id, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
public void getTopologyInfoWithOpts(String id, GetInfoOptions options, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
@@ -1167,6 +1175,57 @@ public class Nimbus {
throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getClusterInfo failed: unknown result");
}
+ public NimbusSummary getLeader() throws AuthorizationException, org.apache.thrift.TException
+ {
+ send_getLeader();
+ return recv_getLeader();
+ }
+
+ public void send_getLeader() throws org.apache.thrift.TException
+ {
+ getLeader_args args = new getLeader_args();
+ sendBase("getLeader", args);
+ }
+
+ public NimbusSummary recv_getLeader() throws AuthorizationException, org.apache.thrift.TException
+ {
+ getLeader_result result = new getLeader_result();
+ receiveBase(result, "getLeader");
+ if (result.is_set_success()) {
+ return result.success;
+ }
+ if (result.aze != null) {
+ throw result.aze;
+ }
+ throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getLeader failed: unknown result");
+ }
+
+ public boolean isTopologyNameAllowed(String name) throws AuthorizationException, org.apache.thrift.TException
+ {
+ send_isTopologyNameAllowed(name);
+ return recv_isTopologyNameAllowed();
+ }
+
+ public void send_isTopologyNameAllowed(String name) throws org.apache.thrift.TException
+ {
+ isTopologyNameAllowed_args args = new isTopologyNameAllowed_args();
+ args.set_name(name);
+ sendBase("isTopologyNameAllowed", args);
+ }
+
+ public boolean recv_isTopologyNameAllowed() throws AuthorizationException, org.apache.thrift.TException
+ {
+ isTopologyNameAllowed_result result = new isTopologyNameAllowed_result();
+ receiveBase(result, "isTopologyNameAllowed");
+ if (result.is_set_success()) {
+ return result.success;
+ }
+ if (result.aze != null) {
+ throw result.aze;
+ }
+ throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "isTopologyNameAllowed failed: unknown result");
+ }
+
public TopologyInfo getTopologyInfo(String id) throws NotAliveException, AuthorizationException, org.apache.thrift.TException
{
send_getTopologyInfo(id);
@@ -2596,6 +2655,67 @@ public class Nimbus {
}
}
+ public void getLeader(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+ checkReady();
+ getLeader_call method_call = new getLeader_call(resultHandler, this, ___protocolFactory, ___transport);
+ this.___currentMethod = method_call;
+ ___manager.call(method_call);
+ }
+
+ public static class getLeader_call extends org.apache.thrift.async.TAsyncMethodCall {
+ public getLeader_call(org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+ super(client, protocolFactory, transport, resultHandler, false);
+ }
+
+ public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+ prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("getLeader", org.apache.thrift.protocol.TMessageType.CALL, 0));
+ getLeader_args args = new getLeader_args();
+ args.write(prot);
+ prot.writeMessageEnd();
+ }
+
+ public NimbusSummary getResult() throws AuthorizationException, org.apache.thrift.TException {
+ if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+ throw new IllegalStateException("Method call not finished!");
+ }
+ org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+ org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+ return (new Client(prot)).recv_getLeader();
+ }
+ }
+
+ public void isTopologyNameAllowed(String name, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+ checkReady();
+ isTopologyNameAllowed_call method_call = new isTopologyNameAllowed_call(name, resultHandler, this, ___protocolFactory, ___transport);
+ this.___currentMethod = method_call;
+ ___manager.call(method_call);
+ }
+
+ public static class isTopologyNameAllowed_call extends org.apache.thrift.async.TAsyncMethodCall {
+ private String name;
+ public isTopologyNameAllowed_call(String name, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+ super(client, protocolFactory, transport, resultHandler, false);
+ this.name = name;
+ }
+
+ public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+ prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("isTopologyNameAllowed", org.apache.thrift.protocol.TMessageType.CALL, 0));
+ isTopologyNameAllowed_args args = new isTopologyNameAllowed_args();
+ args.set_name(name);
+ args.write(prot);
+ prot.writeMessageEnd();
+ }
+
+ public boolean getResult() throws AuthorizationException, org.apache.thrift.TException {
+ if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+ throw new IllegalStateException("Method call not finished!");
+ }
+ org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+ org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+ return (new Client(prot)).recv_isTopologyNameAllowed();
+ }
+ }
+
public void getTopologyInfo(String id, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
checkReady();
getTopologyInfo_call method_call = new getTopologyInfo_call(id, resultHandler, this, ___protocolFactory, ___transport);
@@ -2955,6 +3075,8 @@ public class Nimbus {
processMap.put("downloadChunk", new downloadChunk());
processMap.put("getNimbusConf", new getNimbusConf());
processMap.put("getClusterInfo", new getClusterInfo());
+ processMap.put("getLeader", new getLeader());
+ processMap.put("isTopologyNameAllowed", new isTopologyNameAllowed());
processMap.put("getTopologyInfo", new getTopologyInfo());
processMap.put("getTopologyInfoWithOpts", new getTopologyInfoWithOpts());
processMap.put("getTopologyPageInfo", new getTopologyPageInfo());
@@ -3803,6 +3925,55 @@ public class Nimbus {
}
}
+ public static class getLeader<I extends Iface> extends org.apache.thrift.ProcessFunction<I, getLeader_args> {
+ public getLeader() {
+ super("getLeader");
+ }
+
+ public getLeader_args getEmptyArgsInstance() {
+ return new getLeader_args();
+ }
+
+ protected boolean isOneway() {
+ return false;
+ }
+
+ public getLeader_result getResult(I iface, getLeader_args args) throws org.apache.thrift.TException {
+ getLeader_result result = new getLeader_result();
+ try {
+ result.success = iface.getLeader();
+ } catch (AuthorizationException aze) {
+ result.aze = aze;
+ }
+ return result;
+ }
+ }
+
+ public static class isTopologyNameAllowed<I extends Iface> extends org.apache.thrift.ProcessFunction<I, isTopologyNameAllowed_args> {
+ public isTopologyNameAllowed() {
+ super("isTopologyNameAllowed");
+ }
+
+ public isTopologyNameAllowed_args getEmptyArgsInstance() {
+ return new isTopologyNameAllowed_args();
+ }
+
+ protected boolean isOneway() {
+ return false;
+ }
+
+ public isTopologyNameAllowed_result getResult(I iface, isTopologyNameAllowed_args args) throws org.apache.thrift.TException {
+ isTopologyNameAllowed_result result = new isTopologyNameAllowed_result();
+ try {
+ result.success = iface.isTopologyNameAllowed(args.name);
+ result.set_success_isSet(true);
+ } catch (AuthorizationException aze) {
+ result.aze = aze;
+ }
+ return result;
+ }
+ }
+
public static class getTopologyInfo<I extends Iface> extends org.apache.thrift.ProcessFunction<I, getTopologyInfo_args> {
public getTopologyInfo() {
super("getTopologyInfo");
@@ -4082,6 +4253,8 @@ public class Nimbus {
processMap.put("downloadChunk", new downloadChunk());
processMap.put("getNimbusConf", new getNimbusConf());
processMap.put("getClusterInfo", new getClusterInfo());
+ processMap.put("getLeader", new getLeader());
+ processMap.put("isTopologyNameAllowed", new isTopologyNameAllowed());
processMap.put("getTopologyInfo", new getTopologyInfo());
processMap.put("getTopologyInfoWithOpts", new getTopologyInfoWithOpts());
processMap.put("getTopologyPageInfo", new getTopologyPageInfo());
@@ -6084,20 +6257,20 @@ public class Nimbus {
}
}
- public static class getTopologyInfo<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getTopologyInfo_args, TopologyInfo> {
- public getTopologyInfo() {
- super("getTopologyInfo");
+ public static class getLeader<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getLeader_args, NimbusSummary> {
+ public getLeader() {
+ super("getLeader");
}
- public getTopologyInfo_args getEmptyArgsInstance() {
- return new getTopologyInfo_args();
+ public getLeader_args getEmptyArgsInstance() {
+ return new getLeader_args();
}
- public AsyncMethodCallback<TopologyInfo> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+ public AsyncMethodCallback<NimbusSummary> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
final org.apache.thrift.AsyncProcessFunction fcall = this;
- return new AsyncMethodCallback<TopologyInfo>() {
- public void onComplete(TopologyInfo o) {
- getTopologyInfo_result result = new getTopologyInfo_result();
+ return new AsyncMethodCallback<NimbusSummary>() {
+ public void onComplete(NimbusSummary o) {
+ getLeader_result result = new getLeader_result();
result.success = o;
try {
fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
@@ -6110,13 +6283,66 @@ public class Nimbus {
public void onError(Exception e) {
byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
org.apache.thrift.TBase msg;
- getTopologyInfo_result result = new getTopologyInfo_result();
- if (e instanceof NotAliveException) {
- result.e = (NotAliveException) e;
- result.set_e_isSet(true);
+ getLeader_result result = new getLeader_result();
+ if (e instanceof AuthorizationException) {
+ result.aze = (AuthorizationException) e;
+ result.set_aze_isSet(true);
msg = result;
}
- else if (e instanceof AuthorizationException) {
+ else
+ {
+ msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+ msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
+ }
+ try {
+ fcall.sendResponse(fb,msg,msgType,seqid);
+ return;
+ } catch (Exception ex) {
+ LOGGER.error("Exception writing to internal frame buffer", ex);
+ }
+ fb.close();
+ }
+ };
+ }
+
+ protected boolean isOneway() {
+ return false;
+ }
+
+ public void start(I iface, getLeader_args args, org.apache.thrift.async.AsyncMethodCallback<NimbusSummary> resultHandler) throws TException {
+ iface.getLeader(resultHandler);
+ }
+ }
+
+ public static class isTopologyNameAllowed<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, isTopologyNameAllowed_args, Boolean> {
+ public isTopologyNameAllowed() {
+ super("isTopologyNameAllowed");
+ }
+
+ public isTopologyNameAllowed_args getEmptyArgsInstance() {
+ return new isTopologyNameAllowed_args();
+ }
+
+ public AsyncMethodCallback<Boolean> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+ final org.apache.thrift.AsyncProcessFunction fcall = this;
+ return new AsyncMethodCallback<Boolean>() {
+ public void onComplete(Boolean o) {
+ isTopologyNameAllowed_result result = new isTopologyNameAllowed_result();
+ result.success = o;
+ result.set_success_isSet(true);
+ try {
+ fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+ return;
+ } catch (Exception e) {
+ LOGGER.error("Exception writing to internal frame buffer", e);
+ }
+ fb.close();
+ }
+ public void onError(Exception e) {
+ byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+ org.apache.thrift.TBase msg;
+ isTopologyNameAllowed_result result = new isTopologyNameAllowed_result();
+ if (e instanceof AuthorizationException) {
result.aze = (AuthorizationException) e;
result.set_aze_isSet(true);
msg = result;
@@ -6141,25 +6367,25 @@ public class Nimbus {
return false;
}
- public void start(I iface, getTopologyInfo_args args, org.apache.thrift.async.AsyncMethodCallback<TopologyInfo> resultHandler) throws TException {
- iface.getTopologyInfo(args.id,resultHandler);
+ public void start(I iface, isTopologyNameAllowed_args args, org.apache.thrift.async.AsyncMethodCallback<Boolean> resultHandler) throws TException {
+ iface.isTopologyNameAllowed(args.name,resultHandler);
}
}
- public static class getTopologyInfoWithOpts<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getTopologyInfoWithOpts_args, TopologyInfo> {
- public getTopologyInfoWithOpts() {
- super("getTopologyInfoWithOpts");
+ public static class getTopologyInfo<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getTopologyInfo_args, TopologyInfo> {
+ public getTopologyInfo() {
+ super("getTopologyInfo");
}
- public getTopologyInfoWithOpts_args getEmptyArgsInstance() {
- return new getTopologyInfoWithOpts_args();
+ public getTopologyInfo_args getEmptyArgsInstance() {
+ return new getTopologyInfo_args();
}
public AsyncMethodCallback<TopologyInfo> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
final org.apache.thrift.AsyncProcessFunction fcall = this;
return new AsyncMethodCallback<TopologyInfo>() {
public void onComplete(TopologyInfo o) {
- getTopologyInfoWithOpts_result result = new getTopologyInfoWithOpts_result();
+ getTopologyInfo_result result = new getTopologyInfo_result();
result.success = o;
try {
fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
@@ -6172,7 +6398,7 @@ public class Nimbus {
public void onError(Exception e) {
byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
org.apache.thrift.TBase msg;
- getTopologyInfoWithOpts_result result = new getTopologyInfoWithOpts_result();
+ getTopologyInfo_result result = new getTopologyInfo_result();
if (e instanceof NotAliveException) {
result.e = (NotAliveException) e;
result.set_e_isSet(true);
@@ -6203,25 +6429,25 @@ public class Nimbus {
return false;
}
- public void start(I iface, getTopologyInfoWithOpts_args args, org.apache.thrift.async.AsyncMethodCallback<TopologyInfo> resultHandler) throws TException {
- iface.getTopologyInfoWithOpts(args.id, args.options,resultHandler);
+ public void start(I iface, getTopologyInfo_args args, org.apache.thrift.async.AsyncMethodCallback<TopologyInfo> resultHandler) throws TException {
+ iface.getTopologyInfo(args.id,resultHandler);
}
}
- public static class getTopologyPageInfo<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getTopologyPageInfo_args, TopologyPageInfo> {
- public getTopologyPageInfo() {
- super("getTopologyPageInfo");
+ public static class getTopologyInfoWithOpts<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getTopologyInfoWithOpts_args, TopologyInfo> {
+ public getTopologyInfoWithOpts() {
+ super("getTopologyInfoWithOpts");
}
- public getTopologyPageInfo_args getEmptyArgsInstance() {
- return new getTopologyPageInfo_args();
+ public getTopologyInfoWithOpts_args getEmptyArgsInstance() {
+ return new getTopologyInfoWithOpts_args();
}
- public AsyncMethodCallback<TopologyPageInfo> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+ public AsyncMethodCallback<TopologyInfo> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
final org.apache.thrift.AsyncProcessFunction fcall = this;
- return new AsyncMethodCallback<TopologyPageInfo>() {
- public void onComplete(TopologyPageInfo o) {
- getTopologyPageInfo_result result = new getTopologyPageInfo_result();
+ return new AsyncMethodCallback<TopologyInfo>() {
+ public void onComplete(TopologyInfo o) {
+ getTopologyInfoWithOpts_result result = new getTopologyInfoWithOpts_result();
result.success = o;
try {
fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
@@ -6234,7 +6460,7 @@ public class Nimbus {
public void onError(Exception e) {
byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
org.apache.thrift.TBase msg;
- getTopologyPageInfo_result result = new getTopologyPageInfo_result();
+ getTopologyInfoWithOpts_result result = new getTopologyInfoWithOpts_result();
if (e instanceof NotAliveException) {
result.e = (NotAliveException) e;
result.set_e_isSet(true);
@@ -6265,25 +6491,25 @@ public class Nimbus {
return false;
}
- public void start(I iface, getTopologyPageInfo_args args, org.apache.thrift.async.AsyncMethodCallback<TopologyPageInfo> resultHandler) throws TException {
- iface.getTopologyPageInfo(args.id, args.window, args.is_include_sys,resultHandler);
+ public void start(I iface, getTopologyInfoWithOpts_args args, org.apache.thrift.async.AsyncMethodCallback<TopologyInfo> resultHandler) throws TException {
+ iface.getTopologyInfoWithOpts(args.id, args.options,resultHandler);
}
}
- public static class getSupervisorPageInfo<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getSupervisorPageInfo_args, SupervisorPageInfo> {
- public getSupervisorPageInfo() {
- super("getSupervisorPageInfo");
+ public static class getTopologyPageInfo<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getTopologyPageInfo_args, TopologyPageInfo> {
+ public getTopologyPageInfo() {
+ super("getTopologyPageInfo");
}
- public getSupervisorPageInfo_args getEmptyArgsInstance() {
- return new getSupervisorPageInfo_args();
+ public getTopologyPageInfo_args getEmptyArgsInstance() {
+ return new getTopologyPageInfo_args();
}
- public AsyncMethodCallback<SupervisorPageInfo> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+ public AsyncMethodCallback<TopologyPageInfo> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
final org.apache.thrift.AsyncProcessFunction fcall = this;
- return new AsyncMethodCallback<SupervisorPageInfo>() {
- public void onComplete(SupervisorPageInfo o) {
- getSupervisorPageInfo_result result = new getSupervisorPageInfo_result();
+ return new AsyncMethodCallback<TopologyPageInfo>() {
+ public void onComplete(TopologyPageInfo o) {
+ getTopologyPageInfo_result result = new getTopologyPageInfo_result();
result.success = o;
try {
fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
@@ -6296,7 +6522,7 @@ public class Nimbus {
public void onError(Exception e) {
byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
org.apache.thrift.TBase msg;
- getSupervisorPageInfo_result result = new getSupervisorPageInfo_result();
+ getTopologyPageInfo_result result = new getTopologyPageInfo_result();
if (e instanceof NotAliveException) {
result.e = (NotAliveException) e;
result.set_e_isSet(true);
@@ -6327,25 +6553,25 @@ public class Nimbus {
return false;
}
- public void start(I iface, getSupervisorPageInfo_args args, org.apache.thrift.async.AsyncMethodCallback<SupervisorPageInfo> resultHandler) throws TException {
- iface.getSupervisorPageInfo(args.id, args.host, args.is_include_sys,resultHandler);
+ public void start(I iface, getTopologyPageInfo_args args, org.apache.thrift.async.AsyncMethodCallback<TopologyPageInfo> resultHandler) throws TException {
+ iface.getTopologyPageInfo(args.id, args.window, args.is_include_sys,resultHandler);
}
}
- public static class getComponentPageInfo<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getComponentPageInfo_args, ComponentPageInfo> {
- public getComponentPageInfo() {
- super("getComponentPageInfo");
+ public static class getSupervisorPageInfo<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getSupervisorPageInfo_args, SupervisorPageInfo> {
+ public getSupervisorPageInfo() {
+ super("getSupervisorPageInfo");
}
- public getComponentPageInfo_args getEmptyArgsInstance() {
- return new getComponentPageInfo_args();
+ public getSupervisorPageInfo_args getEmptyArgsInstance() {
+ return new getSupervisorPageInfo_args();
}
- public AsyncMethodCallback<ComponentPageInfo> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+ public AsyncMethodCallback<SupervisorPageInfo> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
final org.apache.thrift.AsyncProcessFunction fcall = this;
- return new AsyncMethodCallback<ComponentPageInfo>() {
- public void onComplete(ComponentPageInfo o) {
- getComponentPageInfo_result result = new getComponentPageInfo_result();
+ return new AsyncMethodCallback<SupervisorPageInfo>() {
+ public void onComplete(SupervisorPageInfo o) {
+ getSupervisorPageInfo_result result = new getSupervisorPageInfo_result();
result.success = o;
try {
fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
@@ -6358,7 +6584,69 @@ public class Nimbus {
public void onError(Exception e) {
byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
org.apache.thrift.TBase msg;
- getComponentPageInfo_result result = new getComponentPageInfo_result();
+ getSupervisorPageInfo_result result = new getSupervisorPageInfo_result();
+ if (e instanceof NotAliveException) {
+ result.e = (NotAliveException) e;
+ result.set_e_isSet(true);
+ msg = result;
+ }
+ else if (e instanceof AuthorizationException) {
+ result.aze = (AuthorizationException) e;
+ result.set_aze_isSet(true);
+ msg = result;
+ }
+ else
+ {
+ msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+ msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
+ }
+ try {
+ fcall.sendResponse(fb,msg,msgType,seqid);
+ return;
+ } catch (Exception ex) {
+ LOGGER.error("Exception writing to internal frame buffer", ex);
+ }
+ fb.close();
+ }
+ };
+ }
+
+ protected boolean isOneway() {
+ return false;
+ }
+
+ public void start(I iface, getSupervisorPageInfo_args args, org.apache.thrift.async.AsyncMethodCallback<SupervisorPageInfo> resultHandler) throws TException {
+ iface.getSupervisorPageInfo(args.id, args.host, args.is_include_sys,resultHandler);
+ }
+ }
+
+ public static class getComponentPageInfo<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getComponentPageInfo_args, ComponentPageInfo> {
+ public getComponentPageInfo() {
+ super("getComponentPageInfo");
+ }
+
+ public getComponentPageInfo_args getEmptyArgsInstance() {
+ return new getComponentPageInfo_args();
+ }
+
+ public AsyncMethodCallback<ComponentPageInfo> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+ final org.apache.thrift.AsyncProcessFunction fcall = this;
+ return new AsyncMethodCallback<ComponentPageInfo>() {
+ public void onComplete(ComponentPageInfo o) {
+ getComponentPageInfo_result result = new getComponentPageInfo_result();
+ result.success = o;
+ try {
+ fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+ return;
+ } catch (Exception e) {
+ LOGGER.error("Exception writing to internal frame buffer", e);
+ }
+ fb.close();
+ }
+ public void onError(Exception e) {
+ byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+ org.apache.thrift.TBase msg;
+ getComponentPageInfo_result result = new getComponentPageInfo_result();
if (e instanceof NotAliveException) {
result.e = (NotAliveException) e;
result.set_e_isSet(true);
@@ -31990,22 +32278,1481 @@ public class Nimbus {
@Override
public String toString() {
- StringBuilder sb = new StringBuilder("uploadChunk_args(");
+ StringBuilder sb = new StringBuilder("uploadChunk_args(");
+ boolean first = true;
+
+ sb.append("location:");
+ if (this.location == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.location);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("chunk:");
+ if (this.chunk == null) {
+ sb.append("null");
+ } else {
+ org.apache.thrift.TBaseHelper.toString(this.chunk, sb);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class uploadChunk_argsStandardSchemeFactory implements SchemeFactory {
+ public uploadChunk_argsStandardScheme getScheme() {
+ return new uploadChunk_argsStandardScheme();
+ }
+ }
+
+ private static class uploadChunk_argsStandardScheme extends StandardScheme<uploadChunk_args> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, uploadChunk_args struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // LOCATION
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.location = iprot.readString();
+ struct.set_location_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // CHUNK
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.chunk = iprot.readBinary();
+ struct.set_chunk_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, uploadChunk_args struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.location != null) {
+ oprot.writeFieldBegin(LOCATION_FIELD_DESC);
+ oprot.writeString(struct.location);
+ oprot.writeFieldEnd();
+ }
+ if (struct.chunk != null) {
+ oprot.writeFieldBegin(CHUNK_FIELD_DESC);
+ oprot.writeBinary(struct.chunk);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class uploadChunk_argsTupleSchemeFactory implements SchemeFactory {
+ public uploadChunk_argsTupleScheme getScheme() {
+ return new uploadChunk_argsTupleScheme();
+ }
+ }
+
+ private static class uploadChunk_argsTupleScheme extends TupleScheme<uploadChunk_args> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, uploadChunk_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ BitSet optionals = new BitSet();
+ if (struct.is_set_location()) {
+ optionals.set(0);
+ }
+ if (struct.is_set_chunk()) {
+ optionals.set(1);
+ }
+ oprot.writeBitSet(optionals, 2);
+ if (struct.is_set_location()) {
+ oprot.writeString(struct.location);
+ }
+ if (struct.is_set_chunk()) {
+ oprot.writeBinary(struct.chunk);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, uploadChunk_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ BitSet incoming = iprot.readBitSet(2);
+ if (incoming.get(0)) {
+ struct.location = iprot.readString();
+ struct.set_location_isSet(true);
+ }
+ if (incoming.get(1)) {
+ struct.chunk = iprot.readBinary();
+ struct.set_chunk_isSet(true);
+ }
+ }
+ }
+
+ }
+
+ public static class uploadChunk_result implements org.apache.thrift.TBase<uploadChunk_result, uploadChunk_result._Fields>, java.io.Serializable, Cloneable, Comparable<uploadChunk_result> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("uploadChunk_result");
+
+ private static final org.apache.thrift.protocol.TField AZE_FIELD_DESC = new org.apache.thrift.protocol.TField("aze", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new uploadChunk_resultStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new uploadChunk_resultTupleSchemeFactory());
+ }
+
+ private AuthorizationException aze; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ AZE((short)1, "aze");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // AZE
+ return AZE;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.AZE, new org.apache.thrift.meta_data.FieldMetaData("aze", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(uploadChunk_result.class, metaDataMap);
+ }
+
+ public uploadChunk_result() {
+ }
+
+ public uploadChunk_result(
+ AuthorizationException aze)
+ {
+ this();
+ this.aze = aze;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public uploadChunk_result(uploadChunk_result other) {
+ if (other.is_set_aze()) {
+ this.aze = new AuthorizationException(other.aze);
+ }
+ }
+
+ public uploadChunk_result deepCopy() {
+ return new uploadChunk_result(this);
+ }
+
+ @Override
+ public void clear() {
+ this.aze = null;
+ }
+
+ public AuthorizationException get_aze() {
+ return this.aze;
+ }
+
+ public void set_aze(AuthorizationException aze) {
+ this.aze = aze;
+ }
+
+ public void unset_aze() {
+ this.aze = null;
+ }
+
+ /** Returns true if field aze is set (has been assigned a value) and false otherwise */
+ public boolean is_set_aze() {
+ return this.aze != null;
+ }
+
+ public void set_aze_isSet(boolean value) {
+ if (!value) {
+ this.aze = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case AZE:
+ if (value == null) {
+ unset_aze();
+ } else {
+ set_aze((AuthorizationException)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case AZE:
+ return get_aze();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case AZE:
+ return is_set_aze();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof uploadChunk_result)
+ return this.equals((uploadChunk_result)that);
+ return false;
+ }
+
+ public boolean equals(uploadChunk_result that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_aze = true && this.is_set_aze();
+ boolean that_present_aze = true && that.is_set_aze();
+ if (this_present_aze || that_present_aze) {
+ if (!(this_present_aze && that_present_aze))
+ return false;
+ if (!this.aze.equals(that.aze))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ List<Object> list = new ArrayList<Object>();
+
+ boolean present_aze = true && (is_set_aze());
+ list.add(present_aze);
+ if (present_aze)
+ list.add(aze);
+
+ return list.hashCode();
+ }
+
+ @Override
+ public int compareTo(uploadChunk_result other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(is_set_aze()).compareTo(other.is_set_aze());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_aze()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.aze, other.aze);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("uploadChunk_result(");
+ boolean first = true;
+
+ sb.append("aze:");
+ if (this.aze == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.aze);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class uploadChunk_resultStandardSchemeFactory implements SchemeFactory {
+ public uploadChunk_resultStandardScheme getScheme() {
+ return new uploadChunk_resultStandardScheme();
+ }
+ }
+
+ private static class uploadChunk_resultStandardScheme extends StandardScheme<uploadChunk_result> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, uploadChunk_result struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // AZE
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+ struct.aze = new AuthorizationException();
+ struct.aze.read(iprot);
+ struct.set_aze_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, uploadChunk_result struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.aze != null) {
+ oprot.writeFieldBegin(AZE_FIELD_DESC);
+ struct.aze.write(oprot);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class uploadChunk_resultTupleSchemeFactory implements SchemeFactory {
+ public uploadChunk_resultTupleScheme getScheme() {
+ return new uploadChunk_resultTupleScheme();
+ }
+ }
+
+ private static class uploadChunk_resultTupleScheme extends TupleScheme<uploadChunk_result> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, uploadChunk_result struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ BitSet optionals = new BitSet();
+ if (struct.is_set_aze()) {
+ optionals.set(0);
+ }
+ oprot.writeBitSet(optionals, 1);
+ if (struct.is_set_aze()) {
+ struct.aze.write(oprot);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, uploadChunk_result struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ BitSet incoming = iprot.readBitSet(1);
+ if (incoming.get(0)) {
+ struct.aze = new AuthorizationException();
+ struct.aze.read(iprot);
+ struct.set_aze_isSet(true);
+ }
+ }
+ }
+
+ }
+
+ public static class finishFileUpload_args implements org.apache.thrift.TBase<finishFileUpload_args, finishFileUpload_args._Fields>, java.io.Serializable, Cloneable, Comparable<finishFileUpload_args> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("finishFileUpload_args");
+
+ private static final org.apache.thrift.protocol.TField LOCATION_FIELD_DESC = new org.apache.thrift.protocol.TField("location", org.apache.thrift.protocol.TType.STRING, (short)1);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new finishFileUpload_argsStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new finishFileUpload_argsTupleSchemeFactory());
+ }
+
+ private String location; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ LOCATION((short)1, "location");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // LOCATION
+ return LOCATION;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.LOCATION, new org.apache.thrift.meta_data.FieldMetaData("location", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(finishFileUpload_args.class, metaDataMap);
+ }
+
+ public finishFileUpload_args() {
+ }
+
+ public finishFileUpload_args(
+ String location)
+ {
+ this();
+ this.location = location;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public finishFileUpload_args(finishFileUpload_args other) {
+ if (other.is_set_location()) {
+ this.location = other.location;
+ }
+ }
+
+ public finishFileUpload_args deepCopy() {
+ return new finishFileUpload_args(this);
+ }
+
+ @Override
+ public void clear() {
+ this.location = null;
+ }
+
+ public String get_location() {
+ return this.location;
+ }
+
+ public void set_location(String location) {
+ this.location = location;
+ }
+
+ public void unset_location() {
+ this.location = null;
+ }
+
+ /** Returns true if field location is set (has been assigned a value) and false otherwise */
+ public boolean is_set_location() {
+ return this.location != null;
+ }
+
+ public void set_location_isSet(boolean value) {
+ if (!value) {
+ this.location = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case LOCATION:
+ if (value == null) {
+ unset_location();
+ } else {
+ set_location((String)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case LOCATION:
+ return get_location();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case LOCATION:
+ return is_set_location();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof finishFileUpload_args)
+ return this.equals((finishFileUpload_args)that);
+ return false;
+ }
+
+ public boolean equals(finishFileUpload_args that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_location = true && this.is_set_location();
+ boolean that_present_location = true && that.is_set_location();
+ if (this_present_location || that_present_location) {
+ if (!(this_present_location && that_present_location))
+ return false;
+ if (!this.location.equals(that.location))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ List<Object> list = new ArrayList<Object>();
+
+ boolean present_location = true && (is_set_location());
+ list.add(present_location);
+ if (present_location)
+ list.add(location);
+
+ return list.hashCode();
+ }
+
+ @Override
+ public int compareTo(finishFileUpload_args other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(is_set_location()).compareTo(other.is_set_location());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_location()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.location, other.location);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("finishFileUpload_args(");
+ boolean first = true;
+
+ sb.append("location:");
+ if (this.location == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.location);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class finishFileUpload_argsStandardSchemeFactory implements SchemeFactory {
+ public finishFileUpload_argsStandardScheme getScheme() {
+ return new finishFileUpload_argsStandardScheme();
+ }
+ }
+
+ private static class finishFileUpload_argsStandardScheme extends StandardScheme<finishFileUpload_args> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, finishFileUpload_args struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // LOCATION
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.location = iprot.readString();
+ struct.set_location_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, finishFileUpload_args struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.location != null) {
+ oprot.writeFieldBegin(LOCATION_FIELD_DESC);
+ oprot.writeString(struct.location);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class finishFileUpload_argsTupleSchemeFactory implements SchemeFactory {
+ public finishFileUpload_argsTupleScheme getScheme() {
+ return new finishFileUpload_argsTupleScheme();
+ }
+ }
+
+ private static class finishFileUpload_argsTupleScheme extends TupleScheme<finishFileUpload_args> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, finishFileUpload_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ BitSet optionals = new BitSet();
+ if (struct.is_set_location()) {
+ optionals.set(0);
+ }
+ oprot.writeBitSet(optionals, 1);
+ if (struct.is_set_location()) {
+ oprot.writeString(struct.location);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, finishFileUpload_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ BitSet incoming = iprot.readBitSet(1);
+ if (incoming.get(0)) {
+ struct.location = iprot.readString();
+ struct.set_location_isSet(true);
+ }
+ }
+ }
+
+ }
+
+ public static class finishFileUpload_result implements org.apache.thrift.TBase<finishFileUpload_result, finishFileUpload_result._Fields>, java.io.Serializable, Cloneable, Comparable<finishFileUpload_result> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("finishFileUpload_result");
+
+ private static final org.apache.thrift.protocol.TField AZE_FIELD_DESC = new org.apache.thrift.protocol.TField("aze", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new finishFileUpload_resultStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new finishFileUpload_resultTupleSchemeFactory());
+ }
+
+ private AuthorizationException aze; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ AZE((short)1, "aze");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // AZE
+ return AZE;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.AZE, new org.apache.thrift.meta_data.FieldMetaData("aze", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(finishFileUpload_result.class, metaDataMap);
+ }
+
+ public finishFileUpload_result() {
+ }
+
+ public finishFileUpload_result(
+ AuthorizationException aze)
+ {
+ this();
+ this.aze = aze;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public finishFileUpload_result(finishFileUpload_result other) {
+ if (other.is_set_aze()) {
+ this.aze = new AuthorizationException(other.aze);
+ }
+ }
+
+ public finishFileUpload_result deepCopy() {
+ return new finishFileUpload_result(this);
+ }
+
+ @Override
+ public void clear() {
+ this.aze = null;
+ }
+
+ public AuthorizationException get_aze() {
+ return this.aze;
+ }
+
+ public void set_aze(AuthorizationException aze) {
+ this.aze = aze;
+ }
+
+ public void unset_aze() {
+ this.aze = null;
+ }
+
+ /** Returns true if field aze is set (has been assigned a value) and false otherwise */
+ public boolean is_set_aze() {
+ return this.aze != null;
+ }
+
+ public void set_aze_isSet(boolean value) {
+ if (!value) {
+ this.aze = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case AZE:
+ if (value == null) {
+ unset_aze();
+ } else {
+ set_aze((AuthorizationException)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case AZE:
+ return get_aze();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case AZE:
+ return is_set_aze();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof finishFileUpload_result)
+ return this.equals((finishFileUpload_result)that);
+ return false;
+ }
+
+ public boolean equals(finishFileUpload_result that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_aze = true && this.is_set_aze();
+ boolean that_present_aze = true && that.is_set_aze();
+ if (this_present_aze || that_present_aze) {
+ if (!(this_present_aze && that_present_aze))
+ return false;
+ if (!this.aze.equals(that.aze))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ List<Object> list = new ArrayList<Object>();
+
+ boolean present_aze = true && (is_set_aze());
+ list.add(present_aze);
+ if (present_aze)
+ list.add(aze);
+
+ return list.hashCode();
+ }
+
+ @Override
+ public int compareTo(finishFileUpload_result other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(is_set_aze()).compareTo(other.is_set_aze());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_aze()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.aze, other.aze);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("finishFileUpload_result(");
+ boolean first = true;
+
+ sb.append("aze:");
+ if (this.aze == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.aze);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class finishFileUpload_resultStandardSchemeFactory implements SchemeFactory {
+ public finishFileUpload_resultStandardScheme getScheme() {
+ return new finishFileUpload_resultStandardScheme();
+ }
+ }
+
+ private static class finishFileUpload_resultStandardScheme extends StandardScheme<finishFileUpload_result> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, finishFileUpload_result struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // AZE
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+ struct.aze = new AuthorizationException();
+ struct.aze.read(iprot);
+ struct.set_aze_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, finishFileUpload_result struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.aze != null) {
+ oprot.writeFieldBegin(AZE_FIELD_DESC);
+ struct.aze.write(oprot);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class finishFileUpload_resultTupleSchemeFactory implements SchemeFactory {
+ public finishFileUpload_resultTupleScheme getScheme() {
+ return new finishFileUpload_resultTupleScheme();
+ }
+ }
+
+ private static class finishFileUpload_resultTupleScheme extends TupleScheme<finishFileUpload_result> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, finishFileUpload_result struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ BitSet optionals = new BitSet();
+ if (struct.is_set_aze()) {
+ optionals.set(0);
+ }
+ oprot.writeBitSet(optionals, 1);
+ if (struct.is_set_aze()) {
+ struct.aze.write(oprot);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, finishFileUpload_result struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ BitSet incoming = iprot.readBitSet(1);
+ if (incoming.get(0)) {
+ struct.aze = new AuthorizationException();
+ struct.aze.read(iprot);
+ struct.set_aze_isSet(true);
+ }
+ }
+ }
+
+ }
+
+ public static class beginFileDownload_args implements org.apache.thrift.TBase<beginFileDownload_args, beginFileDownload_args._Fields>, java.io.Serializable, Cloneable, Comparable<beginFileDownload_args> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("beginFileDownload_args");
+
+ private static final org.apache.thrift.protocol.TField FILE_FIELD_DESC = new org.apache.thrift.protocol.TField("file", org.apache.thrift.protocol.TType.STRING, (short)1);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new beginFileDownload_argsStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new beginFileDownload_argsTupleSchemeFactory());
+ }
+
+ private String file; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ FILE((short)1, "file");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // FILE
+ return FILE;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.FILE, new org.apache.thrift.meta_data.FieldMetaData("file", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(beginFileDownload_args.class, metaDataMap);
+ }
+
+ public beginFileDownload_args() {
+ }
+
+ public beginFileDownload_args(
+ String file)
+ {
+ this();
+ this.file = file;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public beginFileDownload_args(beginFileDownload_args other) {
+ if (other.is_set_file()) {
+ this.file = other.file;
+ }
+ }
+
+ public beginFileDownload_args deepCopy() {
+ return new beginFileDownload_args(this);
+ }
+
+ @Override
+ public void clear() {
+ this.file = null;
+ }
+
+ public String get_file() {
+ return this.file;
+ }
+
+ public void set_file(String file) {
+ this.file = file;
+ }
+
+ public void unset_file() {
+ this.file = null;
+ }
+
+ /** Returns true if field file is set (has been assigned a value) and false otherwise */
+ public boolean is_set_file() {
+ return this.file != null;
+ }
+
+ public void set_file_isSet(boolean value) {
+ if (!value) {
+ this.file = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case FILE:
+ if (value == null) {
+ unset_file();
+ } else {
+ set_file((String)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case FILE:
+ return get_file();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case FILE:
+ return is_set_file();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof beginFileDownload_args)
+ return this.equals((beginFileDownload_args)that);
+ return false;
+ }
+
+ public boolean equals(beginFileDownload_args that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_file = true && this.is_set_file();
+ boolean that_present_file = true && that.is_set_file();
+ if (this_present_file || that_present_file) {
+ if (!(this_present_file && that_present_file))
+ return false;
+ if (!this.file.equals(that.file))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ List<Object> list = new ArrayList<Object>();
+
+ boolean present_file = true && (is_set_file());
+ list.add(present_file);
+ if (present_file)
+ list.add(file);
+
+ return list.hashCode();
+ }
+
+ @Override
+ public int compareTo(beginFileDownload_args other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(is_set_file()).compareTo(other.is_set_file());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_file()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.file, other.file);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("beginFileDownload_args(");
boolean first = true;
- sb.append("location:");
- if (this.location == null) {
- sb.append("null");
- } else {
- sb.append(this.location);
- }
- first = false;
- if (!first) sb.append(", ");
- sb.append("chunk:");
- if (this.chunk == null) {
+ sb.append("file:");
+ if (this.file == null) {
sb.append("null");
} else {
- org.apache.thrift.TBaseHelper.toString(this.chunk, sb);
+ sb.append(this.file);
}
first = false;
sb.append(")");
@@ -32033,15 +33780,15 @@ public class Nimbus {
}
}
- private static class uploadChunk_argsStandardSchemeFactory implements SchemeFactory {
- public uploadChunk_argsStandardScheme getScheme() {
- return new uploadChunk_argsStandardScheme();
+ private static class beginFileDownload_argsStandardSchemeFactory implements SchemeFactory {
+ public beginFileDownload_argsStandardScheme getScheme() {
+ return new beginFileDownload_argsStandardScheme();
}
}
- private static class uploadChunk_argsStandardScheme extends StandardScheme<uploadChunk_args> {
+ private static class beginFileDownload_argsStandardScheme extends StandardScheme<beginFileDownload_args> {
- public void read(org.apache.thrift.protocol.TProtocol iprot, uploadChunk_args struct) throws org.apache.thrift.TException {
+ public void read(org.apache.thrift.protocol.TProtocol iprot, beginFileDownload_args struct) throws org.apache.thrift.TException {
org.apache.thrift.protocol.TField schemeField;
iprot.readStructBegin();
while (true)
@@ -32051,18 +33798,10 @@ public class Nimbus {
break;
}
switch (schemeField.id) {
- case 1: // LOCATION
- if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
- struct.location = iprot.readString();
- struct.set_location_isSet(true);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- break;
- case 2: // CHUNK
+ case 1: // FILE
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
- struct.chunk = iprot.readBinary();
- struct.set_chunk_isSet(true);
+ struct.file = iprot.readString();
+ struct.set_file_isSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -32076,18 +33815,13 @@ public class Nimbus {
struct.validate();
}
- public void write(org.apache.thrift.protocol.TProtocol oprot, uploadChunk_args struct) throws org.apache.thrift.TException {
+ public void write(org.apache.thrift.protocol.TProtocol oprot, beginFileDownload_args struct) throws org.apache.thrift.TException {
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
- if (struct.location != null) {
- oprot.writeFieldBegin(LOCATION_FIELD_DESC);
- oprot.writeString(struct.location);
- oprot.writeFieldEnd();
- }
- if (struct.chunk != null) {
- oprot.writeFieldBegin(CHUNK_FIELD_DESC);
- oprot.writeBinary(struct.chunk);
+ if (struct.file != null) {
+ oprot.writeFieldBegin(FILE_FIELD_DESC);
+ oprot.writeString(struct.file);
oprot.writeFieldEnd();
}
oprot.writeFieldStop();
@@ -32096,65 +33830,58 @@ public class Nimbus {
}
- private static class uploadChunk_argsTupleSchemeFactory implements SchemeFactory {
- public uploadChunk_argsTupleScheme getScheme() {
- return new uploadChunk_argsTupleScheme();
+ private static class beginFileDownload_argsTupleSchemeFactory implements SchemeFactory {
+ public beginFileDownload_argsTupleScheme getScheme() {
+ return new beginFileDownload_argsTupleScheme();
}
}
- private static class uploadChunk_argsTupleScheme extends TupleScheme<uploadChunk_args> {
+ private static class beginFileDownload_argsTupleScheme extends TupleScheme<beginFileDownload_args> {
@Override
- public void write(org.apache.thrift.protocol.TProtocol prot, uploadChunk_args struct) throws org.apache.thrift.TException {
+ public void write(org.apache.thrift.protocol.TProtocol prot, beginFileDownload_args struct) throws org.apache.thrift.TException {
TTupleProtocol oprot = (TTupleProtocol) prot;
BitSet optionals = new BitSet();
- if (struct.is_set_location()) {
+ if (struct.is_set_file()) {
optionals.set(0);
}
- if (struct.is_set_chunk()) {
- optionals.set(1);
- }
- oprot.writeBitSet(optionals, 2);
- if (struct.is_set_location()) {
- oprot.writeString(struct.location);
- }
- if (struct.is_set_chunk()) {
- oprot.writeBinary(struct.chunk);
+ oprot.writeBitSet(optionals, 1);
+ if (struct.is_set_file()) {
+ oprot.writeString(struct.file);
}
}
@Override
- public void read(org.apache.thrift.protocol.TProtocol prot, uploadChunk_args struct) throws org.apache.thrift.TException {
+ public void read(org.apache.thrift.protocol.TProtocol prot, beginFileDownload_args struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
- BitSet incoming = iprot.readBitSet(2);
+ BitSet incoming = iprot.readBitSet(1);
if (incoming.get(0)) {
- struct.location = iprot.readString();
- struct.set_location_isSet(true);
- }
- if (incoming.get(1)) {
- struct.chunk = iprot.readBinary();
- struct.set_chunk_isSet(true);
+ struct.file = iprot.readString();
+ struct.set_file_isSet(true);
}
}
}
}
- public static class uploadChunk_result implements org.apache.thrift.TBase<uploadChunk_result, uploadChunk_result._Fields>, java.io.Serializable, Cloneable, Comparable<uploadChunk_result> {
- private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("uploadChunk_result");
+ public static class beginFileDownload_result implements org.apache.thrift.TBase<beginFileDownload_result, beginFileDownload_result._Fields>, java.io.Serializable, Cloneable, Comparable<beginFileDownload_result> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("beginFileDownload_result");
+ private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.STRING, (short)0);
private static final org.apache.thrift.protocol.TField AZE_FIELD_DESC = new org.apache.thrift.protocol.TField("aze", org.apache.thrift.protocol.TType.STRUCT, (short)1);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
- schemes.put(StandardScheme.class, new uploadChunk_resultStandardSchemeFactory());
- schemes.put(TupleScheme.class, new uploadChunk_resultTupleSchemeFactory());
+ schemes.put(StandardScheme.class, new beginFileDownload_resultStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new beginFileDownload_resultTupleSchemeFactory());
}
+ private String success; // required
private AuthorizationException aze; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ SUCCESS((short)0, "success"),
AZE((short)1, "aze");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -32170,6 +33897,8 @@ public class Nimbus {
*/
public static _Fields findByThriftId(int fieldId) {
switch(fieldId) {
+ case 0: // SUCCESS
+ return SUCCESS;
case 1: // AZE
return AZE;
default:
@@ -32215,40 +33944,71 @@ public class Nimbus {
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.AZE, new org.apache.thrift.meta_data.FieldMetaData("aze", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
- org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(uploadChunk_result.class, metaDataMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(beginFileDownload_result.class, metaDataMap);
}
- public uploadChunk_result() {
+ public beginFileDownload_result() {
}
- public uploadChunk_result(
+ public beginFileDownload_result(
+ String success,
AuthorizationException aze)
{
this();
+ this.success = success;
this.aze = aze;
}
/**
* Performs a deep copy on <i>other</i>.
*/
- public uploadChunk_result(uploadChunk_result other) {
+ public beginFileDownload_result(beginFileDownload_result other) {
+ if (other.is_set_success()) {
+ this.success = other.success;
+ }
if (other.is_set_aze()) {
this.aze = new AuthorizationException(other.aze);
}
}
- public uploadChunk_result deepCopy() {
- return new uploadChunk_result(this);
+ public beginFileDownload_result deepCopy() {
+ return new beginFileDownload_result(this);
}
@Override
public void clear() {
+ this.success = null;
this.aze = null;
}
+ public String get_success() {
+ return this.success;
+ }
+
+ public void set_success(String success) {
+ this.success = success;
+ }
+
+ public void unset_success() {
+ this.success = null;
+ }
+
+ /** Returns true if field success is set (has been assigned a value) and false otherwise */
+ public boolean is_set_success() {
+ return this.success != null;
+ }
+
+ public void set_success_isSet(boolean value) {
+ if (!value) {
+ this.success = null;
+ }
+ }
+
public AuthorizationException get_aze() {
return this.aze;
}
@@ -32274,6 +34034,14 @@ public class Nimbus {
public void setFieldValue(_Fields field, Object value) {
switch (field) {
+ case SUCCESS:
+ if (value == null) {
+ unset_success();
+ } else {
+ set_success((String)value);
+ }
+ break;
+
case AZE:
if (value == null) {
unset_aze();
@@ -32287,6 +34055,9 @@ public class Nimbus {
public Object getFieldValue(_Fields field) {
switch (field) {
+ case SUCCESS:
+ return get_success();
+
case AZE:
return get_aze();
@@ -32301,6 +34072,8 @@ public class Nimbus {
}
switch (field) {
+
<TRUNCATED>
[6/8] storm git commit: addressed review comments
Posted by ka...@apache.org.
addressed review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0f9dbf40
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0f9dbf40
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0f9dbf40
Branch: refs/heads/1.x-branch
Commit: 0f9dbf401dc1c2adab4f856067350a370b03e27e
Parents: 897a6d2
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Dec 2 15:29:38 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Dec 5 07:48:57 2016 -0600
----------------------------------------------------------------------
.../src/jvm/org/apache/storm/starter/WordCountTopology.java | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0f9dbf40/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
index b47981d..e4a5711 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
@@ -90,9 +90,7 @@ public class WordCountTopology {
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
- for (String name: args) {
- StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
- }
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {
conf.setMaxTaskParallelism(3);
[3/8] storm git commit: Added in some optimizations for better
topology submission performance
Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java b/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
index e8cef09..85209d6 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
@@ -18,7 +18,6 @@
package org.apache.storm.utils;
import org.apache.storm.Config;
-import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.NimbusSummary;
import org.apache.storm.security.auth.ReqContext;
@@ -78,32 +77,36 @@ public class NimbusClient extends ThriftClient {
for (String host : seeds) {
int port = Integer.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT).toString());
- ClusterSummary clusterInfo;
- try (NimbusClient client = new NimbusClient(conf, host, port, null, asUser)) {
- clusterInfo = client.getClient().getClusterInfo();
+ NimbusSummary nimbusSummary;
+ NimbusClient client = null;
+ try {
+ client = new NimbusClient(conf, host, port, null, asUser);
+ nimbusSummary = client.getClient().getLeader();
+ if (nimbusSummary != null) {
+ String leaderNimbus = nimbusSummary.get_host() + ":" + nimbusSummary.get_port();
+ LOG.info("Found leader nimbus : {}", leaderNimbus);
+ if (nimbusSummary.get_host().equals(host) && nimbusSummary.get_port() == port) {
+ NimbusClient ret = client;
+ client = null;
+ return ret;
+ }
+ try {
+ return new NimbusClient(conf, nimbusSummary.get_host(), nimbusSummary.get_port(), null, asUser);
+ } catch (TTransportException e) {
+ throw new RuntimeException("Failed to create a nimbus client for the leader " + leaderNimbus, e);
+ }
+ }
} catch (Exception e) {
LOG.warn("Ignoring exception while trying to get leader nimbus info from " + host
+ ". will retry with a different seed host.", e);
continue;
- }
- List<NimbusSummary> nimbuses = clusterInfo.get_nimbuses();
- if (nimbuses != null) {
- for (NimbusSummary nimbusSummary : nimbuses) {
- if (nimbusSummary.is_isLeader()) {
- String leaderNimbus = nimbusSummary.get_host() + ":" + nimbusSummary.get_port();
- LOG.info("Found leader nimbus : {}", leaderNimbus);
-
- try {
- return new NimbusClient(conf, nimbusSummary.get_host(), nimbusSummary.get_port(), null, asUser);
- } catch (TTransportException e) {
- throw new RuntimeException("Failed to create a nimbus client for the leader " + leaderNimbus, e);
- }
- }
+ } finally {
+ if (client != null) {
+ client.close();
}
- throw new NimbusLeaderNotFoundException(
- "Found nimbuses " + nimbuses + " none of which is elected as leader, please try " +
- "again after some time.");
}
+ throw new NimbusLeaderNotFoundException("Could not find a nimbus leader, please try " +
+ "again after some time.");
}
throw new NimbusLeaderNotFoundException(
"Could not find leader nimbus from seed hosts " + seeds + ". " +
http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java b/storm-core/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
index 7528221..33c2c1b 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
@@ -66,11 +66,14 @@ public class StormBoundedExponentialBackoffRetry extends BoundedExponentialBacko
int exp = 1 << retryCount;
int jitter = random.nextInt(exp);
int sleepTimeMs = super.getBaseSleepTimeMs() + exp + jitter;
+ LOG.warn("WILL SLEEP FOR {}ms (NOT MAX)", sleepTimeMs);
return sleepTimeMs;
} else {
int stepJitter = random.nextInt(stepSize);
- return Math.min(super.getMaxSleepTimeMs(), (linearBaseSleepMs +
+ int sleepTimeMs = Math.min(super.getMaxSleepTimeMs(), (linearBaseSleepMs +
(stepSize * (retryCount - expRetriesThreshold)) + stepJitter));
+ LOG.warn("WILL SLEEP FOR {}ms (MAX)", sleepTimeMs);
+ return sleepTimeMs;
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index ed5c552..75ffd23 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -1142,12 +1142,14 @@ public class Utils {
public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
CuratorFramework ret = newCurator(conf, servers, port, root, auth);
+ LOG.info("Starting Utils Curator...");
ret.start();
return ret;
}
public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) {
CuratorFramework ret = newCurator(conf, servers, port, auth);
+ LOG.info("Starting Utils Curator (2)...");
ret.start();
return ret;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index 0580f41..ef35307 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -122,6 +122,7 @@ public class Zookeeper {
}
}
});
+ LOG.info("Staring ZK Curator");
fk.start();
return fk;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/py/storm/Nimbus-remote
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus-remote b/storm-core/src/py/storm/Nimbus-remote
index b39050e..0d643c3 100644
--- a/storm-core/src/py/storm/Nimbus-remote
+++ b/storm-core/src/py/storm/Nimbus-remote
@@ -76,6 +76,8 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help':
print(' string downloadChunk(string id)')
print(' string getNimbusConf()')
print(' ClusterSummary getClusterInfo()')
+ print(' NimbusSummary getLeader()')
+ print(' bool isTopologyNameAllowed(string name)')
print(' TopologyInfo getTopologyInfo(string id)')
print(' TopologyInfo getTopologyInfoWithOpts(string id, GetInfoOptions options)')
print(' TopologyPageInfo getTopologyPageInfo(string id, string window, bool is_include_sys)')
@@ -345,6 +347,18 @@ elif cmd == 'getClusterInfo':
sys.exit(1)
pp.pprint(client.getClusterInfo())
+elif cmd == 'getLeader':
+ if len(args) != 0:
+ print('getLeader requires 0 args')
+ sys.exit(1)
+ pp.pprint(client.getLeader())
+
+elif cmd == 'isTopologyNameAllowed':
+ if len(args) != 1:
+ print('isTopologyNameAllowed requires 1 args')
+ sys.exit(1)
+ pp.pprint(client.isTopologyNameAllowed(args[0],))
+
elif cmd == 'getTopologyInfo':
if len(args) != 1:
print('getTopologyInfo requires 1 args')
http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/py/storm/Nimbus.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus.py b/storm-core/src/py/storm/Nimbus.py
index 7e5470a..1c5e86e 100644
--- a/storm-core/src/py/storm/Nimbus.py
+++ b/storm-core/src/py/storm/Nimbus.py
@@ -291,6 +291,16 @@ class Iface:
def getClusterInfo(self):
pass
+ def getLeader(self):
+ pass
+
+ def isTopologyNameAllowed(self, name):
+ """
+ Parameters:
+ - name
+ """
+ pass
+
def getTopologyInfo(self, id):
"""
Parameters:
@@ -1523,6 +1533,67 @@ class Client(Iface):
raise result.aze
raise TApplicationException(TApplicationException.MISSING_RESULT, "getClusterInfo failed: unknown result")
+ def getLeader(self):
+ self.send_getLeader()
+ return self.recv_getLeader()
+
+ def send_getLeader(self):
+ self._oprot.writeMessageBegin('getLeader', TMessageType.CALL, self._seqid)
+ args = getLeader_args()
+ args.write(self._oprot)
+ self._oprot.writeMessageEnd()
+ self._oprot.trans.flush()
+
+ def recv_getLeader(self):
+ iprot = self._iprot
+ (fname, mtype, rseqid) = iprot.readMessageBegin()
+ if mtype == TMessageType.EXCEPTION:
+ x = TApplicationException()
+ x.read(iprot)
+ iprot.readMessageEnd()
+ raise x
+ result = getLeader_result()
+ result.read(iprot)
+ iprot.readMessageEnd()
+ if result.success is not None:
+ return result.success
+ if result.aze is not None:
+ raise result.aze
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "getLeader failed: unknown result")
+
+ def isTopologyNameAllowed(self, name):
+ """
+ Parameters:
+ - name
+ """
+ self.send_isTopologyNameAllowed(name)
+ return self.recv_isTopologyNameAllowed()
+
+ def send_isTopologyNameAllowed(self, name):
+ self._oprot.writeMessageBegin('isTopologyNameAllowed', TMessageType.CALL, self._seqid)
+ args = isTopologyNameAllowed_args()
+ args.name = name
+ args.write(self._oprot)
+ self._oprot.writeMessageEnd()
+ self._oprot.trans.flush()
+
+ def recv_isTopologyNameAllowed(self):
+ iprot = self._iprot
+ (fname, mtype, rseqid) = iprot.readMessageBegin()
+ if mtype == TMessageType.EXCEPTION:
+ x = TApplicationException()
+ x.read(iprot)
+ iprot.readMessageEnd()
+ raise x
+ result = isTopologyNameAllowed_result()
+ result.read(iprot)
+ iprot.readMessageEnd()
+ if result.success is not None:
+ return result.success
+ if result.aze is not None:
+ raise result.aze
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "isTopologyNameAllowed failed: unknown result")
+
def getTopologyInfo(self, id):
"""
Parameters:
@@ -1895,6 +1966,8 @@ class Processor(Iface, TProcessor):
self._processMap["downloadChunk"] = Processor.process_downloadChunk
self._processMap["getNimbusConf"] = Processor.process_getNimbusConf
self._processMap["getClusterInfo"] = Processor.process_getClusterInfo
+ self._processMap["getLeader"] = Processor.process_getLeader
+ self._processMap["isTopologyNameAllowed"] = Processor.process_isTopologyNameAllowed
self._processMap["getTopologyInfo"] = Processor.process_getTopologyInfo
self._processMap["getTopologyInfoWithOpts"] = Processor.process_getTopologyInfoWithOpts
self._processMap["getTopologyPageInfo"] = Processor.process_getTopologyPageInfo
@@ -2713,6 +2786,50 @@ class Processor(Iface, TProcessor):
oprot.writeMessageEnd()
oprot.trans.flush()
+ def process_getLeader(self, seqid, iprot, oprot):
+ args = getLeader_args()
+ args.read(iprot)
+ iprot.readMessageEnd()
+ result = getLeader_result()
+ try:
+ result.success = self._handler.getLeader()
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
+ result.aze = aze
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("getLeader", msg_type, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def process_isTopologyNameAllowed(self, seqid, iprot, oprot):
+ args = isTopologyNameAllowed_args()
+ args.read(iprot)
+ iprot.readMessageEnd()
+ result = isTopologyNameAllowed_result()
+ try:
+ result.success = self._handler.isTopologyNameAllowed(args.name)
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
+ result.aze = aze
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("isTopologyNameAllowed", msg_type, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
def process_getTopologyInfo(self, seqid, iprot, oprot):
args = getTopologyInfo_args()
args.read(iprot)
@@ -8017,6 +8134,274 @@ class getClusterInfo_result:
def __ne__(self, other):
return not (self == other)
+class getLeader_args:
+
+ thrift_spec = (
+ )
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('getLeader_args')
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __hash__(self):
+ value = 17
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class getLeader_result:
+ """
+ Attributes:
+ - success
+ - aze
+ """
+
+ thrift_spec = (
+ (0, TType.STRUCT, 'success', (NimbusSummary, NimbusSummary.thrift_spec), None, ), # 0
+ (1, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 1
+ )
+
+ def __init__(self, success=None, aze=None,):
+ self.success = success
+ self.aze = aze
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 0:
+ if ftype == TType.STRUCT:
+ self.success = NimbusSummary()
+ self.success.read(iprot)
+ else:
+ iprot.skip(ftype)
+ elif fid == 1:
+ if ftype == TType.STRUCT:
+ self.aze = AuthorizationException()
+ self.aze.read(iprot)
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('getLeader_result')
+ if self.success is not None:
+ oprot.writeFieldBegin('success', TType.STRUCT, 0)
+ self.success.write(oprot)
+ oprot.writeFieldEnd()
+ if self.aze is not None:
+ oprot.writeFieldBegin('aze', TType.STRUCT, 1)
+ self.aze.write(oprot)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.success)
+ value = (value * 31) ^ hash(self.aze)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class isTopologyNameAllowed_args:
+ """
+ Attributes:
+ - name
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'name', None, None, ), # 1
+ )
+
+ def __init__(self, name=None,):
+ self.name = name
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.name = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('isTopologyNameAllowed_args')
+ if self.name is not None:
+ oprot.writeFieldBegin('name', TType.STRING, 1)
+ oprot.writeString(self.name.encode('utf-8'))
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.name)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class isTopologyNameAllowed_result:
+ """
+ Attributes:
+ - success
+ - aze
+ """
+
+ thrift_spec = (
+ (0, TType.BOOL, 'success', None, None, ), # 0
+ (1, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 1
+ )
+
+ def __init__(self, success=None, aze=None,):
+ self.success = success
+ self.aze = aze
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 0:
+ if ftype == TType.BOOL:
+ self.success = iprot.readBool()
+ else:
+ iprot.skip(ftype)
+ elif fid == 1:
+ if ftype == TType.STRUCT:
+ self.aze = AuthorizationException()
+ self.aze.read(iprot)
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('isTopologyNameAllowed_result')
+ if self.success is not None:
+ oprot.writeFieldBegin('success', TType.BOOL, 0)
+ oprot.writeBool(self.success)
+ oprot.writeFieldEnd()
+ if self.aze is not None:
+ oprot.writeFieldBegin('aze', TType.STRUCT, 1)
+ self.aze.write(oprot)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.success)
+ value = (value * 31) ^ hash(self.aze)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
class getTopologyInfo_args:
"""
Attributes:
http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift
index 8e80d2a..700e5a0 100644
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@ -667,6 +667,8 @@ service Nimbus {
string getNimbusConf() throws (1: AuthorizationException aze);
// stats functions
ClusterSummary getClusterInfo() throws (1: AuthorizationException aze);
+ NimbusSummary getLeader() throws (1: AuthorizationException aze);
+ bool isTopologyNameAllowed(1: string name) throws (1: AuthorizationException aze);
TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e, 2: AuthorizationException aze);
TopologyInfo getTopologyInfoWithOpts(1: string id, 2: GetInfoOptions options) throws (1: NotAliveException e, 2: AuthorizationException aze);
TopologyPageInfo getTopologyPageInfo(1: string id, 2: string window, 3: bool is_include_sys) throws (1: NotAliveException e, 2: AuthorizationException aze);
[7/8] storm git commit: Merge branch 'STORM-2190-1.x' of
https://github.com/revans2/incubator-storm into STORM-2190-1.x-merge
Posted by ka...@apache.org.
Merge branch 'STORM-2190-1.x' of https://github.com/revans2/incubator-storm into STORM-2190-1.x-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/af9b7c8c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/af9b7c8c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/af9b7c8c
Branch: refs/heads/1.x-branch
Commit: af9b7c8c241922f7688c3da18577684b742e08af
Parents: b515356 0f9dbf4
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Dec 20 17:36:57 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Dec 20 17:36:57 2016 +0900
----------------------------------------------------------------------
.../org/apache/storm/command/kill_topology.clj | 8 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 161 +-
.../jvm/org/apache/storm/StormSubmitter.java | 121 +-
.../storm/dependency/DependencyUploader.java | 22 +-
.../jvm/org/apache/storm/generated/Nimbus.java | 3377 ++++++++++++++----
.../org/apache/storm/utils/NimbusClient.java | 45 +-
.../StormBoundedExponentialBackoffRetry.java | 5 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 2 +
.../org/apache/storm/zookeeper/Zookeeper.java | 1 +
storm-core/src/py/storm/Nimbus-remote | 14 +
storm-core/src/py/storm/Nimbus.py | 385 ++
storm-core/src/storm.thrift | 2 +
12 files changed, 3207 insertions(+), 936 deletions(-)
----------------------------------------------------------------------
[8/8] storm git commit: STORM-2190: CHANGELOG
Posted by ka...@apache.org.
STORM-2190: CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4154490f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4154490f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4154490f
Branch: refs/heads/1.x-branch
Commit: 4154490f71b188996b51bd4cd9dbb47dc3871f21
Parents: af9b7c8
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Dec 20 17:56:13 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Dec 20 17:56:13 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4154490f/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b965cf4..63a5777 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.0
+ * STORM-2190: reduce contention between submission and scheduling
* STORM-2239: Handle InterruptException in new Kafka spout
* STORM-2087: Storm-kafka-client: Failed tuples are not always replayed
* STORM-2238: Add Timestamp extractor for windowed bolt
[5/8] storm git commit: Added in some optimizations for better
topology submission performance
Posted by ka...@apache.org.
Added in some optimizations for better topology submission performance
Conflicts:
storm-core/src/jvm/org/apache/storm/StormSubmitter.java
storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/897a6d2b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/897a6d2b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/897a6d2b
Branch: refs/heads/1.x-branch
Commit: 897a6d2b9c2e34d06ba949c227d84f63904f5b5e
Parents: 93c1ea3
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Dec 2 15:28:17 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Dec 5 07:48:44 2016 -0600
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/nimbus.clj | 28 +
.../jvm/org/apache/storm/StormSubmitter.java | 121 +-
.../storm/dependency/DependencyUploader.java | 22 +-
.../jvm/org/apache/storm/generated/Nimbus.java | 3377 ++++++++++++++----
.../org/apache/storm/utils/NimbusClient.java | 45 +-
.../StormBoundedExponentialBackoffRetry.java | 5 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 2 +
.../org/apache/storm/zookeeper/Zookeeper.java | 1 +
storm-core/src/py/storm/Nimbus-remote | 14 +
storm-core/src/py/storm/Nimbus.py | 385 ++
storm-core/src/storm.thrift | 2 +
11 files changed, 3137 insertions(+), 865 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index e9d35a8..112d00b 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -94,6 +94,8 @@
(defmeter nimbus:num-getTopology-calls)
(defmeter nimbus:num-getUserTopology-calls)
(defmeter nimbus:num-getClusterInfo-calls)
+(defmeter nimbus:num-getLeader-calls)
+(defmeter nimbus:num-isTopologyNameAllowed-calls)
(defmeter nimbus:num-getTopologyInfoWithOpts-calls)
(defmeter nimbus:num-getTopologyInfo-calls)
(defmeter nimbus:num-getTopologyPageInfo-calls)
@@ -2353,6 +2355,32 @@
topo-history-list (read-topology-history nimbus user admin-users)]
(TopologyHistoryInfo. (distinct (concat active-ids-for-user topo-history-list)))))
+ (^NimbusSummary getLeader [this]
+ (mark! nimbus:num-getLeader-calls)
+ (check-authorization! nimbus nil nil "getClusterInfo")
+ (let [storm-cluster-state (:storm-cluster-state nimbus)
+ nimbuses (.nimbuses storm-cluster-state)
+ leader-elector (:leader-elector nimbus)
+ leader (.getLeader leader-elector)
+ leader-host (.getHost leader)
+ leader-port (.getPort leader)
+ leader-summary (first (filter
+ (fn [nimbus-summary] (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))
+ nimbuses))]
+ (.set_uptime_secs leader-summary (time-delta (.get_uptime_secs leader-summary)))
+ (.set_isLeader leader-summary true)
+ leader-summary))
+
+ (^boolean isTopologyNameAllowed [this ^String name]
+ (mark! nimbus:num-isTopologyNameAllowed-calls)
+ (check-authorization! nimbus nil nil "getClusterInfo")
+ (try
+ (validate-topology-name! name)
+ (check-storm-active! nimbus name false)
+ true
+ (catch InvalidTopologyException e false)
+ (catch AlreadyAliveException e false)))
+
Shutdownable
(shutdown [this]
(mark! nimbus:num-shutdown-calls)
http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
index effeed4..e414d60 100644
--- a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
@@ -128,19 +128,19 @@ public class StormSubmitter {
return;
}
try {
- if(localNimbus!=null) {
- LOG.info("Pushing Credentials to topology " + name + " in local mode");
+ if (localNimbus!=null) {
+ LOG.info("Pushing Credentials to topology {} in local mode", name);
localNimbus.uploadNewCredentials(name, new Credentials(fullCreds));
} else {
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
- LOG.info("Uploading new credentials to " + name);
+ LOG.info("Uploading new credentials to {}", name);
client.getClient().uploadNewCredentials(name, new Credentials(fullCreds));
} finally {
client.close();
}
}
- LOG.info("Finished submitting topology: " + name);
+ LOG.info("Finished pushing creds to topology: {}", name);
} catch(TException e) {
throw new RuntimeException(e);
}
@@ -227,9 +227,9 @@ public class StormSubmitter {
opts.set_creds(new Credentials(fullCreds));
}
try {
- if(localNimbus!=null) {
+ if (localNimbus!=null) {
LOG.info("Submitting topology " + name + " in local mode");
- if(opts!=null) {
+ if (opts!=null) {
localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts);
} else {
// this is for backwards compatibility
@@ -238,37 +238,41 @@ public class StormSubmitter {
LOG.info("Finished submitting topology: " + name);
} else {
String serConf = JSONValue.toJSONString(stormConf);
- if(topologyNameExists(conf, name, asUser)) {
- throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
- }
-
- // Dependency uploading only makes sense for distributed mode
- List<String> jarsBlobKeys = Collections.emptyList();
- List<String> artifactsBlobKeys;
-
- DependencyUploader uploader = new DependencyUploader();
- try {
- uploader.init();
- jarsBlobKeys = uploadDependencyJarsToBlobStore(uploader);
- artifactsBlobKeys = uploadDependencyArtifactsToBlobStore(uploader);
- } catch (Throwable e) {
- // remove uploaded jars blobs, not artifacts since they're shared across the cluster
- uploader.deleteBlobs(jarsBlobKeys);
- uploader.shutdown();
- throw e;
- }
-
- try {
- setDependencyBlobsToTopology(topology, jarsBlobKeys, artifactsBlobKeys);
- submitTopologyInDistributeMode(name, topology, opts, progressListener, asUser, conf, serConf);
- } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
- // remove uploaded jars blobs, not artifacts since they're shared across the cluster
- // Note that we don't handle TException to delete jars blobs
- // because it's safer to leave some blobs instead of topology not running
- uploader.deleteBlobs(jarsBlobKeys);
- throw e;
- } finally {
- uploader.shutdown();
+ try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
+ if (topologyNameExists(name, client)) {
+ throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
+ }
+
+ // Dependency uploading only makes sense for distributed mode
+ List<String> jarsBlobKeys = Collections.emptyList();
+ List<String> artifactsBlobKeys;
+
+ DependencyUploader uploader = new DependencyUploader();
+ try {
+ uploader.init();
+
+ jarsBlobKeys = uploadDependencyJarsToBlobStore(uploader);
+
+ artifactsBlobKeys = uploadDependencyArtifactsToBlobStore(uploader);
+ } catch (Throwable e) {
+ // remove uploaded jars blobs, not artifacts since they're shared across the cluster
+ uploader.deleteBlobs(jarsBlobKeys);
+ uploader.shutdown();
+ throw e;
+ }
+
+ try {
+ setDependencyBlobsToTopology(topology, jarsBlobKeys, artifactsBlobKeys);
+ submitTopologyInDistributeMode(name, topology, opts, progressListener, asUser, conf, serConf, client);
+ } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
+ // remove uploaded jars blobs, not artifacts since they're shared across the cluster
+ // Note that we don't handle TException to delete jars blobs
+ // because it's safer to leave some blobs instead of topology not running
+ uploader.deleteBlobs(jarsBlobKeys);
+ throw e;
+ } finally {
+ uploader.shutdown();
+ }
}
}
} catch(TException e) {
@@ -316,19 +320,20 @@ public class StormSubmitter {
private static void submitTopologyInDistributeMode(String name, StormTopology topology, SubmitOptions opts,
ProgressListener progressListener, String asUser, Map conf,
- String serConf) throws TException {
- String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, asUser);
- try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)){
- LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
+ String serConf, NimbusClient client) throws TException {
+ try {
+ String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, client);
+ LOG.info("Submitting topology {} in distributed mode with conf {}", name, serConf);
+
if (opts != null) {
client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
} else {
// this is for backwards compatibility
client.getClient().submitTopology(name, jar, serConf, topology);
}
- LOG.info("Finished submitting topology: " + name);
+ LOG.info("Finished submitting topology: {}", name);
} catch (InvalidTopologyException e) {
- LOG.warn("Topology submission exception: " + e.get_msg());
+ LOG.warn("Topology submission exception: {}", e.get_msg());
throw e;
} catch (AlreadyAliveException e) {
LOG.warn("Topology already alive exception", e);
@@ -444,16 +449,9 @@ public class StormSubmitter {
});
}
- private static boolean topologyNameExists(Map conf, String name, String asUser) {
- try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
- ClusterSummary summary = client.getClient().getClusterInfo();
- for(TopologySummary s : summary.get_topologies()) {
- if(s.get_name().equals(name)) {
- return true;
- }
- }
- return false;
-
+ private static boolean topologyNameExists(String name, NimbusClient client) {
+ try {
+ return !client.getClient().isTopologyNameAllowed(name);
} catch(Exception e) {
throw new RuntimeException(e);
}
@@ -473,13 +471,12 @@ public class StormSubmitter {
return submitJar(conf, localJar, null);
}
-
- public static String submitJarAs(Map conf, String localJar, ProgressListener listener, String asUser) {
+ public static String submitJarAs(Map conf, String localJar, ProgressListener listener, NimbusClient client) {
if (localJar == null) {
throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
}
- try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
+ try {
String uploadLocation = client.getClient().beginFileUpload();
LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES);
@@ -512,6 +509,16 @@ public class StormSubmitter {
throw new RuntimeException(e);
}
}
+
+ public static String submitJarAs(Map conf, String localJar, ProgressListener listener, String asUser) {
+ if (localJar == null) {
+ throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
+ }
+
+ try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
+ return submitJarAs(conf, localJar, listener, client);
+ }
+ }
/**
* Submit jar file
@@ -521,7 +528,7 @@ public class StormSubmitter {
* @return the remote location of the submitted jar
*/
public static String submitJar(Map conf, String localJar, ProgressListener listener) {
- return submitJarAs(conf,localJar, listener, null);
+ return submitJarAs(conf,localJar, listener, (String)null);
}
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/jvm/org/apache/storm/dependency/DependencyUploader.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/dependency/DependencyUploader.java b/storm-core/src/jvm/org/apache/storm/dependency/DependencyUploader.java
index 4f71f67..636f454 100644
--- a/storm-core/src/jvm/org/apache/storm/dependency/DependencyUploader.java
+++ b/storm-core/src/jvm/org/apache/storm/dependency/DependencyUploader.java
@@ -18,7 +18,6 @@
package org.apache.storm.dependency;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.StringUtils;
import org.apache.storm.blobstore.AtomicOutputStream;
import org.apache.storm.blobstore.BlobStoreUtils;
import org.apache.storm.blobstore.ClientBlobStore;
@@ -38,7 +37,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
public class DependencyUploader {
public static final Logger LOG = LoggerFactory.getLogger(DependencyUploader.class);
@@ -51,9 +49,7 @@ public class DependencyUploader {
}
public void init() {
- if (blobStore == null) {
- blobStore = Utils.getClientBlobStore(conf);
- }
+ //NOOP
}
public void shutdown() {
@@ -67,7 +63,13 @@ public class DependencyUploader {
this.blobStore = blobStore;
}
- @SuppressWarnings("unchecked")
+ private synchronized ClientBlobStore getBlobStore() {
+ if (blobStore == null) {
+ blobStore = Utils.getClientBlobStore(conf);
+ }
+ return blobStore;
+ }
+
public List<String> uploadFiles(List<File> dependencies, boolean cleanupIfFails) throws IOException, AuthorizationException {
checkFilesExist(dependencies);
@@ -87,7 +89,7 @@ public class DependencyUploader {
keys.add(key);
}
} catch (Throwable e) {
- if (blobStore != null && cleanupIfFails) {
+ if (getBlobStore() != null && cleanupIfFails) {
deleteBlobs(keys);
}
throw new RuntimeException(e);
@@ -124,7 +126,7 @@ public class DependencyUploader {
public void deleteBlobs(List<String> keys) {
for (String key : keys) {
try {
- blobStore.deleteBlob(key);
+ getBlobStore().deleteBlob(key);
} catch (Throwable e) {
LOG.warn("blob delete failed - key: {} continue...", key);
}
@@ -142,10 +144,10 @@ public class DependencyUploader {
try {
// FIXME: we can filter by listKeys() with local blobstore when STORM-1986 is going to be resolved
// as a workaround, we call getBlobMeta() for all keys
- blobStore.getBlobMeta(key);
+ getBlobStore().getBlobMeta(key);
} catch (KeyNotFoundException e) {
// TODO: do we want to add ACL here?
- AtomicOutputStream blob = blobStore
+ AtomicOutputStream blob = getBlobStore()
.createBlob(key, new SettableBlobMeta(new ArrayList<AccessControl>()));
Files.copy(dependency.toPath(), blob);
blob.close();