You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/05/23 15:13:40 UTC
[1/3] storm git commit: STORM-2448: Report storm and jdk versions to nimbus on topology submission
Repository: storm
Updated Branches:
refs/heads/0.10.x-branch c56b7cf27 -> ecba328c8
STORM-2448: Report storm and jdk versions to nimbus on topology submission
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/77985a07
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/77985a07
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/77985a07
Branch: refs/heads/0.10.x-branch
Commit: 77985a0756d099c379059dbef021ade4af0d4e88
Parents: 4fa445f
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Apr 4 14:32:18 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Sat Apr 8 21:09:20 2017 -0500
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/common.clj | 11 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 4 +-
storm-core/src/clj/backtype/storm/testing.clj | 4 +-
.../src/jvm/backtype/storm/StormSubmitter.java | 1 +
.../backtype/storm/generated/StormTopology.java | 222 ++++++++++++-
.../storm/topology/TopologyBuilder.java | 4 +-
.../storm/utils/ThriftTopologyUtils.java | 29 +-
.../src/jvm/backtype/storm/utils/Utils.java | 21 ++
storm-core/src/py/storm/DistributedRPC-remote | 2 +-
storm-core/src/py/storm/DistributedRPC.py | 20 +-
.../py/storm/DistributedRPCInvocations-remote | 2 +-
.../src/py/storm/DistributedRPCInvocations.py | 41 ++-
storm-core/src/py/storm/Nimbus-remote | 2 +-
storm-core/src/py/storm/Nimbus.py | 318 +++++++++++++++----
storm-core/src/py/storm/constants.py | 2 +-
storm-core/src/py/storm/ttypes.py | 161 ++++++----
storm-core/src/storm.thrift | 5 +
17 files changed, 661 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/clj/backtype/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj
index e3a10ef..a592d8e 100644
--- a/storm-core/src/clj/backtype/storm/daemon/common.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/common.clj
@@ -102,13 +102,12 @@
)))))
(defn- validate-ids! [^StormTopology topology]
- (let [sets (map #(.getFieldValue topology %) thrift/STORM-TOPOLOGY-FIELDS)
+ (let [sets [(.get_bolts topology) (.get_spouts topology) (.get_state_spouts topology)]
offending (apply any-intersection sets)]
(if-not (empty? offending)
(throw (InvalidTopologyException.
(str "Duplicate component ids: " offending))))
- (doseq [f thrift/STORM-TOPOLOGY-FIELDS
- :let [obj-map (.getFieldValue topology f)]]
+ (doseq [obj-map sets]
(doseq [id (keys obj-map)]
(if (system-id? id)
(throw (InvalidTopologyException.
@@ -122,9 +121,9 @@
(defn all-components [^StormTopology topology]
(apply merge {}
- (for [f thrift/STORM-TOPOLOGY-FIELDS]
- (.getFieldValue topology f)
- )))
+ (.get_bolts topology)
+ (.get_spouts topology)
+ (.get_state_spouts topology)))
(defn component-conf [component]
(->> component
http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 7757708..b8b17bc 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -1095,7 +1095,9 @@
(throw (IllegalArgumentException. "The cluster is configured for zookeeper authentication, but no payload was provided.")))
(log-message "Received topology submission for "
storm-name
- " with conf "
+ " (storm-" (.get_storm_version topology)
+ " JDK-" (.get_jdk_version topology)
+ ") with conf "
(redact-value storm-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
;; lock protects against multiple topologies being submitted at once and
;; cleanup thread killing topology in b/w assignment and starting the topology
http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
index b0c6637..76e1b3f 100644
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@ -273,13 +273,13 @@
[nimbus storm-name conf topology]
(when-not (Utils/isValidConf conf)
(throw (IllegalArgumentException. "Topology conf is not json-serializable")))
- (.submitTopology nimbus storm-name nil (to-json conf) topology))
+ (.submitTopology nimbus storm-name nil (to-json conf) (Utils/addVersions topology)))
(defn submit-local-topology-with-opts
[nimbus storm-name conf topology submit-opts]
(when-not (Utils/isValidConf conf)
(throw (IllegalArgumentException. "Topology conf is not json-serializable")))
- (.submitTopologyWithOpts nimbus storm-name nil (to-json conf) topology submit-opts))
+ (.submitTopologyWithOpts nimbus storm-name nil (to-json conf) (Utils/addVersions topology) submit-opts))
(defn mocked-compute-new-topology->executor->node+port [storm-name executor->node+port]
(fn [nimbus existing-assignments topologies scratch-topology-id]
http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
index c30cc7d..cbb64c6 100644
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
@@ -201,6 +201,7 @@ public class StormSubmitter {
passedCreds = tmpCreds.get_creds();
}
}
+ topology = Utils.addVersions(topology);
Map<String,String> fullCreds = populateCredentials(conf, passedCreds);
if (!fullCreds.isEmpty()) {
if (opts == null) {
http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/jvm/backtype/storm/generated/StormTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/StormTopology.java b/storm-core/src/jvm/backtype/storm/generated/StormTopology.java
index d022e95..0a1f356 100644
--- a/storm-core/src/jvm/backtype/storm/generated/StormTopology.java
+++ b/storm-core/src/jvm/backtype/storm/generated/StormTopology.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
/**
- * Autogenerated by Thrift Compiler (0.9.2)
+ * Autogenerated by Thrift Compiler (0.9.3)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
@@ -51,13 +51,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-2-6")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2017-04-04")
public class StormTopology implements org.apache.thrift.TBase<StormTopology, StormTopology._Fields>, java.io.Serializable, Cloneable, Comparable<StormTopology> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("StormTopology");
private static final org.apache.thrift.protocol.TField SPOUTS_FIELD_DESC = new org.apache.thrift.protocol.TField("spouts", org.apache.thrift.protocol.TType.MAP, (short)1);
private static final org.apache.thrift.protocol.TField BOLTS_FIELD_DESC = new org.apache.thrift.protocol.TField("bolts", org.apache.thrift.protocol.TType.MAP, (short)2);
private static final org.apache.thrift.protocol.TField STATE_SPOUTS_FIELD_DESC = new org.apache.thrift.protocol.TField("state_spouts", org.apache.thrift.protocol.TType.MAP, (short)3);
+ private static final org.apache.thrift.protocol.TField STORM_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("storm_version", org.apache.thrift.protocol.TType.STRING, (short)7);
+ private static final org.apache.thrift.protocol.TField JDK_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("jdk_version", org.apache.thrift.protocol.TType.STRING, (short)8);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -68,12 +70,16 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
private Map<String,SpoutSpec> spouts; // required
private Map<String,Bolt> bolts; // required
private Map<String,StateSpoutSpec> state_spouts; // required
+ private String storm_version; // optional
+ private String jdk_version; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
SPOUTS((short)1, "spouts"),
BOLTS((short)2, "bolts"),
- STATE_SPOUTS((short)3, "state_spouts");
+ STATE_SPOUTS((short)3, "state_spouts"),
+ STORM_VERSION((short)7, "storm_version"),
+ JDK_VERSION((short)8, "jdk_version");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -94,6 +100,10 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
return BOLTS;
case 3: // STATE_SPOUTS
return STATE_SPOUTS;
+ case 7: // STORM_VERSION
+ return STORM_VERSION;
+ case 8: // JDK_VERSION
+ return JDK_VERSION;
default:
return null;
}
@@ -134,6 +144,7 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
// isset id assignments
+ private static final _Fields optionals[] = {_Fields.STORM_VERSION,_Fields.JDK_VERSION};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -149,6 +160,10 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING),
new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, StateSpoutSpec.class))));
+ tmpMap.put(_Fields.STORM_VERSION, new org.apache.thrift.meta_data.FieldMetaData("storm_version", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.JDK_VERSION, new org.apache.thrift.meta_data.FieldMetaData("jdk_version", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(StormTopology.class, metaDataMap);
}
@@ -216,6 +231,12 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
this.state_spouts = __this__state_spouts;
}
+ if (other.is_set_storm_version()) {
+ this.storm_version = other.storm_version;
+ }
+ if (other.is_set_jdk_version()) {
+ this.jdk_version = other.jdk_version;
+ }
}
public StormTopology deepCopy() {
@@ -227,6 +248,8 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
this.spouts = null;
this.bolts = null;
this.state_spouts = null;
+ this.storm_version = null;
+ this.jdk_version = null;
}
public int get_spouts_size() {
@@ -331,6 +354,52 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
}
+ public String get_storm_version() {
+ return this.storm_version;
+ }
+
+ public void set_storm_version(String storm_version) {
+ this.storm_version = storm_version;
+ }
+
+ public void unset_storm_version() {
+ this.storm_version = null;
+ }
+
+ /** Returns true if field storm_version is set (has been assigned a value) and false otherwise */
+ public boolean is_set_storm_version() {
+ return this.storm_version != null;
+ }
+
+ public void set_storm_version_isSet(boolean value) {
+ if (!value) {
+ this.storm_version = null;
+ }
+ }
+
+ public String get_jdk_version() {
+ return this.jdk_version;
+ }
+
+ public void set_jdk_version(String jdk_version) {
+ this.jdk_version = jdk_version;
+ }
+
+ public void unset_jdk_version() {
+ this.jdk_version = null;
+ }
+
+ /** Returns true if field jdk_version is set (has been assigned a value) and false otherwise */
+ public boolean is_set_jdk_version() {
+ return this.jdk_version != null;
+ }
+
+ public void set_jdk_version_isSet(boolean value) {
+ if (!value) {
+ this.jdk_version = null;
+ }
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case SPOUTS:
@@ -357,6 +426,22 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
break;
+ case STORM_VERSION:
+ if (value == null) {
+ unset_storm_version();
+ } else {
+ set_storm_version((String)value);
+ }
+ break;
+
+ case JDK_VERSION:
+ if (value == null) {
+ unset_jdk_version();
+ } else {
+ set_jdk_version((String)value);
+ }
+ break;
+
}
}
@@ -371,6 +456,12 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
case STATE_SPOUTS:
return get_state_spouts();
+ case STORM_VERSION:
+ return get_storm_version();
+
+ case JDK_VERSION:
+ return get_jdk_version();
+
}
throw new IllegalStateException();
}
@@ -388,6 +479,10 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
return is_set_bolts();
case STATE_SPOUTS:
return is_set_state_spouts();
+ case STORM_VERSION:
+ return is_set_storm_version();
+ case JDK_VERSION:
+ return is_set_jdk_version();
}
throw new IllegalStateException();
}
@@ -432,6 +527,24 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
return false;
}
+ boolean this_present_storm_version = true && this.is_set_storm_version();
+ boolean that_present_storm_version = true && that.is_set_storm_version();
+ if (this_present_storm_version || that_present_storm_version) {
+ if (!(this_present_storm_version && that_present_storm_version))
+ return false;
+ if (!this.storm_version.equals(that.storm_version))
+ return false;
+ }
+
+ boolean this_present_jdk_version = true && this.is_set_jdk_version();
+ boolean that_present_jdk_version = true && that.is_set_jdk_version();
+ if (this_present_jdk_version || that_present_jdk_version) {
+ if (!(this_present_jdk_version && that_present_jdk_version))
+ return false;
+ if (!this.jdk_version.equals(that.jdk_version))
+ return false;
+ }
+
return true;
}
@@ -454,6 +567,16 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
if (present_state_spouts)
list.add(state_spouts);
+ boolean present_storm_version = true && (is_set_storm_version());
+ list.add(present_storm_version);
+ if (present_storm_version)
+ list.add(storm_version);
+
+ boolean present_jdk_version = true && (is_set_jdk_version());
+ list.add(present_jdk_version);
+ if (present_jdk_version)
+ list.add(jdk_version);
+
return list.hashCode();
}
@@ -495,6 +618,26 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(is_set_storm_version()).compareTo(other.is_set_storm_version());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_storm_version()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.storm_version, other.storm_version);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_jdk_version()).compareTo(other.is_set_jdk_version());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_jdk_version()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.jdk_version, other.jdk_version);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -538,6 +681,26 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
sb.append(this.state_spouts);
}
first = false;
+ if (is_set_storm_version()) {
+ if (!first) sb.append(", ");
+ sb.append("storm_version:");
+ if (this.storm_version == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.storm_version);
+ }
+ first = false;
+ }
+ if (is_set_jdk_version()) {
+ if (!first) sb.append(", ");
+ sb.append("jdk_version:");
+ if (this.jdk_version == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.jdk_version);
+ }
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -656,6 +819,22 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 7: // STORM_VERSION
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.storm_version = iprot.readString();
+ struct.set_storm_version_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 8: // JDK_VERSION
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.jdk_version = iprot.readString();
+ struct.set_jdk_version_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -708,6 +887,20 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
oprot.writeFieldEnd();
}
+ if (struct.storm_version != null) {
+ if (struct.is_set_storm_version()) {
+ oprot.writeFieldBegin(STORM_VERSION_FIELD_DESC);
+ oprot.writeString(struct.storm_version);
+ oprot.writeFieldEnd();
+ }
+ }
+ if (struct.jdk_version != null) {
+ if (struct.is_set_jdk_version()) {
+ oprot.writeFieldBegin(JDK_VERSION_FIELD_DESC);
+ oprot.writeString(struct.jdk_version);
+ oprot.writeFieldEnd();
+ }
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -749,6 +942,20 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
_iter61.getValue().write(oprot);
}
}
+ BitSet optionals = new BitSet();
+ if (struct.is_set_storm_version()) {
+ optionals.set(0);
+ }
+ if (struct.is_set_jdk_version()) {
+ optionals.set(1);
+ }
+ oprot.writeBitSet(optionals, 2);
+ if (struct.is_set_storm_version()) {
+ oprot.writeString(struct.storm_version);
+ }
+ if (struct.is_set_jdk_version()) {
+ oprot.writeString(struct.jdk_version);
+ }
}
@Override
@@ -796,6 +1003,15 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
}
struct.set_state_spouts_isSet(true);
+ BitSet incoming = iprot.readBitSet(2);
+ if (incoming.get(0)) {
+ struct.storm_version = iprot.readString();
+ struct.set_storm_version_isSet(true);
+ }
+ if (incoming.get(1)) {
+ struct.jdk_version = iprot.readString();
+ struct.set_jdk_version_isSet(true);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
index 4d5a0bd..9a4409d 100644
--- a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
@@ -112,9 +112,9 @@ public class TopologyBuilder {
spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
}
- return new StormTopology(spoutSpecs,
+ return Utils.addVersions(new StormTopology(spoutSpecs,
boltSpecs,
- new HashMap<String, StateSpoutSpec>());
+ new HashMap<String, StateSpoutSpec>()));
}
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/jvm/backtype/storm/utils/ThriftTopologyUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ThriftTopologyUtils.java b/storm-core/src/jvm/backtype/storm/utils/ThriftTopologyUtils.java
index 8306d9b..7b379c5 100644
--- a/storm-core/src/jvm/backtype/storm/utils/ThriftTopologyUtils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/ThriftTopologyUtils.java
@@ -29,30 +29,19 @@ import java.util.Set;
public class ThriftTopologyUtils {
public static Set<String> getComponentIds(StormTopology topology) {
Set<String> ret = new HashSet<String>();
- for(StormTopology._Fields f: StormTopology.metaDataMap.keySet()) {
- Map<String, Object> componentMap = (Map<String, Object>) topology.getFieldValue(f);
- ret.addAll(componentMap.keySet());
- }
+ ret.addAll(topology.get_bolts().keySet());
+ ret.addAll(topology.get_spouts().keySet());
+ ret.addAll(topology.get_state_spouts().keySet());
return ret;
}
public static ComponentCommon getComponentCommon(StormTopology topology, String componentId) {
- for(StormTopology._Fields f: StormTopology.metaDataMap.keySet()) {
- Map<String, Object> componentMap = (Map<String, Object>) topology.getFieldValue(f);
- if(componentMap.containsKey(componentId)) {
- Object component = componentMap.get(componentId);
- if(component instanceof Bolt) {
- return ((Bolt) component).get_common();
- }
- if(component instanceof SpoutSpec) {
- return ((SpoutSpec) component).get_common();
- }
- if(component instanceof StateSpoutSpec) {
- return ((StateSpoutSpec) component).get_common();
- }
- throw new RuntimeException("Unreachable code! No get_common conversion for component " + component);
- }
- }
+ Bolt b = topology.get_bolts().get(componentId);
+ if (b != null) return b.get_common();
+ SpoutSpec s = topology.get_spouts().get(componentId);
+ if (s != null) return s.get_common();
+ StateSpoutSpec ss = topology.get_state_spouts().get(componentId);
+ if (ss != null) return ss.get_common();
throw new IllegalArgumentException("Could not find component common for " + componentId);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index 15aae91..3f9b15c 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -674,5 +674,26 @@ public class Utils {
public static int toPositive(int number) {
return number & Integer.MAX_VALUE;
}
+
+ /**
+ * Add version information to the given topology
+ * @param topology the topology being submitted (MIGHT BE MODIFIED)
+ * @return topology
+ */
+ public static StormTopology addVersions(StormTopology topology) {
+ String stormVersion = VersionInfo.getVersion();
+ LOG.warn("STORM-VERSION new {} old {}", stormVersion, topology.get_storm_version());
+ if (stormVersion != null &&
+ !"Unknown".equalsIgnoreCase(stormVersion) &&
+ !topology.is_set_storm_version()) {
+ topology.set_storm_version(stormVersion);
+ }
+
+ String jdkVersion = System.getProperty("java.version");
+ if (jdkVersion != null && !topology.is_set_jdk_version()) {
+ topology.set_jdk_version(jdkVersion);
+ }
+ return topology;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/py/storm/DistributedRPC-remote
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPC-remote b/storm-core/src/py/storm/DistributedRPC-remote
index 3d06d07..90f894a 100644
--- a/storm-core/src/py/storm/DistributedRPC-remote
+++ b/storm-core/src/py/storm/DistributedRPC-remote
@@ -18,7 +18,7 @@
#!/usr/bin/env python
#
-# Autogenerated by Thrift Compiler (0.9.2)
+# Autogenerated by Thrift Compiler (0.9.3)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/py/storm/DistributedRPC.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPC.py b/storm-core/src/py/storm/DistributedRPC.py
index 330499c..225c57e 100644
--- a/storm-core/src/py/storm/DistributedRPC.py
+++ b/storm-core/src/py/storm/DistributedRPC.py
@@ -17,7 +17,7 @@
# limitations under the License.
#
-# Autogenerated by Thrift Compiler (0.9.2)
+# Autogenerated by Thrift Compiler (0.9.3)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
@@ -25,6 +25,7 @@
#
from thrift.Thrift import TType, TMessageType, TException, TApplicationException
+import logging
from ttypes import *
from thrift.Thrift import TProcessor
from thrift.transport import TTransport
@@ -87,7 +88,7 @@ class Client(Iface):
raise result.e
if result.aze is not None:
raise result.aze
- raise TApplicationException(TApplicationException.MISSING_RESULT, "execute failed: unknown result");
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "execute failed: unknown result")
class Processor(Iface, TProcessor):
@@ -118,11 +119,20 @@ class Processor(Iface, TProcessor):
result = execute_result()
try:
result.success = self._handler.execute(args.functionName, args.funcArgs)
- except DRPCExecutionException, e:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except DRPCExecutionException as e:
+ msg_type = TMessageType.REPLY
result.e = e
- except AuthorizationException, aze:
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("execute", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("execute", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/py/storm/DistributedRPCInvocations-remote
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPCInvocations-remote b/storm-core/src/py/storm/DistributedRPCInvocations-remote
index 9dd50cd..01435b6 100644
--- a/storm-core/src/py/storm/DistributedRPCInvocations-remote
+++ b/storm-core/src/py/storm/DistributedRPCInvocations-remote
@@ -18,7 +18,7 @@
#!/usr/bin/env python
#
-# Autogenerated by Thrift Compiler (0.9.2)
+# Autogenerated by Thrift Compiler (0.9.3)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/py/storm/DistributedRPCInvocations.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPCInvocations.py b/storm-core/src/py/storm/DistributedRPCInvocations.py
index 493fcc7..207fa9d 100644
--- a/storm-core/src/py/storm/DistributedRPCInvocations.py
+++ b/storm-core/src/py/storm/DistributedRPCInvocations.py
@@ -17,7 +17,7 @@
# limitations under the License.
#
-# Autogenerated by Thrift Compiler (0.9.2)
+# Autogenerated by Thrift Compiler (0.9.3)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
@@ -25,6 +25,7 @@
#
from thrift.Thrift import TType, TMessageType, TException, TApplicationException
+import logging
from ttypes import *
from thrift.Thrift import TProcessor
from thrift.transport import TTransport
@@ -130,7 +131,7 @@ class Client(Iface):
return result.success
if result.aze is not None:
raise result.aze
- raise TApplicationException(TApplicationException.MISSING_RESULT, "fetchRequest failed: unknown result");
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "fetchRequest failed: unknown result")
def failRequest(self, id):
"""
@@ -194,9 +195,17 @@ class Processor(Iface, TProcessor):
result = result_result()
try:
self._handler.result(args.id, args.result)
- except AuthorizationException, aze:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("result", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("result", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -208,9 +217,17 @@ class Processor(Iface, TProcessor):
result = fetchRequest_result()
try:
result.success = self._handler.fetchRequest(args.functionName)
- except AuthorizationException, aze:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("fetchRequest", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("fetchRequest", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -222,9 +239,17 @@ class Processor(Iface, TProcessor):
result = failRequest_result()
try:
self._handler.failRequest(args.id)
- except AuthorizationException, aze:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("failRequest", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("failRequest", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/py/storm/Nimbus-remote
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus-remote b/storm-core/src/py/storm/Nimbus-remote
index 9af5ead..c6c1514 100644
--- a/storm-core/src/py/storm/Nimbus-remote
+++ b/storm-core/src/py/storm/Nimbus-remote
@@ -18,7 +18,7 @@
#!/usr/bin/env python
#
-# Autogenerated by Thrift Compiler (0.9.2)
+# Autogenerated by Thrift Compiler (0.9.3)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/py/storm/Nimbus.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus.py b/storm-core/src/py/storm/Nimbus.py
index e9636c2..95bad54 100644
--- a/storm-core/src/py/storm/Nimbus.py
+++ b/storm-core/src/py/storm/Nimbus.py
@@ -17,7 +17,7 @@
# limitations under the License.
#
-# Autogenerated by Thrift Compiler (0.9.2)
+# Autogenerated by Thrift Compiler (0.9.3)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
@@ -25,6 +25,7 @@
#
from thrift.Thrift import TType, TMessageType, TException, TApplicationException
+import logging
from ttypes import *
from thrift.Thrift import TProcessor
from thrift.transport import TTransport
@@ -506,7 +507,7 @@ class Client(Iface):
return result.success
if result.aze is not None:
raise result.aze
- raise TApplicationException(TApplicationException.MISSING_RESULT, "beginFileUpload failed: unknown result");
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "beginFileUpload failed: unknown result")
def uploadChunk(self, location, chunk):
"""
@@ -603,7 +604,7 @@ class Client(Iface):
return result.success
if result.aze is not None:
raise result.aze
- raise TApplicationException(TApplicationException.MISSING_RESULT, "beginFileDownload failed: unknown result");
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "beginFileDownload failed: unknown result")
def downloadChunk(self, id):
"""
@@ -636,7 +637,7 @@ class Client(Iface):
return result.success
if result.aze is not None:
raise result.aze
- raise TApplicationException(TApplicationException.MISSING_RESULT, "downloadChunk failed: unknown result");
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "downloadChunk failed: unknown result")
def getNimbusConf(self):
self.send_getNimbusConf()
@@ -664,7 +665,7 @@ class Client(Iface):
return result.success
if result.aze is not None:
raise result.aze
- raise TApplicationException(TApplicationException.MISSING_RESULT, "getNimbusConf failed: unknown result");
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "getNimbusConf failed: unknown result")
def getClusterInfo(self):
self.send_getClusterInfo()
@@ -692,7 +693,7 @@ class Client(Iface):
return result.success
if result.aze is not None:
raise result.aze
- raise TApplicationException(TApplicationException.MISSING_RESULT, "getClusterInfo failed: unknown result");
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "getClusterInfo failed: unknown result")
def getTopologyInfo(self, id):
"""
@@ -727,7 +728,7 @@ class Client(Iface):
raise result.e
if result.aze is not None:
raise result.aze
- raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopologyInfo failed: unknown result");
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopologyInfo failed: unknown result")
def getTopologyInfoWithOpts(self, id, options):
"""
@@ -764,7 +765,7 @@ class Client(Iface):
raise result.e
if result.aze is not None:
raise result.aze
- raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopologyInfoWithOpts failed: unknown result");
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopologyInfoWithOpts failed: unknown result")
def getTopologyConf(self, id):
"""
@@ -799,7 +800,7 @@ class Client(Iface):
raise result.e
if result.aze is not None:
raise result.aze
- raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopologyConf failed: unknown result");
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopologyConf failed: unknown result")
def getTopology(self, id):
"""
@@ -836,7 +837,7 @@ class Client(Iface):
raise result.e
if result.aze is not None:
raise result.aze
- raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopology failed: unknown result");
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopology failed: unknown result")
def getUserTopology(self, id):
"""
@@ -873,7 +874,7 @@ class Client(Iface):
raise result.e
if result.aze is not None:
raise result.aze
- raise TApplicationException(TApplicationException.MISSING_RESULT, "getUserTopology failed: unknown result");
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "getUserTopology failed: unknown result")
class Processor(Iface, TProcessor):
@@ -923,13 +924,23 @@ class Processor(Iface, TProcessor):
result = submitTopology_result()
try:
self._handler.submitTopology(args.name, args.uploadedJarLocation, args.jsonConf, args.topology)
- except AlreadyAliveException, e:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except AlreadyAliveException as e:
+ msg_type = TMessageType.REPLY
result.e = e
- except InvalidTopologyException, ite:
+ except InvalidTopologyException as ite:
+ msg_type = TMessageType.REPLY
result.ite = ite
- except AuthorizationException, aze:
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("submitTopology", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("submitTopology", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -941,13 +952,23 @@ class Processor(Iface, TProcessor):
result = submitTopologyWithOpts_result()
try:
self._handler.submitTopologyWithOpts(args.name, args.uploadedJarLocation, args.jsonConf, args.topology, args.options)
- except AlreadyAliveException, e:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except AlreadyAliveException as e:
+ msg_type = TMessageType.REPLY
result.e = e
- except InvalidTopologyException, ite:
+ except InvalidTopologyException as ite:
+ msg_type = TMessageType.REPLY
result.ite = ite
- except AuthorizationException, aze:
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("submitTopologyWithOpts", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("submitTopologyWithOpts", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -959,11 +980,20 @@ class Processor(Iface, TProcessor):
result = killTopology_result()
try:
self._handler.killTopology(args.name)
- except NotAliveException, e:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except NotAliveException as e:
+ msg_type = TMessageType.REPLY
result.e = e
- except AuthorizationException, aze:
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("killTopology", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("killTopology", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -975,11 +1005,20 @@ class Processor(Iface, TProcessor):
result = killTopologyWithOpts_result()
try:
self._handler.killTopologyWithOpts(args.name, args.options)
- except NotAliveException, e:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except NotAliveException as e:
+ msg_type = TMessageType.REPLY
result.e = e
- except AuthorizationException, aze:
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("killTopologyWithOpts", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("killTopologyWithOpts", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -991,11 +1030,20 @@ class Processor(Iface, TProcessor):
result = activate_result()
try:
self._handler.activate(args.name)
- except NotAliveException, e:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except NotAliveException as e:
+ msg_type = TMessageType.REPLY
result.e = e
- except AuthorizationException, aze:
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("activate", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("activate", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -1007,11 +1055,20 @@ class Processor(Iface, TProcessor):
result = deactivate_result()
try:
self._handler.deactivate(args.name)
- except NotAliveException, e:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except NotAliveException as e:
+ msg_type = TMessageType.REPLY
result.e = e
- except AuthorizationException, aze:
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("deactivate", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("deactivate", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -1023,13 +1080,23 @@ class Processor(Iface, TProcessor):
result = rebalance_result()
try:
self._handler.rebalance(args.name, args.options)
- except NotAliveException, e:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except NotAliveException as e:
+ msg_type = TMessageType.REPLY
result.e = e
- except InvalidTopologyException, ite:
+ except InvalidTopologyException as ite:
+ msg_type = TMessageType.REPLY
result.ite = ite
- except AuthorizationException, aze:
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("rebalance", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("rebalance", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -1041,13 +1108,23 @@ class Processor(Iface, TProcessor):
result = uploadNewCredentials_result()
try:
self._handler.uploadNewCredentials(args.name, args.creds)
- except NotAliveException, e:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except NotAliveException as e:
+ msg_type = TMessageType.REPLY
result.e = e
- except InvalidTopologyException, ite:
+ except InvalidTopologyException as ite:
+ msg_type = TMessageType.REPLY
result.ite = ite
- except AuthorizationException, aze:
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("uploadNewCredentials", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("uploadNewCredentials", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -1059,9 +1136,17 @@ class Processor(Iface, TProcessor):
result = beginFileUpload_result()
try:
result.success = self._handler.beginFileUpload()
- except AuthorizationException, aze:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("beginFileUpload", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("beginFileUpload", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -1073,9 +1158,17 @@ class Processor(Iface, TProcessor):
result = uploadChunk_result()
try:
self._handler.uploadChunk(args.location, args.chunk)
- except AuthorizationException, aze:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("uploadChunk", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("uploadChunk", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -1087,9 +1180,17 @@ class Processor(Iface, TProcessor):
result = finishFileUpload_result()
try:
self._handler.finishFileUpload(args.location)
- except AuthorizationException, aze:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("finishFileUpload", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("finishFileUpload", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -1101,9 +1202,17 @@ class Processor(Iface, TProcessor):
result = beginFileDownload_result()
try:
result.success = self._handler.beginFileDownload(args.file)
- except AuthorizationException, aze:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("beginFileDownload", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("beginFileDownload", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -1115,9 +1224,17 @@ class Processor(Iface, TProcessor):
result = downloadChunk_result()
try:
result.success = self._handler.downloadChunk(args.id)
- except AuthorizationException, aze:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("downloadChunk", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("downloadChunk", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -1129,9 +1246,17 @@ class Processor(Iface, TProcessor):
result = getNimbusConf_result()
try:
result.success = self._handler.getNimbusConf()
- except AuthorizationException, aze:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("getNimbusConf", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("getNimbusConf", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -1143,9 +1268,17 @@ class Processor(Iface, TProcessor):
result = getClusterInfo_result()
try:
result.success = self._handler.getClusterInfo()
- except AuthorizationException, aze:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("getClusterInfo", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("getClusterInfo", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -1157,11 +1290,20 @@ class Processor(Iface, TProcessor):
result = getTopologyInfo_result()
try:
result.success = self._handler.getTopologyInfo(args.id)
- except NotAliveException, e:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except NotAliveException as e:
+ msg_type = TMessageType.REPLY
result.e = e
- except AuthorizationException, aze:
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("getTopologyInfo", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("getTopologyInfo", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -1173,11 +1315,20 @@ class Processor(Iface, TProcessor):
result = getTopologyInfoWithOpts_result()
try:
result.success = self._handler.getTopologyInfoWithOpts(args.id, args.options)
- except NotAliveException, e:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except NotAliveException as e:
+ msg_type = TMessageType.REPLY
result.e = e
- except AuthorizationException, aze:
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("getTopologyInfoWithOpts", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("getTopologyInfoWithOpts", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -1189,11 +1340,20 @@ class Processor(Iface, TProcessor):
result = getTopologyConf_result()
try:
result.success = self._handler.getTopologyConf(args.id)
- except NotAliveException, e:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except NotAliveException as e:
+ msg_type = TMessageType.REPLY
result.e = e
- except AuthorizationException, aze:
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("getTopologyConf", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("getTopologyConf", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -1205,11 +1365,20 @@ class Processor(Iface, TProcessor):
result = getTopology_result()
try:
result.success = self._handler.getTopology(args.id)
- except NotAliveException, e:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except NotAliveException as e:
+ msg_type = TMessageType.REPLY
result.e = e
- except AuthorizationException, aze:
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("getTopology", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("getTopology", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -1221,11 +1390,20 @@ class Processor(Iface, TProcessor):
result = getUserTopology_result()
try:
result.success = self._handler.getUserTopology(args.id)
- except NotAliveException, e:
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except NotAliveException as e:
+ msg_type = TMessageType.REPLY
result.e = e
- except AuthorizationException, aze:
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
result.aze = aze
- oprot.writeMessageBegin("getUserTopology", TMessageType.REPLY, seqid)
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("getUserTopology", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
@@ -2742,7 +2920,7 @@ class uploadChunk_args:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.STRING:
- self.chunk = iprot.readString();
+ self.chunk = iprot.readString()
else:
iprot.skip(ftype)
else:
@@ -3219,7 +3397,7 @@ class downloadChunk_result:
break
if fid == 0:
if ftype == TType.STRING:
- self.success = iprot.readString();
+ self.success = iprot.readString()
else:
iprot.skip(ftype)
elif fid == 1:
http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/py/storm/constants.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/constants.py b/storm-core/src/py/storm/constants.py
index 3f0c64a..b403f97 100644
--- a/storm-core/src/py/storm/constants.py
+++ b/storm-core/src/py/storm/constants.py
@@ -17,7 +17,7 @@
# limitations under the License.
#
-# Autogenerated by Thrift Compiler (0.9.2)
+# Autogenerated by Thrift Compiler (0.9.3)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py
index a06af92..0661254 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -17,7 +17,7 @@
# limitations under the License.
#
-# Autogenerated by Thrift Compiler (0.9.2)
+# Autogenerated by Thrift Compiler (0.9.3)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
@@ -126,12 +126,12 @@ class JavaObjectArg:
break
if fid == 1:
if ftype == TType.I32:
- self.int_arg = iprot.readI32();
+ self.int_arg = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.I64:
- self.long_arg = iprot.readI64();
+ self.long_arg = iprot.readI64()
else:
iprot.skip(ftype)
elif fid == 3:
@@ -141,17 +141,17 @@ class JavaObjectArg:
iprot.skip(ftype)
elif fid == 4:
if ftype == TType.BOOL:
- self.bool_arg = iprot.readBool();
+ self.bool_arg = iprot.readBool()
else:
iprot.skip(ftype)
elif fid == 5:
if ftype == TType.STRING:
- self.binary_arg = iprot.readString();
+ self.binary_arg = iprot.readString()
else:
iprot.skip(ftype)
elif fid == 6:
if ftype == TType.DOUBLE:
- self.double_arg = iprot.readDouble();
+ self.double_arg = iprot.readDouble()
else:
iprot.skip(ftype)
else:
@@ -521,7 +521,7 @@ class Grouping:
iprot.skip(ftype)
elif fid == 7:
if ftype == TType.STRING:
- self.custom_serialized = iprot.readString();
+ self.custom_serialized = iprot.readString()
else:
iprot.skip(ftype)
elif fid == 8:
@@ -643,7 +643,7 @@ class StreamInfo:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.BOOL:
- self.direct = iprot.readBool();
+ self.direct = iprot.readBool()
else:
iprot.skip(ftype)
else:
@@ -804,7 +804,7 @@ class ComponentObject:
break
if fid == 1:
if ftype == TType.STRING:
- self.serialized_java = iprot.readString();
+ self.serialized_java = iprot.readString()
else:
iprot.skip(ftype)
elif fid == 2:
@@ -925,7 +925,7 @@ class ComponentCommon:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.I32:
- self.parallelism_hint = iprot.readI32();
+ self.parallelism_hint = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 4:
@@ -1255,6 +1255,8 @@ class StormTopology:
- spouts
- bolts
- state_spouts
+ - storm_version
+ - jdk_version
"""
thrift_spec = (
@@ -1262,12 +1264,19 @@ class StormTopology:
(1, TType.MAP, 'spouts', (TType.STRING,None,TType.STRUCT,(SpoutSpec, SpoutSpec.thrift_spec)), None, ), # 1
(2, TType.MAP, 'bolts', (TType.STRING,None,TType.STRUCT,(Bolt, Bolt.thrift_spec)), None, ), # 2
(3, TType.MAP, 'state_spouts', (TType.STRING,None,TType.STRUCT,(StateSpoutSpec, StateSpoutSpec.thrift_spec)), None, ), # 3
+ None, # 4
+ None, # 5
+ None, # 6
+ (7, TType.STRING, 'storm_version', None, None, ), # 7
+ (8, TType.STRING, 'jdk_version', None, None, ), # 8
)
- def __init__(self, spouts=None, bolts=None, state_spouts=None,):
+ def __init__(self, spouts=None, bolts=None, state_spouts=None, storm_version=None, jdk_version=None,):
self.spouts = spouts
self.bolts = bolts
self.state_spouts = state_spouts
+ self.storm_version = storm_version
+ self.jdk_version = jdk_version
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -1314,6 +1323,16 @@ class StormTopology:
iprot.readMapEnd()
else:
iprot.skip(ftype)
+ elif fid == 7:
+ if ftype == TType.STRING:
+ self.storm_version = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 8:
+ if ftype == TType.STRING:
+ self.jdk_version = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -1348,6 +1367,14 @@ class StormTopology:
viter65.write(oprot)
oprot.writeMapEnd()
oprot.writeFieldEnd()
+ if self.storm_version is not None:
+ oprot.writeFieldBegin('storm_version', TType.STRING, 7)
+ oprot.writeString(self.storm_version.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.jdk_version is not None:
+ oprot.writeFieldBegin('jdk_version', TType.STRING, 8)
+ oprot.writeString(self.jdk_version.encode('utf-8'))
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -1366,6 +1393,8 @@ class StormTopology:
value = (value * 31) ^ hash(self.spouts)
value = (value * 31) ^ hash(self.bolts)
value = (value * 31) ^ hash(self.state_spouts)
+ value = (value * 31) ^ hash(self.storm_version)
+ value = (value * 31) ^ hash(self.jdk_version)
return value
def __repr__(self):
@@ -2223,22 +2252,22 @@ class TopologySummary:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.I32:
- self.num_tasks = iprot.readI32();
+ self.num_tasks = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 4:
if ftype == TType.I32:
- self.num_executors = iprot.readI32();
+ self.num_executors = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 5:
if ftype == TType.I32:
- self.num_workers = iprot.readI32();
+ self.num_workers = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 6:
if ftype == TType.I32:
- self.uptime_secs = iprot.readI32();
+ self.uptime_secs = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 7:
@@ -2365,10 +2394,10 @@ class SupervisorSummary:
(3, TType.I32, 'num_workers', None, None, ), # 3
(4, TType.I32, 'num_used_workers', None, None, ), # 4
(5, TType.STRING, 'supervisor_id', None, None, ), # 5
- (6, TType.STRING, 'version', None, None, ), # 6
+ (6, TType.STRING, 'version', None, "VERSION_NOT_PROVIDED", ), # 6
)
- def __init__(self, host=None, uptime_secs=None, num_workers=None, num_used_workers=None, supervisor_id=None, version=None,):
+ def __init__(self, host=None, uptime_secs=None, num_workers=None, num_used_workers=None, supervisor_id=None, version=thrift_spec[6][4],):
self.host = host
self.uptime_secs = uptime_secs
self.num_workers = num_workers
@@ -2392,17 +2421,17 @@ class SupervisorSummary:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.I32:
- self.uptime_secs = iprot.readI32();
+ self.uptime_secs = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.I32:
- self.num_workers = iprot.readI32();
+ self.num_workers = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 4:
if ftype == TType.I32:
- self.num_used_workers = iprot.readI32();
+ self.num_used_workers = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 5:
@@ -2463,8 +2492,6 @@ class SupervisorSummary:
raise TProtocol.TProtocolException(message='Required field num_used_workers is unset!')
if self.supervisor_id is None:
raise TProtocol.TProtocolException(message='Required field supervisor_id is unset!')
- if self.version is None:
- raise TProtocol.TProtocolException(message='Required field version is unset!')
return
@@ -2531,7 +2558,7 @@ class ClusterSummary:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.I32:
- self.nimbus_uptime_secs = iprot.readI32();
+ self.nimbus_uptime_secs = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 3:
@@ -2643,7 +2670,7 @@ class ErrorInfo:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.I32:
- self.error_time_secs = iprot.readI32();
+ self.error_time_secs = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 3:
@@ -2653,7 +2680,7 @@ class ErrorInfo:
iprot.skip(ftype)
elif fid == 4:
if ftype == TType.I32:
- self.port = iprot.readI32();
+ self.port = iprot.readI32()
else:
iprot.skip(ftype)
else:
@@ -2758,7 +2785,7 @@ class BoltStats:
for _i91 in xrange(_size87):
_key92 = GlobalStreamId()
_key92.read(iprot)
- _val93 = iprot.readI64();
+ _val93 = iprot.readI64()
_val86[_key92] = _val93
iprot.readMapEnd()
self.acked[_key85] = _val86
@@ -2776,7 +2803,7 @@ class BoltStats:
for _i105 in xrange(_size101):
_key106 = GlobalStreamId()
_key106.read(iprot)
- _val107 = iprot.readI64();
+ _val107 = iprot.readI64()
_val100[_key106] = _val107
iprot.readMapEnd()
self.failed[_key99] = _val100
@@ -2794,7 +2821,7 @@ class BoltStats:
for _i119 in xrange(_size115):
_key120 = GlobalStreamId()
_key120.read(iprot)
- _val121 = iprot.readDouble();
+ _val121 = iprot.readDouble()
_val114[_key120] = _val121
iprot.readMapEnd()
self.process_ms_avg[_key113] = _val114
@@ -2812,7 +2839,7 @@ class BoltStats:
for _i133 in xrange(_size129):
_key134 = GlobalStreamId()
_key134.read(iprot)
- _val135 = iprot.readI64();
+ _val135 = iprot.readI64()
_val128[_key134] = _val135
iprot.readMapEnd()
self.executed[_key127] = _val128
@@ -2830,7 +2857,7 @@ class BoltStats:
for _i147 in xrange(_size143):
_key148 = GlobalStreamId()
_key148.read(iprot)
- _val149 = iprot.readDouble();
+ _val149 = iprot.readDouble()
_val142[_key148] = _val149
iprot.readMapEnd()
self.execute_ms_avg[_key141] = _val142
@@ -2983,7 +3010,7 @@ class SpoutStats:
(_ktype178, _vtype179, _size177 ) = iprot.readMapBegin()
for _i181 in xrange(_size177):
_key182 = iprot.readString().decode('utf-8')
- _val183 = iprot.readI64();
+ _val183 = iprot.readI64()
_val176[_key182] = _val183
iprot.readMapEnd()
self.acked[_key175] = _val176
@@ -3000,7 +3027,7 @@ class SpoutStats:
(_ktype192, _vtype193, _size191 ) = iprot.readMapBegin()
for _i195 in xrange(_size191):
_key196 = iprot.readString().decode('utf-8')
- _val197 = iprot.readI64();
+ _val197 = iprot.readI64()
_val190[_key196] = _val197
iprot.readMapEnd()
self.failed[_key189] = _val190
@@ -3017,7 +3044,7 @@ class SpoutStats:
(_ktype206, _vtype207, _size205 ) = iprot.readMapBegin()
for _i209 in xrange(_size205):
_key210 = iprot.readString().decode('utf-8')
- _val211 = iprot.readDouble();
+ _val211 = iprot.readDouble()
_val204[_key210] = _val211
iprot.readMapEnd()
self.complete_ms_avg[_key203] = _val204
@@ -3223,7 +3250,7 @@ class ExecutorStats:
(_ktype232, _vtype233, _size231 ) = iprot.readMapBegin()
for _i235 in xrange(_size231):
_key236 = iprot.readString().decode('utf-8')
- _val237 = iprot.readI64();
+ _val237 = iprot.readI64()
_val230[_key236] = _val237
iprot.readMapEnd()
self.emitted[_key229] = _val230
@@ -3240,7 +3267,7 @@ class ExecutorStats:
(_ktype246, _vtype247, _size245 ) = iprot.readMapBegin()
for _i249 in xrange(_size245):
_key250 = iprot.readString().decode('utf-8')
- _val251 = iprot.readI64();
+ _val251 = iprot.readI64()
_val244[_key250] = _val251
iprot.readMapEnd()
self.transferred[_key243] = _val244
@@ -3255,7 +3282,7 @@ class ExecutorStats:
iprot.skip(ftype)
elif fid == 4:
if ftype == TType.DOUBLE:
- self.rate = iprot.readDouble();
+ self.rate = iprot.readDouble()
else:
iprot.skip(ftype)
else:
@@ -3362,12 +3389,12 @@ class ExecutorInfo:
break
if fid == 1:
if ftype == TType.I32:
- self.task_start = iprot.readI32();
+ self.task_start = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.I32:
- self.task_end = iprot.readI32();
+ self.task_end = iprot.readI32()
else:
iprot.skip(ftype)
else:
@@ -3473,12 +3500,12 @@ class ExecutorSummary:
iprot.skip(ftype)
elif fid == 4:
if ftype == TType.I32:
- self.port = iprot.readI32();
+ self.port = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 5:
if ftype == TType.I32:
- self.uptime_secs = iprot.readI32();
+ self.uptime_secs = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 7:
@@ -4121,7 +4148,7 @@ class TopologyInfo:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.I32:
- self.uptime_secs = iprot.readI32();
+ self.uptime_secs = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 4:
@@ -4286,7 +4313,7 @@ class KillOptions:
break
if fid == 1:
if ftype == TType.I32:
- self.wait_secs = iprot.readI32();
+ self.wait_secs = iprot.readI32()
else:
iprot.skip(ftype)
else:
@@ -4357,12 +4384,12 @@ class RebalanceOptions:
break
if fid == 1:
if ftype == TType.I32:
- self.wait_secs = iprot.readI32();
+ self.wait_secs = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.I32:
- self.num_workers = iprot.readI32();
+ self.num_workers = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 3:
@@ -4371,7 +4398,7 @@ class RebalanceOptions:
(_ktype284, _vtype285, _size283 ) = iprot.readMapBegin()
for _i287 in xrange(_size283):
_key288 = iprot.readString().decode('utf-8')
- _val289 = iprot.readI32();
+ _val289 = iprot.readI32()
self.num_executors[_key288] = _val289
iprot.readMapEnd()
else:
@@ -4532,7 +4559,7 @@ class SubmitOptions:
break
if fid == 1:
if ftype == TType.I32:
- self.initial_status = iprot.readI32();
+ self.initial_status = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 2:
@@ -4631,7 +4658,7 @@ class SupervisorInfo:
break
if fid == 1:
if ftype == TType.I64:
- self.time_secs = iprot.readI64();
+ self.time_secs = iprot.readI64()
else:
iprot.skip(ftype)
elif fid == 2:
@@ -4649,7 +4676,7 @@ class SupervisorInfo:
self.used_ports = []
(_etype304, _size301) = iprot.readListBegin()
for _i305 in xrange(_size301):
- _elem306 = iprot.readI64();
+ _elem306 = iprot.readI64()
self.used_ports.append(_elem306)
iprot.readListEnd()
else:
@@ -4659,7 +4686,7 @@ class SupervisorInfo:
self.meta = []
(_etype310, _size307) = iprot.readListBegin()
for _i311 in xrange(_size307):
- _elem312 = iprot.readI64();
+ _elem312 = iprot.readI64()
self.meta.append(_elem312)
iprot.readListEnd()
else:
@@ -4677,7 +4704,7 @@ class SupervisorInfo:
iprot.skip(ftype)
elif fid == 7:
if ftype == TType.I64:
- self.uptime_secs = iprot.readI64();
+ self.uptime_secs = iprot.readI64()
else:
iprot.skip(ftype)
elif fid == 8:
@@ -4807,7 +4834,7 @@ class NodeInfo:
self.port = set()
(_etype327, _size324) = iprot.readSetBegin()
for _i328 in xrange(_size324):
- _elem329 = iprot.readI64();
+ _elem329 = iprot.readI64()
self.port.add(_elem329)
iprot.readSetEnd()
else:
@@ -4929,7 +4956,7 @@ class Assignment:
_key343 = []
(_etype348, _size345) = iprot.readListBegin()
for _i349 in xrange(_size345):
- _elem350 = iprot.readI64();
+ _elem350 = iprot.readI64()
_key343.append(_elem350)
iprot.readListEnd()
_val344 = NodeInfo()
@@ -4946,10 +4973,10 @@ class Assignment:
_key356 = []
(_etype361, _size358) = iprot.readListBegin()
for _i362 in xrange(_size358):
- _elem363 = iprot.readI64();
+ _elem363 = iprot.readI64()
_key356.append(_elem363)
iprot.readListEnd()
- _val357 = iprot.readI64();
+ _val357 = iprot.readI64()
self.executor_start_time_secs[_key356] = _val357
iprot.readMapEnd()
else:
@@ -5157,12 +5184,12 @@ class StormBase:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.I32:
- self.status = iprot.readI32();
+ self.status = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.I32:
- self.num_workers = iprot.readI32();
+ self.num_workers = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 4:
@@ -5171,14 +5198,14 @@ class StormBase:
(_ktype373, _vtype374, _size372 ) = iprot.readMapBegin()
for _i376 in xrange(_size372):
_key377 = iprot.readString().decode('utf-8')
- _val378 = iprot.readI32();
+ _val378 = iprot.readI32()
self.component_executors[_key377] = _val378
iprot.readMapEnd()
else:
iprot.skip(ftype)
elif fid == 5:
if ftype == TType.I32:
- self.launch_time_secs = iprot.readI32();
+ self.launch_time_secs = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 6:
@@ -5194,7 +5221,7 @@ class StormBase:
iprot.skip(ftype)
elif fid == 8:
if ftype == TType.I32:
- self.prev_status = iprot.readI32();
+ self.prev_status = iprot.readI32()
else:
iprot.skip(ftype)
else:
@@ -5331,12 +5358,12 @@ class ClusterWorkerHeartbeat:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.I32:
- self.time_secs = iprot.readI32();
+ self.time_secs = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 4:
if ftype == TType.I32:
- self.uptime_secs = iprot.readI32();
+ self.uptime_secs = iprot.readI32()
else:
iprot.skip(ftype)
else:
@@ -5436,7 +5463,7 @@ class ThriftSerializedObject:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.STRING:
- self.bits = iprot.readString();
+ self.bits = iprot.readString()
else:
iprot.skip(ftype)
else:
@@ -5750,7 +5777,7 @@ class LSApprovedWorkers:
(_ktype407, _vtype408, _size406 ) = iprot.readMapBegin()
for _i410 in xrange(_size406):
_key411 = iprot.readString().decode('utf-8')
- _val412 = iprot.readI32();
+ _val412 = iprot.readI32()
self.approved_workers[_key411] = _val412
iprot.readMapEnd()
else:
@@ -5826,7 +5853,7 @@ class LSSupervisorAssignments:
self.assignments = {}
(_ktype416, _vtype417, _size415 ) = iprot.readMapBegin()
for _i419 in xrange(_size415):
- _key420 = iprot.readI32();
+ _key420 = iprot.readI32()
_val421 = LocalAssignment()
_val421.read(iprot)
self.assignments[_key420] = _val421
@@ -5910,7 +5937,7 @@ class LSWorkerHeartbeat:
break
if fid == 1:
if ftype == TType.I32:
- self.time_secs = iprot.readI32();
+ self.time_secs = iprot.readI32()
else:
iprot.skip(ftype)
elif fid == 2:
@@ -5931,7 +5958,7 @@ class LSWorkerHeartbeat:
iprot.skip(ftype)
elif fid == 4:
if ftype == TType.I32:
- self.port = iprot.readI32();
+ self.port = iprot.readI32()
else:
iprot.skip(ftype)
else:
@@ -6022,7 +6049,7 @@ class GetInfoOptions:
break
if fid == 1:
if ftype == TType.I32:
- self.num_err_choice = iprot.readI32();
+ self.num_err_choice = iprot.readI32()
else:
iprot.skip(ftype)
else:
http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift
index a4b0b2a..4f81635 100644
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@ -117,6 +117,11 @@ struct StormTopology {
1: required map<string, SpoutSpec> spouts;
2: required map<string, Bolt> bolts;
3: required map<string, StateSpoutSpec> state_spouts;
+ #reserved 4: optional list<binary> worker_hooks;
+ #reserved 5: optional list<string> dependency_jars;
+ #reserved 6: optional list<string> dependency_artifacts;
+ 7: optional string storm_version;
+ 8: optional string jdk_version;
}
exception AlreadyAliveException {
[2/3] storm git commit: Merge branch 'STORM-2448-0.10.x' of
https://github.com/revans2/incubator-storm into STORM-2448-0.10.x
Posted by bo...@apache.org.
Merge branch 'STORM-2448-0.10.x' of https://github.com/revans2/incubator-storm into STORM-2448-0.10.x
STORM-2448: Report storm and jdk versions to nimbus on topology
submission
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b2f3f38f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b2f3f38f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b2f3f38f
Branch: refs/heads/0.10.x-branch
Commit: b2f3f38fcef87af01a339255c1e14bd2424734bf
Parents: c56b7cf 77985a0
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Tue May 23 10:03:13 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Tue May 23 10:03:13 2017 -0500
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/common.clj | 11 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 4 +-
storm-core/src/clj/backtype/storm/testing.clj | 4 +-
.../src/jvm/backtype/storm/StormSubmitter.java | 1 +
.../backtype/storm/generated/StormTopology.java | 222 ++++++++++++-
.../storm/topology/TopologyBuilder.java | 4 +-
.../storm/utils/ThriftTopologyUtils.java | 29 +-
.../src/jvm/backtype/storm/utils/Utils.java | 21 ++
storm-core/src/py/storm/DistributedRPC-remote | 2 +-
storm-core/src/py/storm/DistributedRPC.py | 20 +-
.../py/storm/DistributedRPCInvocations-remote | 2 +-
.../src/py/storm/DistributedRPCInvocations.py | 41 ++-
storm-core/src/py/storm/Nimbus-remote | 2 +-
storm-core/src/py/storm/Nimbus.py | 318 +++++++++++++++----
storm-core/src/py/storm/constants.py | 2 +-
storm-core/src/py/storm/ttypes.py | 161 ++++++----
storm-core/src/storm.thrift | 5 +
17 files changed, 661 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: Added STORM-2448 to Changelog
Posted by bo...@apache.org.
Added STORM-2448 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ecba328c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ecba328c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ecba328c
Branch: refs/heads/0.10.x-branch
Commit: ecba328c8729d54372077c2f0b537aab6ee1cf91
Parents: b2f3f38
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Tue May 23 10:03:44 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Tue May 23 10:03:44 2017 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ecba328c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 211922f..9c8a182 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.10.3
+ * STORM-2448: Report storm and jdk versions to nimbus on topology submission
* STORM-2486: Prevent cd from printing target directory to avoid breaking classpath
* STORM-1114: Race condition in trident zookeeper zk-node create/delete
* STORM-2158: Fix OutOfMemoryError in Nimbus' SimpleTransportPlugin