You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/02/03 19:53:50 UTC
[1/3] storm git commit: Faster, optional retrieval of last component error
Repository: storm
Updated Branches:
refs/heads/master bbcf749ca -> 1f35f41a8
Faster, optional retrieval of last component error
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1cfa190f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1cfa190f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1cfa190f
Branch: refs/heads/master
Commit: 1cfa190f2efb06f8798984b43dec801e5ff20ad5
Parents: bc54e8e
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Thu Jan 22 10:46:03 2015 -0600
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Thu Jan 22 10:46:03 2015 -0600
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/cluster.clj | 38 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 35 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 32 +-
.../storm/generated/GetInfoOptions.java | 350 +++++++
.../jvm/backtype/storm/generated/Nimbus.java | 974 +++++++++++++++++++
.../storm/generated/NumErrorsChoice.java | 64 ++
.../src/jvm/backtype/storm/utils/Monitor.java | 8 +-
storm-core/src/py/storm/DistributedRPC-remote | 0
.../py/storm/DistributedRPCInvocations-remote | 0
storm-core/src/py/storm/Nimbus-remote | 7 +
storm-core/src/py/storm/Nimbus.py | 226 +++++
storm-core/src/py/storm/ttypes.py | 80 ++
storm-core/src/storm.thrift | 11 +
.../clj/backtype/storm/integration_test.clj | 10 +-
14 files changed, 1812 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1cfa190f/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj
index 8ead710..4b73f2e 100644
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@ -162,6 +162,7 @@
(remove-storm! [this storm-id])
(report-error [this storm-id task-id node port error])
(errors [this storm-id task-id])
+ (last-error [this storm-id task-id])
(set-credentials! [this storm-id creds topo-conf])
(credentials [this storm-id callback])
(disconnect [this]))
@@ -209,6 +210,16 @@
[storm-id component-id]
(str (error-storm-root storm-id) "/" (url-encode component-id)))
+(def last-error-path-seg "last-error")
+
+(defn last-error-path
+ [storm-id component-id]
+ (str (error-storm-root storm-id)
+ "/"
+ (url-encode component-id)
+ "-"
+ last-error-path-seg))
+
(defn credentials-path
[storm-id]
(str CREDENTIALS-SUBTREE "/" storm-id))
@@ -232,7 +243,7 @@
(when ser
(Utils/deserialize ser)))
-(defstruct TaskError :error :time-secs :host :port)
+(defrecord TaskError [error time-secs host port])
(defn- parse-error-path
[^String p]
@@ -430,9 +441,13 @@
(report-error
[this storm-id component-id node port error]
(let [path (error-path storm-id component-id)
+ last-error-path (last-error-path storm-id component-id)
data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}
_ (mkdirs cluster-state path acls)
- _ (create-sequential cluster-state (str path "/e") (Utils/serialize data) acls)
+ ser-data (Utils/serialize data)
+ _ (mkdirs cluster-state path acls)
+ _ (create-sequential cluster-state (str path "/e") ser-data acls)
+ _ (set-data cluster-state last-error-path ser-data acls)
to-kill (->> (get-children cluster-state path false)
(sort-by parse-error-path)
reverse
@@ -445,15 +460,22 @@
(let [path (error-path storm-id component-id)
errors (if (exists-node? cluster-state path false)
(dofor [c (get-children cluster-state path false)]
- (let [data (-> (get-data cluster-state (str path "/" c) false)
+ (if-let [data (-> (get-data cluster-state
+ (str path "/" c)
+ false)
maybe-deserialize)]
- (when data
- (struct TaskError (:error data) (:time-secs data) (:host data) (:port data))
- )))
- ())
- ]
+ (map->TaskError data)))
+ ())]
(->> (filter not-nil? errors)
(sort-by (comp - :time-secs)))))
+
+ (last-error
+ [this storm-id component-id]
+ (let [path (last-error-path storm-id component-id)]
+ (if (exists-node? cluster-state path false)
+ (if-let [data (->> (get-data cluster-state path false)
+ maybe-deserialize)]
+ (map->TaskError data)))))
(disconnect
[this]
http://git-wip-us.apache.org/repos/asf/storm/blob/1cfa190f/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index b2cb96a..0c7612b 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -22,7 +22,8 @@
(:use [backtype.storm.scheduler.DefaultScheduler])
(:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails
Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails])
- (:import [backtype.storm.generated AuthorizationException])
+ (:import [backtype.storm.generated AuthorizationException GetInfoOptions
+ NumErrorsChoice])
(:use [backtype.storm bootstrap util])
(:use [backtype.storm.config :only [validate-configs-with-schemas]])
(:use [backtype.storm.daemon common])
@@ -893,6 +894,13 @@
(.set_host (:host %))
(.set_port (:port %))))))
+(defn- get-last-error
+ [storm-cluster-state storm-id component-id]
+ (if-let [e (.last-error storm-cluster-state storm-id component-id)]
+ (doto (ErrorInfo. (:error e) (:time-secs e))
+ (.set_host (:host e))
+ (.set_port (:port e)))))
+
(defn- thriftify-executor-id [[first-task-id last-task-id]]
(ExecutorInfo. (int first-task-id) (int last-task-id)))
@@ -1255,7 +1263,7 @@
topology-summaries)
))
- (^TopologyInfo getTopologyInfo [this ^String storm-id]
+ (^TopologyInfo getTopologyInfoWithOpts [this ^String storm-id ^GetInfoOptions options]
(let [storm-cluster-state (:storm-cluster-state nimbus)
topology-conf (try-read-storm-conf conf storm-id)
storm-name (topology-conf TOPOLOGY-NAME)
@@ -1266,8 +1274,22 @@
assignment (.assignment-info storm-cluster-state storm-id nil)
beats (map-val :heartbeat (get @(:heartbeats-cache nimbus) storm-id))
all-components (-> task->component reverse-map keys)
+ num-err-choice (or (.get_num_err_choice options)
+ NumErrorsChoice/ALL)
+ errors-fn (condp = num-err-choice
+ NumErrorsChoice/NONE (fn [& _] ()) ;; empty list only
+ NumErrorsChoice/ONE (comp #(remove nil? %)
+ list
+ get-last-error)
+ NumErrorsChoice/ALL get-errors
+ ;; Default
+ (do
+ (log-warn "Got invalid NumErrorsChoice '"
+ num-err-choice
+ "'")
+ get-errors))
errors (->> all-components
- (map (fn [c] [c (get-errors storm-cluster-state storm-id c)]))
+ (map (fn [c] [c (errors-fn storm-cluster-state storm-id c)]))
(into {}))
executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)]
(let [host (-> assignment :node->host (get node))
@@ -1294,7 +1316,12 @@
(when-let [sched-status (.get @(:id->sched-status nimbus) storm-id)] (.set_sched_status topo-info sched-status))
topo-info
))
-
+
+ (^TopologyInfo getTopologyInfo [this ^String storm-id]
+ (.getTopologyInfoWithOpts this
+ storm-id
+ (doto (GetInfoOptions.) (.set_num_err_choice NumErrorsChoice/ALL))))
+
Shutdownable
(shutdown [this]
(log-message "Shutting down master")
http://git-wip-us.apache.org/repos/asf/storm/blob/1cfa190f/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index 60f9493..5998529 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -29,7 +29,7 @@
ExecutorStats ExecutorSummary TopologyInfo SpoutStats BoltStats
ErrorInfo ClusterSummary SupervisorSummary TopologySummary
Nimbus$Client StormTopology GlobalStreamId RebalanceOptions
- KillOptions])
+ KillOptions GetInfoOptions NumErrorsChoice])
(:import [backtype.storm.security.auth AuthUtils ReqContext])
(:import [backtype.storm.generated AuthorizationException])
(:import [backtype.storm.security.auth AuthUtils])
@@ -475,7 +475,10 @@
topology (.getTopology ^Nimbus$Client nimbus id)
spouts (.get_spouts topology)
bolts (.get_bolts topology)
- summ (.getTopologyInfo ^Nimbus$Client nimbus id)
+ summ (->> (doto
+ (GetInfoOptions.)
+ (.set_num_err_choice NumErrorsChoice/NONE))
+ (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
execs (.get_executors summ)
spout-summs (filter (partial spout-summary? topology) execs)
bolt-summs (filter (partial bolt-summary? topology) execs)
@@ -652,7 +655,10 @@
(with-nimbus nimbus
(let [window (if window window ":all-time")
window-hint (window-hint window)
- summ (.getTopologyInfo ^Nimbus$Client nimbus id)
+ summ (->> (doto
+ (GetInfoOptions.)
+ (.set_num_err_choice NumErrorsChoice/ONE))
+ (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
topology (.getTopology ^Nimbus$Client nimbus id)
topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus id))
spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
@@ -903,7 +909,10 @@
(json-response (component-page id component (:window m) (check-include-sys? (:sys m)) user) (:callback m))))
(POST "/api/v1/topology/:id/activate" [:as {:keys [cookies servlet-request]} id]
(with-nimbus nimbus
- (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ (let [tplg (->> (doto
+ (GetInfoOptions.)
+ (.set_num_err_choice NumErrorsChoice/NONE))
+ (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
name (.get_name tplg)]
(assert-authorized-user servlet-request "activate" (topology-config id))
(.activate nimbus name)
@@ -911,7 +920,10 @@
(resp/redirect (str "/api/v1/topology/" (url-encode id))))
(POST "/api/v1/topology/:id/deactivate" [:as {:keys [cookies servlet-request]} id]
(with-nimbus nimbus
- (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ (let [tplg (->> (doto
+ (GetInfoOptions.)
+ (.set_num_err_choice NumErrorsChoice/NONE))
+ (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
name (.get_name tplg)]
(assert-authorized-user servlet-request "deactivate" (topology-config id))
(.deactivate nimbus name)
@@ -919,7 +931,10 @@
(resp/redirect (str "/api/v1/topology/" (url-encode id))))
(POST "/api/v1/topology/:id/rebalance/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time]
(with-nimbus nimbus
- (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ (let [tplg (->> (doto
+ (GetInfoOptions.)
+ (.set_num_err_choice NumErrorsChoice/NONE))
+ (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
name (.get_name tplg)
options (RebalanceOptions.)]
(assert-authorized-user servlet-request "rebalance" (topology-config id))
@@ -929,7 +944,10 @@
(resp/redirect (str "/api/v1/topology/" (url-encode id))))
(POST "/api/v1/topology/:id/kill/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time]
(with-nimbus nimbus
- (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ (let [tplg (->> (doto
+ (GetInfoOptions.)
+ (.set_num_err_choice NumErrorsChoice/NONE))
+ (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
name (.get_name tplg)
options (KillOptions.)]
(assert-authorized-user servlet-request "killTopology" (topology-config id))
http://git-wip-us.apache.org/repos/asf/storm/blob/1cfa190f/storm-core/src/jvm/backtype/storm/generated/GetInfoOptions.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/GetInfoOptions.java b/storm-core/src/jvm/backtype/storm/generated/GetInfoOptions.java
new file mode 100644
index 0000000..23e6a3b
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/GetInfoOptions.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.7.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package backtype.storm.generated;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GetInfoOptions implements org.apache.thrift.TBase<GetInfoOptions, GetInfoOptions._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("GetInfoOptions");
+
+ private static final org.apache.thrift.protocol.TField NUM_ERR_CHOICE_FIELD_DESC = new org.apache.thrift.protocol.TField("num_err_choice", org.apache.thrift.protocol.TType.I32, (short)1);
+
+ private NumErrorsChoice num_err_choice; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ /**
+ *
+ * @see NumErrorsChoice
+ */
+ NUM_ERR_CHOICE((short)1, "num_err_choice");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // NUM_ERR_CHOICE
+ return NUM_ERR_CHOICE;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.NUM_ERR_CHOICE, new org.apache.thrift.meta_data.FieldMetaData("num_err_choice", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, NumErrorsChoice.class)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(GetInfoOptions.class, metaDataMap);
+ }
+
+ public GetInfoOptions() {
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public GetInfoOptions(GetInfoOptions other) {
+ if (other.is_set_num_err_choice()) {
+ this.num_err_choice = other.num_err_choice;
+ }
+ }
+
+ public GetInfoOptions deepCopy() {
+ return new GetInfoOptions(this);
+ }
+
+ @Override
+ public void clear() {
+ this.num_err_choice = null;
+ }
+
+ /**
+ *
+ * @see NumErrorsChoice
+ */
+ public NumErrorsChoice get_num_err_choice() {
+ return this.num_err_choice;
+ }
+
+ /**
+ *
+ * @see NumErrorsChoice
+ */
+ public void set_num_err_choice(NumErrorsChoice num_err_choice) {
+ this.num_err_choice = num_err_choice;
+ }
+
+ public void unset_num_err_choice() {
+ this.num_err_choice = null;
+ }
+
+ /** Returns true if field num_err_choice is set (has been assigned a value) and false otherwise */
+ public boolean is_set_num_err_choice() {
+ return this.num_err_choice != null;
+ }
+
+ public void set_num_err_choice_isSet(boolean value) {
+ if (!value) {
+ this.num_err_choice = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case NUM_ERR_CHOICE:
+ if (value == null) {
+ unset_num_err_choice();
+ } else {
+ set_num_err_choice((NumErrorsChoice)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case NUM_ERR_CHOICE:
+ return get_num_err_choice();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case NUM_ERR_CHOICE:
+ return is_set_num_err_choice();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof GetInfoOptions)
+ return this.equals((GetInfoOptions)that);
+ return false;
+ }
+
+ public boolean equals(GetInfoOptions that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_num_err_choice = true && this.is_set_num_err_choice();
+ boolean that_present_num_err_choice = true && that.is_set_num_err_choice();
+ if (this_present_num_err_choice || that_present_num_err_choice) {
+ if (!(this_present_num_err_choice && that_present_num_err_choice))
+ return false;
+ if (!this.num_err_choice.equals(that.num_err_choice))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ HashCodeBuilder builder = new HashCodeBuilder();
+
+ boolean present_num_err_choice = true && (is_set_num_err_choice());
+ builder.append(present_num_err_choice);
+ if (present_num_err_choice)
+ builder.append(num_err_choice.getValue());
+
+ return builder.toHashCode();
+ }
+
+ public int compareTo(GetInfoOptions other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ GetInfoOptions typedOther = (GetInfoOptions)other;
+
+ lastComparison = Boolean.valueOf(is_set_num_err_choice()).compareTo(typedOther.is_set_num_err_choice());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_num_err_choice()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.num_err_choice, typedOther.num_err_choice);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ case 1: // NUM_ERR_CHOICE
+ if (field.type == org.apache.thrift.protocol.TType.I32) {
+ this.num_err_choice = NumErrorsChoice.findByValue(iprot.readI32());
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (this.num_err_choice != null) {
+ if (is_set_num_err_choice()) {
+ oprot.writeFieldBegin(NUM_ERR_CHOICE_FIELD_DESC);
+ oprot.writeI32(this.num_err_choice.getValue());
+ oprot.writeFieldEnd();
+ }
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("GetInfoOptions(");
+ boolean first = true;
+
+ if (is_set_num_err_choice()) {
+ sb.append("num_err_choice:");
+ if (this.num_err_choice == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.num_err_choice);
+ }
+ first = false;
+ }
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/1cfa190f/storm-core/src/jvm/backtype/storm/generated/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/Nimbus.java b/storm-core/src/jvm/backtype/storm/generated/Nimbus.java
index 7d04901..ef90dbb 100644
--- a/storm-core/src/jvm/backtype/storm/generated/Nimbus.java
+++ b/storm-core/src/jvm/backtype/storm/generated/Nimbus.java
@@ -74,6 +74,8 @@ public class Nimbus {
public TopologyInfo getTopologyInfo(String id) throws NotAliveException, AuthorizationException, org.apache.thrift.TException;
+ public TopologyInfo getTopologyInfoWithOpts(String id, GetInfoOptions options) throws NotAliveException, AuthorizationException, org.apache.thrift.TException;
+
public String getTopologyConf(String id) throws NotAliveException, AuthorizationException, org.apache.thrift.TException;
public StormTopology getTopology(String id) throws NotAliveException, AuthorizationException, org.apache.thrift.TException;
@@ -116,6 +118,8 @@ public class Nimbus {
public void getTopologyInfo(String id, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.getTopologyInfo_call> resultHandler) throws org.apache.thrift.TException;
+ public void getTopologyInfoWithOpts(String id, GetInfoOptions options, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.getTopologyInfoWithOpts_call> resultHandler) throws org.apache.thrift.TException;
+
public void getTopologyConf(String id, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.getTopologyConf_call> resultHandler) throws org.apache.thrift.TException;
public void getTopology(String id, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.getTopology_call> resultHandler) throws org.apache.thrift.TException;
@@ -577,6 +581,36 @@ public class Nimbus {
throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getTopologyInfo failed: unknown result");
}
+ public TopologyInfo getTopologyInfoWithOpts(String id, GetInfoOptions options) throws NotAliveException, AuthorizationException, org.apache.thrift.TException
+ {
+ send_getTopologyInfoWithOpts(id, options);
+ return recv_getTopologyInfoWithOpts();
+ }
+
+ public void send_getTopologyInfoWithOpts(String id, GetInfoOptions options) throws org.apache.thrift.TException
+ {
+ getTopologyInfoWithOpts_args args = new getTopologyInfoWithOpts_args();
+ args.set_id(id);
+ args.set_options(options);
+ sendBase("getTopologyInfoWithOpts", args);
+ }
+
+ public TopologyInfo recv_getTopologyInfoWithOpts() throws NotAliveException, AuthorizationException, org.apache.thrift.TException
+ {
+ getTopologyInfoWithOpts_result result = new getTopologyInfoWithOpts_result();
+ receiveBase(result, "getTopologyInfoWithOpts");
+ if (result.is_set_success()) {
+ return result.success;
+ }
+ if (result.e != null) {
+ throw result.e;
+ }
+ if (result.aze != null) {
+ throw result.aze;
+ }
+ throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getTopologyInfoWithOpts failed: unknown result");
+ }
+
public String getTopologyConf(String id) throws NotAliveException, AuthorizationException, org.apache.thrift.TException
{
send_getTopologyConf(id);
@@ -1218,6 +1252,41 @@ public class Nimbus {
}
}
+ public void getTopologyInfoWithOpts(String id, GetInfoOptions options, org.apache.thrift.async.AsyncMethodCallback<getTopologyInfoWithOpts_call> resultHandler) throws org.apache.thrift.TException {
+ checkReady();
+ getTopologyInfoWithOpts_call method_call = new getTopologyInfoWithOpts_call(id, options, resultHandler, this, ___protocolFactory, ___transport);
+ this.___currentMethod = method_call;
+ ___manager.call(method_call);
+ }
+
+ public static class getTopologyInfoWithOpts_call extends org.apache.thrift.async.TAsyncMethodCall {
+ private String id;
+ private GetInfoOptions options;
+ public getTopologyInfoWithOpts_call(String id, GetInfoOptions options, org.apache.thrift.async.AsyncMethodCallback<getTopologyInfoWithOpts_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+ super(client, protocolFactory, transport, resultHandler, false);
+ this.id = id;
+ this.options = options;
+ }
+
+ public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+ prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("getTopologyInfoWithOpts", org.apache.thrift.protocol.TMessageType.CALL, 0));
+ getTopologyInfoWithOpts_args args = new getTopologyInfoWithOpts_args();
+ args.set_id(id);
+ args.set_options(options);
+ args.write(prot);
+ prot.writeMessageEnd();
+ }
+
+ public TopologyInfo getResult() throws NotAliveException, AuthorizationException, org.apache.thrift.TException {
+ if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+ throw new IllegalStateException("Method call not finished!");
+ }
+ org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+ org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+ return (new Client(prot)).recv_getTopologyInfoWithOpts();
+ }
+ }
+
public void getTopologyConf(String id, org.apache.thrift.async.AsyncMethodCallback<getTopologyConf_call> resultHandler) throws org.apache.thrift.TException {
checkReady();
getTopologyConf_call method_call = new getTopologyConf_call(id, resultHandler, this, ___protocolFactory, ___transport);
@@ -1343,6 +1412,7 @@ public class Nimbus {
processMap.put("getNimbusConf", new getNimbusConf());
processMap.put("getClusterInfo", new getClusterInfo());
processMap.put("getTopologyInfo", new getTopologyInfo());
+ processMap.put("getTopologyInfoWithOpts", new getTopologyInfoWithOpts());
processMap.put("getTopologyConf", new getTopologyConf());
processMap.put("getTopology", new getTopology());
processMap.put("getUserTopology", new getUserTopology());
@@ -1695,6 +1765,28 @@ public class Nimbus {
}
}
+ private static class getTopologyInfoWithOpts<I extends Iface> extends org.apache.thrift.ProcessFunction<I, getTopologyInfoWithOpts_args> {
+ public getTopologyInfoWithOpts() {
+ super("getTopologyInfoWithOpts");
+ }
+
+ protected getTopologyInfoWithOpts_args getEmptyArgsInstance() {
+ return new getTopologyInfoWithOpts_args();
+ }
+
+ protected getTopologyInfoWithOpts_result getResult(I iface, getTopologyInfoWithOpts_args args) throws org.apache.thrift.TException {
+ getTopologyInfoWithOpts_result result = new getTopologyInfoWithOpts_result();
+ try {
+ result.success = iface.getTopologyInfoWithOpts(args.id, args.options);
+ } catch (NotAliveException e) {
+ result.e = e;
+ } catch (AuthorizationException aze) {
+ result.aze = aze;
+ }
+ return result;
+ }
+ }
+
private static class getTopologyConf<I extends Iface> extends org.apache.thrift.ProcessFunction<I, getTopologyConf_args> {
public getTopologyConf() {
super("getTopologyConf");
@@ -13910,6 +14002,888 @@ public class Nimbus {
}
+ /**
+ * Argument struct for the getTopologyInfoWithOpts call: field 1 is the string
+ * "id" and field 2 is the GetInfoOptions struct "options".
+ */
+ public static class getTopologyInfoWithOpts_args implements org.apache.thrift.TBase<getTopologyInfoWithOpts_args, getTopologyInfoWithOpts_args._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getTopologyInfoWithOpts_args");
+
+ // Field descriptors (name, wire type, field id) passed to writeFieldBegin() in write().
+ private static final org.apache.thrift.protocol.TField ID_FIELD_DESC = new org.apache.thrift.protocol.TField("id", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField OPTIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("options", org.apache.thrift.protocol.TType.STRUCT, (short)2);
+
+ // A null value means "unset" (see is_set_id / is_set_options).
+ // NOTE: despite the "required" markers, the metadata below registers both fields with
+ // TFieldRequirementType.DEFAULT and validate() performs no checks.
+ private String id; // required
+ private GetInfoOptions options; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ ID((short)1, "id"),
+ OPTIONS((short)2, "options");
+
+ // Field name -> constant lookup, filled once by the static initializer below.
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if it is not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // ID
+ return ID;
+ case 2: // OPTIONS
+ return OPTIONS;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if it is not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+
+ // Unmodifiable per-field metadata; the static block also registers it with
+ // FieldMetaData under this class.
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.ID, new org.apache.thrift.meta_data.FieldMetaData("id", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.OPTIONS, new org.apache.thrift.meta_data.FieldMetaData("options", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, GetInfoOptions.class)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getTopologyInfoWithOpts_args.class, metaDataMap);
+ }
+
+ /** Creates a struct with both fields unset (null). */
+ public getTopologyInfoWithOpts_args() {
+ }
+
+ /** Creates a struct with both fields assigned from the given arguments. */
+ public getTopologyInfoWithOpts_args(
+ String id,
+ GetInfoOptions options)
+ {
+ this();
+ this.id = id;
+ this.options = options;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>: the id String reference is reused and
+ * options is copied through the GetInfoOptions copy constructor. Unset fields
+ * stay unset.
+ */
+ public getTopologyInfoWithOpts_args(getTopologyInfoWithOpts_args other) {
+ if (other.is_set_id()) {
+ this.id = other.id;
+ }
+ if (other.is_set_options()) {
+ this.options = new GetInfoOptions(other.options);
+ }
+ }
+
+ public getTopologyInfoWithOpts_args deepCopy() {
+ return new getTopologyInfoWithOpts_args(this);
+ }
+
+ /** Resets both fields to unset (null). */
+ @Override
+ public void clear() {
+ this.id = null;
+ this.options = null;
+ }
+
+ public String get_id() {
+ return this.id;
+ }
+
+ public void set_id(String id) {
+ this.id = id;
+ }
+
+ public void unset_id() {
+ this.id = null;
+ }
+
+ /** Returns true if field id is set (has been assigned a value) and false otherwise */
+ public boolean is_set_id() {
+ return this.id != null;
+ }
+
+ /** Passing false clears id; passing true changes nothing. */
+ public void set_id_isSet(boolean value) {
+ if (!value) {
+ this.id = null;
+ }
+ }
+
+ public GetInfoOptions get_options() {
+ return this.options;
+ }
+
+ public void set_options(GetInfoOptions options) {
+ this.options = options;
+ }
+
+ public void unset_options() {
+ this.options = null;
+ }
+
+ /** Returns true if field options is set (has been assigned a value) and false otherwise */
+ public boolean is_set_options() {
+ return this.options != null;
+ }
+
+ /** Passing false clears options; passing true changes nothing. */
+ public void set_options_isSet(boolean value) {
+ if (!value) {
+ this.options = null;
+ }
+ }
+
+ /** Generic setter: a null value unsets the field, otherwise the value is cast to the field's type and stored. */
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case ID:
+ if (value == null) {
+ unset_id();
+ } else {
+ set_id((String)value);
+ }
+ break;
+
+ case OPTIONS:
+ if (value == null) {
+ unset_options();
+ } else {
+ set_options((GetInfoOptions)value);
+ }
+ break;
+
+ }
+ }
+
+ /** Generic getter keyed by field constant. */
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case ID:
+ return get_id();
+
+ case OPTIONS:
+ return get_options();
+
+ }
+ throw new IllegalStateException(); // reached only when no case above matched
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case ID:
+ return is_set_id();
+ case OPTIONS:
+ return is_set_options();
+ }
+ throw new IllegalStateException(); // reached only when no case above matched
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof getTopologyInfoWithOpts_args)
+ return this.equals((getTopologyInfoWithOpts_args)that);
+ return false;
+ }
+
+ /**
+ * Field-wise equality. A field that is unset on both sides matches; a field
+ * set on only one side does not; otherwise the values are compared with equals().
+ */
+ public boolean equals(getTopologyInfoWithOpts_args that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_id = true && this.is_set_id();
+ boolean that_present_id = true && that.is_set_id();
+ if (this_present_id || that_present_id) {
+ if (!(this_present_id && that_present_id))
+ return false;
+ if (!this.id.equals(that.id))
+ return false;
+ }
+
+ boolean this_present_options = true && this.is_set_options();
+ boolean that_present_options = true && that.is_set_options();
+ if (this_present_options || that_present_options) {
+ if (!(this_present_options && that_present_options))
+ return false;
+ if (!this.options.equals(that.options))
+ return false;
+ }
+
+ return true;
+ }
+
+ /** Hashes each field's presence flag and, when the field is set, its value. */
+ @Override
+ public int hashCode() {
+ HashCodeBuilder builder = new HashCodeBuilder();
+
+ boolean present_id = true && (is_set_id());
+ builder.append(present_id);
+ if (present_id)
+ builder.append(id);
+
+ boolean present_options = true && (is_set_options());
+ builder.append(present_options);
+ if (present_options)
+ builder.append(options);
+
+ return builder.toHashCode();
+ }
+
+ /**
+ * Orders by class name when the runtime classes differ; otherwise by id and
+ * then by options. For each field the is-set flags are compared first, and
+ * the values are compared only when this side has the field set.
+ */
+ public int compareTo(getTopologyInfoWithOpts_args other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ getTopologyInfoWithOpts_args typedOther = (getTopologyInfoWithOpts_args)other;
+
+ lastComparison = Boolean.valueOf(is_set_id()).compareTo(typedOther.is_set_id());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_id()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.id, typedOther.id);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_options()).compareTo(typedOther.is_set_options());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_options()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.options, typedOther.options);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ /**
+ * Reads fields from iprot until a STOP field is seen. A field with an unknown
+ * id, or a known id with an unexpected wire type, is skipped. validate() is
+ * called after the struct end has been read.
+ */
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ case 1: // ID
+ if (field.type == org.apache.thrift.protocol.TType.STRING) {
+ this.id = iprot.readString();
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 2: // OPTIONS
+ if (field.type == org.apache.thrift.protocol.TType.STRUCT) {
+ this.options = new GetInfoOptions();
+ this.options.read(iprot);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ validate();
+ }
+
+ /**
+ * Calls validate(), then writes each non-null field (id first, then options)
+ * followed by the field-stop marker. Null fields are omitted.
+ */
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (this.id != null) {
+ oprot.writeFieldBegin(ID_FIELD_DESC);
+ oprot.writeString(this.id);
+ oprot.writeFieldEnd();
+ }
+ if (this.options != null) {
+ oprot.writeFieldBegin(OPTIONS_FIELD_DESC);
+ this.options.write(oprot);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ /** Renders as getTopologyInfoWithOpts_args(id:..., options:...), printing "null" for unset fields. */
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("getTopologyInfoWithOpts_args(");
+ boolean first = true;
+
+ sb.append("id:");
+ if (this.id == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.id);
+ }
+ first = false;
+ if (!first) sb.append(", "); // first is always false here, so the separator is always appended
+ sb.append("options:");
+ if (this.options == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.options);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ /** Currently a no-op: no field checks are implemented. */
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ }
+
+ // Java serialization hook: writes this struct to the stream using TCompactProtocol;
+ // a TException is rethrown wrapped in an IOException.
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ // Java deserialization hook: reads this struct from the stream using TCompactProtocol;
+ // a TException is rethrown wrapped in an IOException.
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ }
+
+ /**
+ * Result struct for the getTopologyInfoWithOpts call: field 0 "success" holds the
+ * TopologyInfo, field 1 "e" a NotAliveException and field 2 "aze" an
+ * AuthorizationException.
+ */
+ public static class getTopologyInfoWithOpts_result implements org.apache.thrift.TBase<getTopologyInfoWithOpts_result, getTopologyInfoWithOpts_result._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getTopologyInfoWithOpts_result");
+
+ // Field descriptors (name, wire type, field id) passed to writeFieldBegin() in write().
+ private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.STRUCT, (short)0);
+ private static final org.apache.thrift.protocol.TField E_FIELD_DESC = new org.apache.thrift.protocol.TField("e", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+ private static final org.apache.thrift.protocol.TField AZE_FIELD_DESC = new org.apache.thrift.protocol.TField("aze", org.apache.thrift.protocol.TType.STRUCT, (short)2);
+
+ // A null value means "unset" (see the is_set_* methods).
+ // NOTE: despite the "required" markers, the metadata below registers all fields with
+ // TFieldRequirementType.DEFAULT and validate() performs no checks.
+ private TopologyInfo success; // required
+ private NotAliveException e; // required
+ private AuthorizationException aze; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ SUCCESS((short)0, "success"),
+ E((short)1, "e"),
+ AZE((short)2, "aze");
+
+ // Field name -> constant lookup, filled once by the static initializer below.
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if it is not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 0: // SUCCESS
+ return SUCCESS;
+ case 1: // E
+ return E;
+ case 2: // AZE
+ return AZE;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if it is not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+
+ // Unmodifiable per-field metadata; the static block also registers it with
+ // FieldMetaData under this class.
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TopologyInfo.class)));
+ tmpMap.put(_Fields.E, new org.apache.thrift.meta_data.FieldMetaData("e", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
+ tmpMap.put(_Fields.AZE, new org.apache.thrift.meta_data.FieldMetaData("aze", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getTopologyInfoWithOpts_result.class, metaDataMap);
+ }
+
+ /** Creates a result with all fields unset (null). */
+ public getTopologyInfoWithOpts_result() {
+ }
+
+ /** Creates a result with all three fields assigned from the given arguments. */
+ public getTopologyInfoWithOpts_result(
+ TopologyInfo success,
+ NotAliveException e,
+ AuthorizationException aze)
+ {
+ this();
+ this.success = success;
+ this.e = e;
+ this.aze = aze;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>: every set field is copied through its
+ * type's copy constructor. Unset fields stay unset.
+ */
+ public getTopologyInfoWithOpts_result(getTopologyInfoWithOpts_result other) {
+ if (other.is_set_success()) {
+ this.success = new TopologyInfo(other.success);
+ }
+ if (other.is_set_e()) {
+ this.e = new NotAliveException(other.e);
+ }
+ if (other.is_set_aze()) {
+ this.aze = new AuthorizationException(other.aze);
+ }
+ }
+
+ public getTopologyInfoWithOpts_result deepCopy() {
+ return new getTopologyInfoWithOpts_result(this);
+ }
+
+ /** Resets all three fields to unset (null). */
+ @Override
+ public void clear() {
+ this.success = null;
+ this.e = null;
+ this.aze = null;
+ }
+
+ public TopologyInfo get_success() {
+ return this.success;
+ }
+
+ public void set_success(TopologyInfo success) {
+ this.success = success;
+ }
+
+ public void unset_success() {
+ this.success = null;
+ }
+
+ /** Returns true if field success is set (has been assigned a value) and false otherwise */
+ public boolean is_set_success() {
+ return this.success != null;
+ }
+
+ /** Passing false clears success; passing true changes nothing. */
+ public void set_success_isSet(boolean value) {
+ if (!value) {
+ this.success = null;
+ }
+ }
+
+ public NotAliveException get_e() {
+ return this.e;
+ }
+
+ public void set_e(NotAliveException e) {
+ this.e = e;
+ }
+
+ public void unset_e() {
+ this.e = null;
+ }
+
+ /** Returns true if field e is set (has been assigned a value) and false otherwise */
+ public boolean is_set_e() {
+ return this.e != null;
+ }
+
+ /** Passing false clears e; passing true changes nothing. */
+ public void set_e_isSet(boolean value) {
+ if (!value) {
+ this.e = null;
+ }
+ }
+
+ public AuthorizationException get_aze() {
+ return this.aze;
+ }
+
+ public void set_aze(AuthorizationException aze) {
+ this.aze = aze;
+ }
+
+ public void unset_aze() {
+ this.aze = null;
+ }
+
+ /** Returns true if field aze is set (has been assigned a value) and false otherwise */
+ public boolean is_set_aze() {
+ return this.aze != null;
+ }
+
+ /** Passing false clears aze; passing true changes nothing. */
+ public void set_aze_isSet(boolean value) {
+ if (!value) {
+ this.aze = null;
+ }
+ }
+
+ /** Generic setter: a null value unsets the field, otherwise the value is cast to the field's type and stored. */
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case SUCCESS:
+ if (value == null) {
+ unset_success();
+ } else {
+ set_success((TopologyInfo)value);
+ }
+ break;
+
+ case E:
+ if (value == null) {
+ unset_e();
+ } else {
+ set_e((NotAliveException)value);
+ }
+ break;
+
+ case AZE:
+ if (value == null) {
+ unset_aze();
+ } else {
+ set_aze((AuthorizationException)value);
+ }
+ break;
+
+ }
+ }
+
+ /** Generic getter keyed by field constant. */
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case SUCCESS:
+ return get_success();
+
+ case E:
+ return get_e();
+
+ case AZE:
+ return get_aze();
+
+ }
+ throw new IllegalStateException(); // reached only when no case above matched
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case SUCCESS:
+ return is_set_success();
+ case E:
+ return is_set_e();
+ case AZE:
+ return is_set_aze();
+ }
+ throw new IllegalStateException(); // reached only when no case above matched
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof getTopologyInfoWithOpts_result)
+ return this.equals((getTopologyInfoWithOpts_result)that);
+ return false;
+ }
+
+ /**
+ * Field-wise equality. A field that is unset on both sides matches; a field
+ * set on only one side does not; otherwise the values are compared with equals().
+ */
+ public boolean equals(getTopologyInfoWithOpts_result that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_success = true && this.is_set_success();
+ boolean that_present_success = true && that.is_set_success();
+ if (this_present_success || that_present_success) {
+ if (!(this_present_success && that_present_success))
+ return false;
+ if (!this.success.equals(that.success))
+ return false;
+ }
+
+ boolean this_present_e = true && this.is_set_e();
+ boolean that_present_e = true && that.is_set_e();
+ if (this_present_e || that_present_e) {
+ if (!(this_present_e && that_present_e))
+ return false;
+ if (!this.e.equals(that.e))
+ return false;
+ }
+
+ boolean this_present_aze = true && this.is_set_aze();
+ boolean that_present_aze = true && that.is_set_aze();
+ if (this_present_aze || that_present_aze) {
+ if (!(this_present_aze && that_present_aze))
+ return false;
+ if (!this.aze.equals(that.aze))
+ return false;
+ }
+
+ return true;
+ }
+
+ /** Hashes each field's presence flag and, when the field is set, its value. */
+ @Override
+ public int hashCode() {
+ HashCodeBuilder builder = new HashCodeBuilder();
+
+ boolean present_success = true && (is_set_success());
+ builder.append(present_success);
+ if (present_success)
+ builder.append(success);
+
+ boolean present_e = true && (is_set_e());
+ builder.append(present_e);
+ if (present_e)
+ builder.append(e);
+
+ boolean present_aze = true && (is_set_aze());
+ builder.append(present_aze);
+ if (present_aze)
+ builder.append(aze);
+
+ return builder.toHashCode();
+ }
+
+ /**
+ * Orders by class name when the runtime classes differ; otherwise by success,
+ * then e, then aze. For each field the is-set flags are compared first, and
+ * the values are compared only when this side has the field set.
+ */
+ public int compareTo(getTopologyInfoWithOpts_result other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ getTopologyInfoWithOpts_result typedOther = (getTopologyInfoWithOpts_result)other;
+
+ lastComparison = Boolean.valueOf(is_set_success()).compareTo(typedOther.is_set_success());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_success()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, typedOther.success);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_e()).compareTo(typedOther.is_set_e());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_e()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.e, typedOther.e);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_aze()).compareTo(typedOther.is_set_aze());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_aze()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.aze, typedOther.aze);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ /**
+ * Reads fields from iprot until a STOP field is seen. A field with an unknown
+ * id, or a known id with an unexpected wire type, is skipped. validate() is
+ * called after the struct end has been read.
+ */
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ case 0: // SUCCESS
+ if (field.type == org.apache.thrift.protocol.TType.STRUCT) {
+ this.success = new TopologyInfo();
+ this.success.read(iprot);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 1: // E
+ if (field.type == org.apache.thrift.protocol.TType.STRUCT) {
+ this.e = new NotAliveException();
+ this.e.read(iprot);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 2: // AZE
+ if (field.type == org.apache.thrift.protocol.TType.STRUCT) {
+ this.aze = new AuthorizationException();
+ this.aze.read(iprot);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ validate();
+ }
+
+ /**
+ * Writes at most one field: success if it is set, otherwise e, otherwise aze
+ * (the branches form an else-if chain), followed by the field-stop marker.
+ * validate() is not called on this path.
+ */
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ oprot.writeStructBegin(STRUCT_DESC);
+
+ if (this.is_set_success()) {
+ oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
+ this.success.write(oprot);
+ oprot.writeFieldEnd();
+ } else if (this.is_set_e()) {
+ oprot.writeFieldBegin(E_FIELD_DESC);
+ this.e.write(oprot);
+ oprot.writeFieldEnd();
+ } else if (this.is_set_aze()) {
+ oprot.writeFieldBegin(AZE_FIELD_DESC);
+ this.aze.write(oprot);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ /** Renders as getTopologyInfoWithOpts_result(success:..., e:..., aze:...), printing "null" for unset fields. */
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("getTopologyInfoWithOpts_result(");
+ boolean first = true;
+
+ sb.append("success:");
+ if (this.success == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.success);
+ }
+ first = false;
+ if (!first) sb.append(", "); // first is always false here, so the separator is always appended
+ sb.append("e:");
+ if (this.e == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.e);
+ }
+ first = false;
+ if (!first) sb.append(", "); // first is always false here, so the separator is always appended
+ sb.append("aze:");
+ if (this.aze == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.aze);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ /** Currently a no-op: no field checks are implemented. */
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ }
+
+ // Java serialization hook: writes this struct to the stream using TCompactProtocol;
+ // a TException is rethrown wrapped in an IOException.
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ // Java deserialization hook: reads this struct from the stream using TCompactProtocol;
+ // a TException is rethrown wrapped in an IOException.
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ }
+
public static class getTopologyConf_args implements org.apache.thrift.TBase<getTopologyConf_args, getTopologyConf_args._Fields>, java.io.Serializable, Cloneable {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getTopologyConf_args");
http://git-wip-us.apache.org/repos/asf/storm/blob/1cfa190f/storm-core/src/jvm/backtype/storm/generated/NumErrorsChoice.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/NumErrorsChoice.java b/storm-core/src/jvm/backtype/storm/generated/NumErrorsChoice.java
new file mode 100644
index 0000000..b0548b0
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/NumErrorsChoice.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.7.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package backtype.storm.generated;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+public enum NumErrorsChoice implements org.apache.thrift.TEnum {
+  ALL(0),
+  NONE(1),
+  ONE(2);
+
+  /** Integer value carried by this constant. */
+  private final int value;
+
+  NumErrorsChoice(int value) {
+    this.value = value;
+  }
+
+  /**
+   * Returns the integer value of this enum constant, as defined in the Thrift IDL.
+   */
+  public int getValue() {
+    return value;
+  }
+
+  /**
+   * Looks up the enum constant by its integer value, as defined in the Thrift IDL.
+   * @return the matching constant, or null if the value is not found.
+   */
+  public static NumErrorsChoice findByValue(int value) {
+    for (NumErrorsChoice choice : values()) {
+      if (choice.value == value) {
+        return choice;
+      }
+    }
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1cfa190f/storm-core/src/jvm/backtype/storm/utils/Monitor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Monitor.java b/storm-core/src/jvm/backtype/storm/utils/Monitor.java
index 43ddcf8..36fedc4 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Monitor.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Monitor.java
@@ -111,7 +111,9 @@ public class Monitor {
throw new IllegalArgumentException("topology: " + topology + " not found");
} else {
String id = topologySummary.get_id();
- TopologyInfo info = client.getTopologyInfo(id);
+ GetInfoOptions getInfoOpts = new GetInfoOptions();
+ getInfoOpts.set_num_err_choice(NumErrorsChoice.NONE);
+ TopologyInfo info = client.getTopologyInfoWithOpts(id, getInfoOpts);
for (ExecutorSummary es: info.get_executors()) {
components.add(es.get_component_id());
}
@@ -182,7 +184,9 @@ public class Monitor {
throw new IllegalArgumentException("topology: " + _topology + " not found");
} else {
String id = topologySummary.get_id();
- TopologyInfo info = client.getTopologyInfo(id);
+ GetInfoOptions getInfoOpts = new GetInfoOptions();
+ getInfoOpts.set_num_err_choice(NumErrorsChoice.NONE);
+ TopologyInfo info = client.getTopologyInfoWithOpts(id, getInfoOpts);
for (ExecutorSummary es: info.get_executors()) {
if (_component.equals(es.get_component_id())) {
componentParallelism ++;
http://git-wip-us.apache.org/repos/asf/storm/blob/1cfa190f/storm-core/src/py/storm/DistributedRPC-remote
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPC-remote b/storm-core/src/py/storm/DistributedRPC-remote
old mode 100755
new mode 100644
http://git-wip-us.apache.org/repos/asf/storm/blob/1cfa190f/storm-core/src/py/storm/DistributedRPCInvocations-remote
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPCInvocations-remote b/storm-core/src/py/storm/DistributedRPCInvocations-remote
old mode 100755
new mode 100644
http://git-wip-us.apache.org/repos/asf/storm/blob/1cfa190f/storm-core/src/py/storm/Nimbus-remote
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus-remote b/storm-core/src/py/storm/Nimbus-remote
old mode 100755
new mode 100644
index efd9220..2e5e41a
--- a/storm-core/src/py/storm/Nimbus-remote
+++ b/storm-core/src/py/storm/Nimbus-remote
@@ -55,6 +55,7 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help':
print ' string getNimbusConf()'
print ' ClusterSummary getClusterInfo()'
print ' TopologyInfo getTopologyInfo(string id)'
+ print ' TopologyInfo getTopologyInfoWithOpts(string id, GetInfoOptions options)'
print ' string getTopologyConf(string id)'
print ' StormTopology getTopology(string id)'
print ' StormTopology getUserTopology(string id)'
@@ -204,6 +205,12 @@ elif cmd == 'getTopologyInfo':
sys.exit(1)
pp.pprint(client.getTopologyInfo(args[0],))
+elif cmd == 'getTopologyInfoWithOpts':
+ # Expects exactly two arguments: the topology id and a Python expression
+ # that is evaluated to produce the options value.
+ if len(args) != 2:
+ print 'getTopologyInfoWithOpts requires 2 args'
+ sys.exit(1)
+ # SECURITY NOTE(review): eval() executes arbitrary Python taken from the
+ # command line; only run this tool with trusted arguments.
+ pp.pprint(client.getTopologyInfoWithOpts(args[0],eval(args[1]),))
+
elif cmd == 'getTopologyConf':
if len(args) != 1:
print 'getTopologyConf requires 1 args'
http://git-wip-us.apache.org/repos/asf/storm/blob/1cfa190f/storm-core/src/py/storm/Nimbus.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus.py b/storm-core/src/py/storm/Nimbus.py
index a2ee892..271780b 100644
--- a/storm-core/src/py/storm/Nimbus.py
+++ b/storm-core/src/py/storm/Nimbus.py
@@ -143,6 +143,14 @@ class Iface:
"""
pass
+ def getTopologyInfoWithOpts(self, id, options):
+ """
+ Interface stub for the getTopologyInfoWithOpts call; the body here does nothing.
+
+ Parameters:
+ - id: string, field 1 of getTopologyInfoWithOpts_args
+ - options: GetInfoOptions struct, field 2 of getTopologyInfoWithOpts_args
+ """
+ pass
+
def getTopologyConf(self, id):
"""
Parameters:
@@ -697,6 +705,42 @@ class Client(Iface):
raise result.aze
raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopologyInfo failed: unknown result");
+ def getTopologyInfoWithOpts(self, id, options):
+ """
+ Sends a getTopologyInfoWithOpts call and returns the value produced by
+ recv_getTopologyInfoWithOpts; exceptions raised there propagate to the caller.
+
+ Parameters:
+ - id
+ - options
+ """
+ self.send_getTopologyInfoWithOpts(id, options)
+ return self.recv_getTopologyInfoWithOpts()
+
+ def send_getTopologyInfoWithOpts(self, id, options):
+ self._oprot.writeMessageBegin('getTopologyInfoWithOpts', TMessageType.CALL, self._seqid)
+ args = getTopologyInfoWithOpts_args()
+ args.id = id
+ args.options = options
+ args.write(self._oprot)
+ self._oprot.writeMessageEnd()
+ self._oprot.trans.flush()
+
+ def recv_getTopologyInfoWithOpts(self, ):
+ (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ if mtype == TMessageType.EXCEPTION:
+ x = TApplicationException()
+ x.read(self._iprot)
+ self._iprot.readMessageEnd()
+ raise x
+ result = getTopologyInfoWithOpts_result()
+ result.read(self._iprot)
+ self._iprot.readMessageEnd()
+ if result.success is not None:
+ return result.success
+ if result.e is not None:
+ raise result.e
+ if result.aze is not None:
+ raise result.aze
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopologyInfoWithOpts failed: unknown result");
+
def getTopologyConf(self, id):
"""
Parameters:
@@ -820,6 +864,7 @@ class Processor(Iface, TProcessor):
self._processMap["getNimbusConf"] = Processor.process_getNimbusConf
self._processMap["getClusterInfo"] = Processor.process_getClusterInfo
self._processMap["getTopologyInfo"] = Processor.process_getTopologyInfo
+ self._processMap["getTopologyInfoWithOpts"] = Processor.process_getTopologyInfoWithOpts
self._processMap["getTopologyConf"] = Processor.process_getTopologyConf
self._processMap["getTopology"] = Processor.process_getTopology
self._processMap["getUserTopology"] = Processor.process_getUserTopology
@@ -1089,6 +1134,22 @@ class Processor(Iface, TProcessor):
oprot.writeMessageEnd()
oprot.trans.flush()
+ def process_getTopologyInfoWithOpts(self, seqid, iprot, oprot):
+ args = getTopologyInfoWithOpts_args()
+ args.read(iprot)
+ iprot.readMessageEnd()
+ result = getTopologyInfoWithOpts_result()
+ try:
+ result.success = self._handler.getTopologyInfoWithOpts(args.id, args.options)
+ except NotAliveException, e:
+ result.e = e
+ except AuthorizationException, aze:
+ result.aze = aze
+ oprot.writeMessageBegin("getTopologyInfoWithOpts", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
def process_getTopologyConf(self, seqid, iprot, oprot):
args = getTopologyConf_args()
args.read(iprot)
@@ -3493,6 +3554,171 @@ class getTopologyInfo_result:
def __ne__(self, other):
return not (self == other)
+class getTopologyInfoWithOpts_args:
+ """
+ Attributes:
+ - id
+ - options
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'id', None, None, ), # 1
+ (2, TType.STRUCT, 'options', (GetInfoOptions, GetInfoOptions.thrift_spec), None, ), # 2
+ )
+
+ def __hash__(self):
+ return 0 + hash(self.id) + hash(self.options)
+
+ def __init__(self, id=None, options=None,):
+ self.id = id
+ self.options = options
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.id = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRUCT:
+ self.options = GetInfoOptions()
+ self.options.read(iprot)
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('getTopologyInfoWithOpts_args')
+ if self.id is not None:
+ oprot.writeFieldBegin('id', TType.STRING, 1)
+ oprot.writeString(self.id.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.options is not None:
+ oprot.writeFieldBegin('options', TType.STRUCT, 2)
+ self.options.write(oprot)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class getTopologyInfoWithOpts_result:
+ """
+ Attributes:
+ - success
+ - e
+ - aze
+ """
+
+ thrift_spec = (
+ (0, TType.STRUCT, 'success', (TopologyInfo, TopologyInfo.thrift_spec), None, ), # 0
+ (1, TType.STRUCT, 'e', (NotAliveException, NotAliveException.thrift_spec), None, ), # 1
+ (2, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 2
+ )
+
+ def __hash__(self):
+ return 0 + hash(self.success) + hash(self.e) + hash(self.aze)
+
+ def __init__(self, success=None, e=None, aze=None,):
+ self.success = success
+ self.e = e
+ self.aze = aze
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 0:
+ if ftype == TType.STRUCT:
+ self.success = TopologyInfo()
+ self.success.read(iprot)
+ else:
+ iprot.skip(ftype)
+ elif fid == 1:
+ if ftype == TType.STRUCT:
+ self.e = NotAliveException()
+ self.e.read(iprot)
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRUCT:
+ self.aze = AuthorizationException()
+ self.aze.read(iprot)
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('getTopologyInfoWithOpts_result')
+ if self.success is not None:
+ oprot.writeFieldBegin('success', TType.STRUCT, 0)
+ self.success.write(oprot)
+ oprot.writeFieldEnd()
+ if self.e is not None:
+ oprot.writeFieldBegin('e', TType.STRUCT, 1)
+ self.e.write(oprot)
+ oprot.writeFieldEnd()
+ if self.aze is not None:
+ oprot.writeFieldBegin('aze', TType.STRUCT, 2)
+ self.aze.write(oprot)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
class getTopologyConf_args:
"""
Attributes:
http://git-wip-us.apache.org/repos/asf/storm/blob/1cfa190f/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py
index 1bbaf37..46e7a92 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -44,6 +44,23 @@ class TopologyInitialStatus:
"INACTIVE": 2,
}
+class NumErrorsChoice:
+ ALL = 0
+ NONE = 1
+ ONE = 2
+
+ _VALUES_TO_NAMES = {
+ 0: "ALL",
+ 1: "NONE",
+ 2: "ONE",
+ }
+
+ _NAMES_TO_VALUES = {
+ "ALL": 0,
+ "NONE": 1,
+ "ONE": 2,
+ }
+
class JavaObjectArg:
"""
@@ -4383,6 +4400,69 @@ class SubmitOptions:
def __ne__(self, other):
return not (self == other)
+class GetInfoOptions:
+ """
+ Attributes:
+ - num_err_choice
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.I32, 'num_err_choice', None, None, ), # 1
+ )
+
+ def __hash__(self):
+ return 0 + hash(self.num_err_choice)
+
+ def __init__(self, num_err_choice=None,):
+ self.num_err_choice = num_err_choice
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.I32:
+ self.num_err_choice = iprot.readI32();
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('GetInfoOptions')
+ if self.num_err_choice is not None:
+ oprot.writeFieldBegin('num_err_choice', TType.I32, 1)
+ oprot.writeI32(self.num_err_choice)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
class DRPCRequest:
"""
Attributes:
http://git-wip-us.apache.org/repos/asf/storm/blob/1cfa190f/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift
index f807b74..066cb4f 100644
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@ -243,6 +243,16 @@ struct SubmitOptions {
2: optional Credentials creds;
}
+enum NumErrorsChoice {
+ ALL,
+ NONE,
+ ONE
+}
+
+struct GetInfoOptions {
+ 1: optional NumErrorsChoice num_err_choice;
+}
+
service Nimbus {
void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: AuthorizationException aze);
void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: AuthorizationException aze);
@@ -268,6 +278,7 @@ service Nimbus {
// stats functions
ClusterSummary getClusterInfo() throws (1: AuthorizationException aze);
TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e, 2: AuthorizationException aze);
+ TopologyInfo getTopologyInfoWithOpts(1: string id, 2: GetInfoOptions options) throws (1: NotAliveException e, 2: AuthorizationException aze);
//returns json
string getTopologyConf(1: string id) throws (1: NotAliveException e, 2: AuthorizationException aze);
StormTopology getTopology(1: string id) throws (1: NotAliveException e, 2: AuthorizationException aze);
http://git-wip-us.apache.org/repos/asf/storm/blob/1cfa190f/storm-core/test/clj/backtype/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/integration_test.clj b/storm-core/test/clj/backtype/storm/integration_test.clj
index d96bd90..b192253 100644
--- a/storm-core/test/clj/backtype/storm/integration_test.clj
+++ b/storm-core/test/clj/backtype/storm/integration_test.clj
@@ -570,28 +570,34 @@
(:topology tracked))
storm-id (get-storm-id state "test-errors")
errors-count (fn [] (count (.errors state storm-id "2")))]
+
+ (is (nil? (.last-error state storm-id "2")))
+
;; so it launches the topology
(advance-cluster-time cluster 2)
(.feed feeder [6])
(tracked-wait tracked 1)
(is (= 4 (errors-count)))
+ (is (.last-error state storm-id "2"))
(advance-time-secs! 5)
(.feed feeder [2])
(tracked-wait tracked 1)
(is (= 4 (errors-count)))
+ (is (.last-error state storm-id "2"))
(advance-time-secs! 6)
(.feed feeder [2])
(tracked-wait tracked 1)
(is (= 6 (errors-count)))
+ (is (.last-error state storm-id "2"))
(advance-time-secs! 6)
(.feed feeder [3])
(tracked-wait tracked 1)
(is (= 8 (errors-count)))
-
- ))))
+ (is (.last-error state storm-id "2"))))))
+
(deftest test-acking-branching-complex
;; test acking with branching in the topology
[2/3] storm git commit: Merge branch 'storm-636-ui-errors' of
https://github.com/d2r/storm into STORM-636
Posted by bo...@apache.org.
Merge branch 'storm-636-ui-errors' of https://github.com/d2r/storm into STORM-636
STORM-636: Faster, optional retrieval of last component error
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/01d78c72
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/01d78c72
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/01d78c72
Branch: refs/heads/master
Commit: 01d78c726d15a5ac9a4676e6b85b41b6eff9b857
Parents: bbcf749 1cfa190
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Feb 3 12:34:57 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Feb 3 12:34:57 2015 -0600
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/cluster.clj | 38 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 35 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 32 +-
.../storm/generated/GetInfoOptions.java | 350 +++++++
.../jvm/backtype/storm/generated/Nimbus.java | 974 +++++++++++++++++++
.../storm/generated/NumErrorsChoice.java | 64 ++
.../src/jvm/backtype/storm/utils/Monitor.java | 8 +-
storm-core/src/py/storm/DistributedRPC-remote | 0
.../py/storm/DistributedRPCInvocations-remote | 0
storm-core/src/py/storm/Nimbus-remote | 7 +
storm-core/src/py/storm/Nimbus.py | 226 +++++
storm-core/src/py/storm/ttypes.py | 80 ++
storm-core/src/storm.thrift | 11 +
.../clj/backtype/storm/integration_test.clj | 10 +-
14 files changed, 1812 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/01d78c72/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
[3/3] storm git commit: Added STORM-636 to changelog
Posted by bo...@apache.org.
Added STORM-636 to changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1f35f41a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1f35f41a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1f35f41a
Branch: refs/heads/master
Commit: 1f35f41a879c8e12a77b1eb8f0695507486a88a6
Parents: 01d78c7
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Feb 3 12:52:59 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Feb 3 12:52:59 2015 -0600
----------------------------------------------------------------------
CHANGELOG.md | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1f35f41a/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index cbdc314..b8313cf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -39,7 +39,8 @@
* STORM-632: New grouping for better load balancing
* STORM-527: update worker.clj -- delete "missing-tasks" checking
* STORM-623: Generate latest javadocs
- * STORM-635. logviewer returns 404 if storm_home/logs is a symlinked dir.
+ * STORM-635: logviewer returns 404 if storm_home/logs is a symlinked dir.
+ * STORM-636: Faster, optional retrieval of last component error
## 0.9.3-rc2
* STORM-558: change "swap!" to "reset!" to fix assignment-versions in supervisor