You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/05/23 14:37:55 UTC
[1/3] storm git commit: STORM-2448: Add in Storm and JDK versions
when submitting a topology
Repository: storm
Updated Branches:
refs/heads/1.x-branch 7d904f0e6 -> 9a4185c49
STORM-2448: Add in Storm and JDK versions when submitting a topology
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/23ed16ad
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/23ed16ad
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/23ed16ad
Branch: refs/heads/1.x-branch
Commit: 23ed16adb7fa8d937fdd225d529658fe1a5cee2f
Parents: ca66b1a
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Apr 4 12:34:49 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Sat Apr 8 21:05:26 2017 -0500
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/common.clj | 34 ++-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 4 +-
storm-core/src/clj/org/apache/storm/testing.clj | 4 +-
.../jvm/org/apache/storm/StormSubmitter.java | 2 +-
.../apache/storm/generated/StormTopology.java | 220 ++++++++++++++++++-
.../apache/storm/topology/TopologyBuilder.java | 2 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 20 ++
storm-core/src/py/storm/ttypes.py | 28 ++-
storm-core/src/storm.thrift | 2 +
9 files changed, 287 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/23ed16ad/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 20e03a0..db01727 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -121,32 +121,28 @@
)))))
(defn- validate-ids! [^StormTopology topology] ; throws InvalidTopologyException for duplicate ids or ids that Utils/isSystemId flags
- (let [sets (map #(.getFieldValue topology %) thrift/STORM-TOPOLOGY-FIELDS)
+ (let [sets [(.get_bolts topology) (.get_spouts topology) (.get_state_spouts topology)] ; only the three id->component maps are inspected
offending (apply any-intersection sets)] ; NOTE(review): any-intersection is defined elsewhere; presumably yields ids present in more than one map - confirm
(if-not (empty? offending)
(throw (InvalidTopologyException.
(str "Duplicate component ids: " offending))))
- (doseq [f thrift/STORM-TOPOLOGY-FIELDS
- :let [obj-map (.getFieldValue topology f)]]
- (if-not (or (ThriftTopologyUtils/isWorkerHook f)
- (ThriftTopologyUtils/isDependencies f))
- (do
- (doseq [id (keys obj-map)]
- (if (Utils/isSystemId id)
- (throw (InvalidTopologyException.
- (str id " is not a valid component id")))))
- (doseq [obj (vals obj-map)
- id (-> obj .get_common .get_streams keys)]
- (if (Utils/isSystemId id)
- (throw (InvalidTopologyException.
- (str id " is not a valid stream id"))))))))))
+ (doseq [obj-map sets] ; iterating the explicit maps removes the need to filter out worker-hook / dependency fields
+ (do
+ (doseq [id (keys obj-map)] ; first pass: the component ids themselves
+ (if (Utils/isSystemId id)
+ (throw (InvalidTopologyException.
+ (str id " is not a valid component id")))))
+ (doseq [obj (vals obj-map) ; second pass: every stream id declared in each component's common section
+ id (-> obj .get_common .get_streams keys)]
+ (if (Utils/isSystemId id)
+ (throw (InvalidTopologyException.
+ (str id " is not a valid stream id")))))))))
(defn all-components [^StormTopology topology] ; returns one map combining the topology's bolts, spouts and state spouts
(apply merge {} ; NOTE(review): apply spreads its final argument, so the last map reaches merge as individual entries; a plain (merge {} ...) may be clearer - confirm before changing
- (for [f thrift/STORM-TOPOLOGY-FIELDS]
- (if-not (or (ThriftTopologyUtils/isWorkerHook f)
- (ThriftTopologyUtils/isDependencies f))
- (.getFieldValue topology f)))))
+ (.get_bolts topology)
+ (.get_spouts topology)
+ (.get_state_spouts topology))) ; non-component fields (worker hooks, dependencies, versions) are never read here
(defn component-conf [component]
(->> component
http://git-wip-us.apache.org/repos/asf/storm/blob/23ed16ad/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index f3b3373..3bd8a17 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1698,7 +1698,9 @@
(throw (IllegalArgumentException. "The cluster is configured for zookeeper authentication, but no payload was provided.")))
(log-message "Received topology submission for "
storm-name
- " with conf "
+ " (storm-" (.get_storm_version topology)
+ " JDK-" (.get_jdk_version topology)
+ ") with conf "
(redact-value storm-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
;; lock protects against multiple topologies being submitted at once and
;; cleanup thread killing topology in b/w assignment and starting the topology
http://git-wip-us.apache.org/repos/asf/storm/blob/23ed16ad/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index c061922..db6b94b 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -307,13 +307,13 @@
[nimbus storm-name conf topology]
(when-not (Utils/isValidConf conf)
(throw (IllegalArgumentException. "Topology conf is not json-serializable")))
- (.submitTopology nimbus storm-name nil (to-json conf) topology))
+ (.submitTopology nimbus storm-name nil (to-json conf) (Utils/addVersions topology)))
(defn submit-local-topology-with-opts ; submits topology to nimbus with explicit submit-opts after validating conf
[nimbus storm-name conf topology submit-opts]
(when-not (Utils/isValidConf conf) ; reject the conf before anything is sent to nimbus
(throw (IllegalArgumentException. "Topology conf is not json-serializable")))
- (.submitTopologyWithOpts nimbus storm-name nil (to-json conf) topology submit-opts))
+ (.submitTopologyWithOpts nimbus storm-name nil (to-json conf) (Utils/addVersions topology) submit-opts)) ; topology is passed through Utils/addVersions first; nil is passed in the argument slot after storm-name
(defn mocked-convert-assignments-to-worker->resources [storm-cluster-state storm-name worker->resources]
(fn [existing-assignments]
http://git-wip-us.apache.org/repos/asf/storm/blob/23ed16ad/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
index d382dd9..eeb90c5 100644
--- a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
@@ -321,7 +321,7 @@ public class StormSubmitter {
try {
String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, client);
LOG.info("Submitting topology {} in distributed mode with conf {}", name, serConf);
-
+ Utils.addVersions(topology);
if (opts != null) {
client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
} else {
http://git-wip-us.apache.org/repos/asf/storm/blob/23ed16ad/storm-core/src/jvm/org/apache/storm/generated/StormTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/StormTopology.java b/storm-core/src/jvm/org/apache/storm/generated/StormTopology.java
index caec6c6..6241d7b 100644
--- a/storm-core/src/jvm/org/apache/storm/generated/StormTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/generated/StormTopology.java
@@ -61,6 +61,8 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
private static final org.apache.thrift.protocol.TField WORKER_HOOKS_FIELD_DESC = new org.apache.thrift.protocol.TField("worker_hooks", org.apache.thrift.protocol.TType.LIST, (short)4);
private static final org.apache.thrift.protocol.TField DEPENDENCY_JARS_FIELD_DESC = new org.apache.thrift.protocol.TField("dependency_jars", org.apache.thrift.protocol.TType.LIST, (short)5);
private static final org.apache.thrift.protocol.TField DEPENDENCY_ARTIFACTS_FIELD_DESC = new org.apache.thrift.protocol.TField("dependency_artifacts", org.apache.thrift.protocol.TType.LIST, (short)6);
+ private static final org.apache.thrift.protocol.TField STORM_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("storm_version", org.apache.thrift.protocol.TType.STRING, (short)7);
+ private static final org.apache.thrift.protocol.TField JDK_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("jdk_version", org.apache.thrift.protocol.TType.STRING, (short)8);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -74,6 +76,8 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
private List<ByteBuffer> worker_hooks; // optional
private List<String> dependency_jars; // optional
private List<String> dependency_artifacts; // optional
+ private String storm_version; // optional
+ private String jdk_version; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -82,7 +86,9 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
STATE_SPOUTS((short)3, "state_spouts"),
WORKER_HOOKS((short)4, "worker_hooks"),
DEPENDENCY_JARS((short)5, "dependency_jars"),
- DEPENDENCY_ARTIFACTS((short)6, "dependency_artifacts");
+ DEPENDENCY_ARTIFACTS((short)6, "dependency_artifacts"),
+ STORM_VERSION((short)7, "storm_version"),
+ JDK_VERSION((short)8, "jdk_version");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -109,6 +115,10 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
return DEPENDENCY_JARS;
case 6: // DEPENDENCY_ARTIFACTS
return DEPENDENCY_ARTIFACTS;
+ case 7: // STORM_VERSION
+ return STORM_VERSION;
+ case 8: // JDK_VERSION
+ return JDK_VERSION;
default:
return null;
}
@@ -149,7 +159,7 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
// isset id assignments
- private static final _Fields optionals[] = {_Fields.WORKER_HOOKS,_Fields.DEPENDENCY_JARS,_Fields.DEPENDENCY_ARTIFACTS};
+ private static final _Fields optionals[] = {_Fields.WORKER_HOOKS,_Fields.DEPENDENCY_JARS,_Fields.DEPENDENCY_ARTIFACTS,_Fields.STORM_VERSION,_Fields.JDK_VERSION};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -174,6 +184,10 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
tmpMap.put(_Fields.DEPENDENCY_ARTIFACTS, new org.apache.thrift.meta_data.FieldMetaData("dependency_artifacts", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
+ tmpMap.put(_Fields.STORM_VERSION, new org.apache.thrift.meta_data.FieldMetaData("storm_version", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.JDK_VERSION, new org.apache.thrift.meta_data.FieldMetaData("jdk_version", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(StormTopology.class, metaDataMap);
}
@@ -253,6 +267,12 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
List<String> __this__dependency_artifacts = new ArrayList<String>(other.dependency_artifacts);
this.dependency_artifacts = __this__dependency_artifacts;
}
+ if (other.is_set_storm_version()) {
+ this.storm_version = other.storm_version;
+ }
+ if (other.is_set_jdk_version()) {
+ this.jdk_version = other.jdk_version;
+ }
}
public StormTopology deepCopy() {
@@ -267,6 +287,8 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
this.worker_hooks = null;
this.dependency_jars = null;
this.dependency_artifacts = null;
+ this.storm_version = null;
+ this.jdk_version = null;
}
public int get_spouts_size() {
@@ -485,6 +507,52 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
}
+ public String get_storm_version() { // returns the stored storm_version string; null means the optional field is unset
+ return this.storm_version;
+ }
+
+ public void set_storm_version(String storm_version) { // stores the value as-is; storing null is equivalent to unsetting the field
+ this.storm_version = storm_version;
+ }
+
+ public void unset_storm_version() { // clears the field so is_set_storm_version() reports false
+ this.storm_version = null;
+ }
+
+ /** Returns true if field storm_version is set (has been assigned a value) and false otherwise */
+ public boolean is_set_storm_version() { // there is no separate isset flag: "set" simply means non-null
+ return this.storm_version != null;
+ }
+
+ public void set_storm_version_isSet(boolean value) { // only value == false has an effect (clears the field); true is a no-op
+ if (!value) {
+ this.storm_version = null;
+ }
+ }
+
+ public String get_jdk_version() { // returns the stored jdk_version string; null means the optional field is unset
+ return this.jdk_version;
+ }
+
+ public void set_jdk_version(String jdk_version) { // stores the value as-is; storing null is equivalent to unsetting the field
+ this.jdk_version = jdk_version;
+ }
+
+ public void unset_jdk_version() { // clears the field so is_set_jdk_version() reports false
+ this.jdk_version = null;
+ }
+
+ /** Returns true if field jdk_version is set (has been assigned a value) and false otherwise */
+ public boolean is_set_jdk_version() { // there is no separate isset flag: "set" simply means non-null
+ return this.jdk_version != null;
+ }
+
+ public void set_jdk_version_isSet(boolean value) { // only value == false has an effect (clears the field); true is a no-op
+ if (!value) {
+ this.jdk_version = null;
+ }
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case SPOUTS:
@@ -535,6 +603,22 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
break;
+ case STORM_VERSION:
+ if (value == null) {
+ unset_storm_version();
+ } else {
+ set_storm_version((String)value);
+ }
+ break;
+
+ case JDK_VERSION:
+ if (value == null) {
+ unset_jdk_version();
+ } else {
+ set_jdk_version((String)value);
+ }
+ break;
+
}
}
@@ -558,6 +642,12 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
case DEPENDENCY_ARTIFACTS:
return get_dependency_artifacts();
+ case STORM_VERSION:
+ return get_storm_version();
+
+ case JDK_VERSION:
+ return get_jdk_version();
+
}
throw new IllegalStateException();
}
@@ -581,6 +671,10 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
return is_set_dependency_jars();
case DEPENDENCY_ARTIFACTS:
return is_set_dependency_artifacts();
+ case STORM_VERSION:
+ return is_set_storm_version();
+ case JDK_VERSION:
+ return is_set_jdk_version();
}
throw new IllegalStateException();
}
@@ -652,6 +746,24 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
return false;
}
+ boolean this_present_storm_version = true && this.is_set_storm_version();
+ boolean that_present_storm_version = true && that.is_set_storm_version();
+ if (this_present_storm_version || that_present_storm_version) {
+ if (!(this_present_storm_version && that_present_storm_version))
+ return false;
+ if (!this.storm_version.equals(that.storm_version))
+ return false;
+ }
+
+ boolean this_present_jdk_version = true && this.is_set_jdk_version();
+ boolean that_present_jdk_version = true && that.is_set_jdk_version();
+ if (this_present_jdk_version || that_present_jdk_version) {
+ if (!(this_present_jdk_version && that_present_jdk_version))
+ return false;
+ if (!this.jdk_version.equals(that.jdk_version))
+ return false;
+ }
+
return true;
}
@@ -689,6 +801,16 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
if (present_dependency_artifacts)
list.add(dependency_artifacts);
+ boolean present_storm_version = true && (is_set_storm_version());
+ list.add(present_storm_version);
+ if (present_storm_version)
+ list.add(storm_version);
+
+ boolean present_jdk_version = true && (is_set_jdk_version());
+ list.add(present_jdk_version);
+ if (present_jdk_version)
+ list.add(jdk_version);
+
return list.hashCode();
}
@@ -760,6 +882,26 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(is_set_storm_version()).compareTo(other.is_set_storm_version());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_storm_version()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.storm_version, other.storm_version);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_jdk_version()).compareTo(other.is_set_jdk_version());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_jdk_version()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.jdk_version, other.jdk_version);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -833,6 +975,26 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
first = false;
}
+ if (is_set_storm_version()) {
+ if (!first) sb.append(", ");
+ sb.append("storm_version:");
+ if (this.storm_version == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.storm_version);
+ }
+ first = false;
+ }
+ if (is_set_jdk_version()) {
+ if (!first) sb.append(", ");
+ sb.append("jdk_version:");
+ if (this.jdk_version == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.jdk_version);
+ }
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -1005,6 +1167,22 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 7: // STORM_VERSION
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.storm_version = iprot.readString();
+ struct.set_storm_version_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 8: // JDK_VERSION
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.jdk_version = iprot.readString();
+ struct.set_jdk_version_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -1099,6 +1277,20 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
oprot.writeFieldEnd();
}
}
+ if (struct.storm_version != null) {
+ if (struct.is_set_storm_version()) {
+ oprot.writeFieldBegin(STORM_VERSION_FIELD_DESC);
+ oprot.writeString(struct.storm_version);
+ oprot.writeFieldEnd();
+ }
+ }
+ if (struct.jdk_version != null) {
+ if (struct.is_set_jdk_version()) {
+ oprot.writeFieldBegin(JDK_VERSION_FIELD_DESC);
+ oprot.writeString(struct.jdk_version);
+ oprot.writeFieldEnd();
+ }
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -1150,7 +1342,13 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
if (struct.is_set_dependency_artifacts()) {
optionals.set(2);
}
- oprot.writeBitSet(optionals, 3);
+ if (struct.is_set_storm_version()) {
+ optionals.set(3);
+ }
+ if (struct.is_set_jdk_version()) {
+ optionals.set(4);
+ }
+ oprot.writeBitSet(optionals, 5);
if (struct.is_set_worker_hooks()) {
{
oprot.writeI32(struct.worker_hooks.size());
@@ -1178,6 +1376,12 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
}
}
+ if (struct.is_set_storm_version()) {
+ oprot.writeString(struct.storm_version);
+ }
+ if (struct.is_set_jdk_version()) {
+ oprot.writeString(struct.jdk_version);
+ }
}
@Override
@@ -1225,7 +1429,7 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
}
struct.set_state_spouts_isSet(true);
- BitSet incoming = iprot.readBitSet(3);
+ BitSet incoming = iprot.readBitSet(5);
if (incoming.get(0)) {
{
org.apache.thrift.protocol.TList _list89 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
@@ -1265,6 +1469,14 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
struct.set_dependency_artifacts_isSet(true);
}
+ if (incoming.get(3)) {
+ struct.storm_version = iprot.readString();
+ struct.set_storm_version_isSet(true);
+ }
+ if (incoming.get(4)) {
+ struct.jdk_version = iprot.readString();
+ struct.set_jdk_version_isSet(true);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/23ed16ad/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
index 40ede4c..ea4488b 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
@@ -153,7 +153,7 @@ public class TopologyBuilder {
stormTopology.set_worker_hooks(_workerHooks);
- return stormTopology;
+ return Utils.addVersions(stormTopology);
}
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/23ed16ad/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 1a13a6f..b9ced2c 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -2271,4 +2271,24 @@ public class Utils {
return ret;
}
+ /**
+ * Stamp the given topology with the Storm and JDK versions of the calling process.
+ * A version already present on the topology is never overwritten, and the Storm
+ * version is left unset when VersionInfo reports null or "Unknown" (case-insensitive).
+ * @param topology the topology being submitted (MIGHT BE MODIFIED)
+ * @return the same topology instance that was passed in
+ */
+ public static StormTopology addVersions(StormTopology topology) {
+ String stormVersion = VersionInfo.getVersion();
+ // Diagnostic only: this method runs on every topology build/submit, so it must not log at WARN.
+ LOG.debug("STORM-VERSION new {} old {}", stormVersion, topology.get_storm_version());
+ if (stormVersion != null &&
+ !"Unknown".equalsIgnoreCase(stormVersion) &&
+ !topology.is_set_storm_version()) {
+ topology.set_storm_version(stormVersion);
+ }
+
+ // java.version of the JVM executing this call (the submitter when called from StormSubmitter).
+ String jdkVersion = System.getProperty("java.version");
+ if (jdkVersion != null && !topology.is_set_jdk_version()) {
+ topology.set_jdk_version(jdkVersion);
+ }
+ return topology;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/23ed16ad/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py
index 4fac146..8262285 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -1394,6 +1394,8 @@ class StormTopology:
- worker_hooks
- dependency_jars
- dependency_artifacts
+ - storm_version
+ - jdk_version
"""
thrift_spec = (
@@ -1404,15 +1406,19 @@ class StormTopology:
(4, TType.LIST, 'worker_hooks', (TType.STRING,None), None, ), # 4
(5, TType.LIST, 'dependency_jars', (TType.STRING,None), None, ), # 5
(6, TType.LIST, 'dependency_artifacts', (TType.STRING,None), None, ), # 6
+ (7, TType.STRING, 'storm_version', None, None, ), # 7
+ (8, TType.STRING, 'jdk_version', None, None, ), # 8
)
- def __init__(self, spouts=None, bolts=None, state_spouts=None, worker_hooks=None, dependency_jars=None, dependency_artifacts=None,):
+ def __init__(self, spouts=None, bolts=None, state_spouts=None, worker_hooks=None, dependency_jars=None, dependency_artifacts=None, storm_version=None, jdk_version=None,):
self.spouts = spouts
self.bolts = bolts
self.state_spouts = state_spouts
self.worker_hooks = worker_hooks
self.dependency_jars = dependency_jars
self.dependency_artifacts = dependency_artifacts
+ self.storm_version = storm_version  # Thrift field 7 (string); None means not set
+ self.jdk_version = jdk_version  # Thrift field 8 (string); None means not set
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -1489,6 +1495,16 @@ class StormTopology:
iprot.readListEnd()
else:
iprot.skip(ftype)
+ elif fid == 7:
+ if ftype == TType.STRING:
+ self.storm_version = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 8:
+ if ftype == TType.STRING:
+ self.jdk_version = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -1544,6 +1560,14 @@ class StormTopology:
oprot.writeString(iter86.encode('utf-8'))
oprot.writeListEnd()
oprot.writeFieldEnd()
+ if self.storm_version is not None:
+ oprot.writeFieldBegin('storm_version', TType.STRING, 7)
+ oprot.writeString(self.storm_version.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.jdk_version is not None:
+ oprot.writeFieldBegin('jdk_version', TType.STRING, 8)
+ oprot.writeString(self.jdk_version.encode('utf-8'))
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -1565,6 +1589,8 @@ class StormTopology:
value = (value * 31) ^ hash(self.worker_hooks)
value = (value * 31) ^ hash(self.dependency_jars)
value = (value * 31) ^ hash(self.dependency_artifacts)
+ value = (value * 31) ^ hash(self.storm_version)
+ value = (value * 31) ^ hash(self.jdk_version)
return value
def __repr__(self):
http://git-wip-us.apache.org/repos/asf/storm/blob/23ed16ad/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift
index 700e5a0..146591f 100644
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@ -120,6 +120,8 @@ struct StormTopology {
4: optional list<binary> worker_hooks;
5: optional list<string> dependency_jars;
6: optional list<string> dependency_artifacts;
+ 7: optional string storm_version;
+ 8: optional string jdk_version;
}
exception AlreadyAliveException {
[2/3] storm git commit: Merge branch 'STORM-2448-1.x' of
https://github.com/revans2/incubator-storm into STORM-2448-1.x
Posted by bo...@apache.org.
Merge branch 'STORM-2448-1.x' of https://github.com/revans2/incubator-storm into STORM-2448-1.x
STORM-2448: Add in Storm and JDK versions when submitting a topology
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bfffa7f7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bfffa7f7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bfffa7f7
Branch: refs/heads/1.x-branch
Commit: bfffa7f7f7accf54b2a1f1544e76819233c54ad5
Parents: 7d904f0 23ed16a
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Tue May 23 09:17:46 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Tue May 23 09:17:46 2017 -0500
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/common.clj | 34 ++-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 4 +-
storm-core/src/clj/org/apache/storm/testing.clj | 4 +-
.../jvm/org/apache/storm/StormSubmitter.java | 2 +-
.../apache/storm/generated/StormTopology.java | 220 ++++++++++++++++++-
.../apache/storm/topology/TopologyBuilder.java | 2 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 20 ++
storm-core/src/py/storm/ttypes.py | 28 ++-
storm-core/src/storm.thrift | 2 +
9 files changed, 287 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: Added STORM-2448 to Changelog
Posted by bo...@apache.org.
Added STORM-2448 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9a4185c4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9a4185c4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9a4185c4
Branch: refs/heads/1.x-branch
Commit: 9a4185c4973a4b18696729da7f97cf1a4f499749
Parents: bfffa7f
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Tue May 23 09:37:26 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Tue May 23 09:37:26 2017 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9a4185c4/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1e1cdaa..4d5391d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.1
+ * STORM-2448: Add in Storm and JDK versions when submitting a topology
* STORM-2413: Make new Kafka spout respect tuple retry limits
* STORM-2518 Handles empty name for "USER type" ACL when normalizing
* STORM-2501: Auto populate Hive Credentials