You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/05/23 14:37:55 UTC

[1/3] storm git commit: STORM-2448: Add in Storm and JDK versions when submitting a topology

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 7d904f0e6 -> 9a4185c49


STORM-2448: Add in Storm and JDK versions when submitting a topology


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/23ed16ad
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/23ed16ad
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/23ed16ad

Branch: refs/heads/1.x-branch
Commit: 23ed16adb7fa8d937fdd225d529658fe1a5cee2f
Parents: ca66b1a
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Apr 4 12:34:49 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Sat Apr 8 21:05:26 2017 -0500

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/common.clj  |  34 ++-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |   4 +-
 storm-core/src/clj/org/apache/storm/testing.clj |   4 +-
 .../jvm/org/apache/storm/StormSubmitter.java    |   2 +-
 .../apache/storm/generated/StormTopology.java   | 220 ++++++++++++++++++-
 .../apache/storm/topology/TopologyBuilder.java  |   2 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |  20 ++
 storm-core/src/py/storm/ttypes.py               |  28 ++-
 storm-core/src/storm.thrift                     |   2 +
 9 files changed, 287 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/23ed16ad/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 20e03a0..db01727 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -121,32 +121,28 @@
         )))))
 
 (defn- validate-ids! [^StormTopology topology]
-  (let [sets (map #(.getFieldValue topology %) thrift/STORM-TOPOLOGY-FIELDS)
+  (let [sets [(.get_bolts topology) (.get_spouts topology) (.get_state_spouts topology)]
         offending (apply any-intersection sets)]
     (if-not (empty? offending)
       (throw (InvalidTopologyException.
               (str "Duplicate component ids: " offending))))
-    (doseq [f thrift/STORM-TOPOLOGY-FIELDS
-            :let [obj-map (.getFieldValue topology f)]]
-      (if-not (or (ThriftTopologyUtils/isWorkerHook f)
-                   (ThriftTopologyUtils/isDependencies f))
-        (do
-          (doseq [id (keys obj-map)]
-            (if (Utils/isSystemId id)
-              (throw (InvalidTopologyException.
-                       (str id " is not a valid component id")))))
-          (doseq [obj (vals obj-map)
-                  id (-> obj .get_common .get_streams keys)]
-            (if (Utils/isSystemId id)
-              (throw (InvalidTopologyException.
-                       (str id " is not a valid stream id"))))))))))
+    (doseq [obj-map sets]
+      (do
+        (doseq [id (keys obj-map)]
+          (if (Utils/isSystemId id)
+            (throw (InvalidTopologyException.
+                     (str id " is not a valid component id")))))
+        (doseq [obj (vals obj-map)
+                id (-> obj .get_common .get_streams keys)]
+          (if (Utils/isSystemId id)
+            (throw (InvalidTopologyException.
+                     (str id " is not a valid stream id")))))))))
 
 (defn all-components [^StormTopology topology]
   (apply merge {}
-    (for [f thrift/STORM-TOPOLOGY-FIELDS]
-      (if-not (or (ThriftTopologyUtils/isWorkerHook f)
-                   (ThriftTopologyUtils/isDependencies f))
-        (.getFieldValue topology f)))))
+    (.get_bolts topology)
+    (.get_spouts topology)
+    (.get_state_spouts topology)))
 
 (defn component-conf [component]
   (->> component

http://git-wip-us.apache.org/repos/asf/storm/blob/23ed16ad/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index f3b3373..3bd8a17 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1698,7 +1698,9 @@
             (throw (IllegalArgumentException. "The cluster is configured for zookeeper authentication, but no payload was provided.")))
           (log-message "Received topology submission for "
                        storm-name
-                       " with conf "
+                       " (storm-" (.get_storm_version topology)
+                       " JDK-" (.get_jdk_version topology)
+                       ") with conf "
                        (redact-value storm-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
           ;; lock protects against multiple topologies being submitted at once and
           ;; cleanup thread killing topology in b/w assignment and starting the topology

http://git-wip-us.apache.org/repos/asf/storm/blob/23ed16ad/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index c061922..db6b94b 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -307,13 +307,13 @@
   [nimbus storm-name conf topology]
   (when-not (Utils/isValidConf conf)
     (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
-  (.submitTopology nimbus storm-name nil (to-json conf) topology))
+  (.submitTopology nimbus storm-name nil (to-json conf) (Utils/addVersions topology)))
 
 (defn submit-local-topology-with-opts
   [nimbus storm-name conf topology submit-opts]
   (when-not (Utils/isValidConf conf)
     (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
-  (.submitTopologyWithOpts nimbus storm-name nil (to-json conf) topology submit-opts))
+  (.submitTopologyWithOpts nimbus storm-name nil (to-json conf) (Utils/addVersions topology) submit-opts))
 
 (defn mocked-convert-assignments-to-worker->resources [storm-cluster-state storm-name worker->resources]
   (fn [existing-assignments]

http://git-wip-us.apache.org/repos/asf/storm/blob/23ed16ad/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
index d382dd9..eeb90c5 100644
--- a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
@@ -321,7 +321,7 @@ public class StormSubmitter {
         try {
             String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, client);
             LOG.info("Submitting topology {} in distributed mode with conf {}", name, serConf);
-
+            Utils.addVersions(topology);
             if (opts != null) {
                 client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
             } else {

http://git-wip-us.apache.org/repos/asf/storm/blob/23ed16ad/storm-core/src/jvm/org/apache/storm/generated/StormTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/StormTopology.java b/storm-core/src/jvm/org/apache/storm/generated/StormTopology.java
index caec6c6..6241d7b 100644
--- a/storm-core/src/jvm/org/apache/storm/generated/StormTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/generated/StormTopology.java
@@ -61,6 +61,8 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
   private static final org.apache.thrift.protocol.TField WORKER_HOOKS_FIELD_DESC = new org.apache.thrift.protocol.TField("worker_hooks", org.apache.thrift.protocol.TType.LIST, (short)4);
   private static final org.apache.thrift.protocol.TField DEPENDENCY_JARS_FIELD_DESC = new org.apache.thrift.protocol.TField("dependency_jars", org.apache.thrift.protocol.TType.LIST, (short)5);
   private static final org.apache.thrift.protocol.TField DEPENDENCY_ARTIFACTS_FIELD_DESC = new org.apache.thrift.protocol.TField("dependency_artifacts", org.apache.thrift.protocol.TType.LIST, (short)6);
+  private static final org.apache.thrift.protocol.TField STORM_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("storm_version", org.apache.thrift.protocol.TType.STRING, (short)7);
+  private static final org.apache.thrift.protocol.TField JDK_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("jdk_version", org.apache.thrift.protocol.TType.STRING, (short)8);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -74,6 +76,8 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
   private List<ByteBuffer> worker_hooks; // optional
   private List<String> dependency_jars; // optional
   private List<String> dependency_artifacts; // optional
+  private String storm_version; // optional
+  private String jdk_version; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -82,7 +86,9 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
     STATE_SPOUTS((short)3, "state_spouts"),
     WORKER_HOOKS((short)4, "worker_hooks"),
     DEPENDENCY_JARS((short)5, "dependency_jars"),
-    DEPENDENCY_ARTIFACTS((short)6, "dependency_artifacts");
+    DEPENDENCY_ARTIFACTS((short)6, "dependency_artifacts"),
+    STORM_VERSION((short)7, "storm_version"),
+    JDK_VERSION((short)8, "jdk_version");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -109,6 +115,10 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
           return DEPENDENCY_JARS;
         case 6: // DEPENDENCY_ARTIFACTS
           return DEPENDENCY_ARTIFACTS;
+        case 7: // STORM_VERSION
+          return STORM_VERSION;
+        case 8: // JDK_VERSION
+          return JDK_VERSION;
         default:
           return null;
       }
@@ -149,7 +159,7 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
   }
 
   // isset id assignments
-  private static final _Fields optionals[] = {_Fields.WORKER_HOOKS,_Fields.DEPENDENCY_JARS,_Fields.DEPENDENCY_ARTIFACTS};
+  private static final _Fields optionals[] = {_Fields.WORKER_HOOKS,_Fields.DEPENDENCY_JARS,_Fields.DEPENDENCY_ARTIFACTS,_Fields.STORM_VERSION,_Fields.JDK_VERSION};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -174,6 +184,10 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
     tmpMap.put(_Fields.DEPENDENCY_ARTIFACTS, new org.apache.thrift.meta_data.FieldMetaData("dependency_artifacts", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, 
             new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
+    tmpMap.put(_Fields.STORM_VERSION, new org.apache.thrift.meta_data.FieldMetaData("storm_version", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.JDK_VERSION, new org.apache.thrift.meta_data.FieldMetaData("jdk_version", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(StormTopology.class, metaDataMap);
   }
@@ -253,6 +267,12 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
       List<String> __this__dependency_artifacts = new ArrayList<String>(other.dependency_artifacts);
       this.dependency_artifacts = __this__dependency_artifacts;
     }
+    if (other.is_set_storm_version()) {
+      this.storm_version = other.storm_version;
+    }
+    if (other.is_set_jdk_version()) {
+      this.jdk_version = other.jdk_version;
+    }
   }
 
   public StormTopology deepCopy() {
@@ -267,6 +287,8 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
     this.worker_hooks = null;
     this.dependency_jars = null;
     this.dependency_artifacts = null;
+    this.storm_version = null;
+    this.jdk_version = null;
   }
 
   public int get_spouts_size() {
@@ -485,6 +507,52 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
     }
   }
 
+  public String get_storm_version() {
+    return this.storm_version;
+  }
+
+  public void set_storm_version(String storm_version) {
+    this.storm_version = storm_version;
+  }
+
+  public void unset_storm_version() {
+    this.storm_version = null;
+  }
+
+  /** Returns true if field storm_version is set (has been assigned a value) and false otherwise */
+  public boolean is_set_storm_version() {
+    return this.storm_version != null;
+  }
+
+  public void set_storm_version_isSet(boolean value) {
+    if (!value) {
+      this.storm_version = null;
+    }
+  }
+
+  public String get_jdk_version() {
+    return this.jdk_version;
+  }
+
+  public void set_jdk_version(String jdk_version) {
+    this.jdk_version = jdk_version;
+  }
+
+  public void unset_jdk_version() {
+    this.jdk_version = null;
+  }
+
+  /** Returns true if field jdk_version is set (has been assigned a value) and false otherwise */
+  public boolean is_set_jdk_version() {
+    return this.jdk_version != null;
+  }
+
+  public void set_jdk_version_isSet(boolean value) {
+    if (!value) {
+      this.jdk_version = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case SPOUTS:
@@ -535,6 +603,22 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
       }
       break;
 
+    case STORM_VERSION:
+      if (value == null) {
+        unset_storm_version();
+      } else {
+        set_storm_version((String)value);
+      }
+      break;
+
+    case JDK_VERSION:
+      if (value == null) {
+        unset_jdk_version();
+      } else {
+        set_jdk_version((String)value);
+      }
+      break;
+
     }
   }
 
@@ -558,6 +642,12 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
     case DEPENDENCY_ARTIFACTS:
       return get_dependency_artifacts();
 
+    case STORM_VERSION:
+      return get_storm_version();
+
+    case JDK_VERSION:
+      return get_jdk_version();
+
     }
     throw new IllegalStateException();
   }
@@ -581,6 +671,10 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
       return is_set_dependency_jars();
     case DEPENDENCY_ARTIFACTS:
       return is_set_dependency_artifacts();
+    case STORM_VERSION:
+      return is_set_storm_version();
+    case JDK_VERSION:
+      return is_set_jdk_version();
     }
     throw new IllegalStateException();
   }
@@ -652,6 +746,24 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
         return false;
     }
 
+    boolean this_present_storm_version = true && this.is_set_storm_version();
+    boolean that_present_storm_version = true && that.is_set_storm_version();
+    if (this_present_storm_version || that_present_storm_version) {
+      if (!(this_present_storm_version && that_present_storm_version))
+        return false;
+      if (!this.storm_version.equals(that.storm_version))
+        return false;
+    }
+
+    boolean this_present_jdk_version = true && this.is_set_jdk_version();
+    boolean that_present_jdk_version = true && that.is_set_jdk_version();
+    if (this_present_jdk_version || that_present_jdk_version) {
+      if (!(this_present_jdk_version && that_present_jdk_version))
+        return false;
+      if (!this.jdk_version.equals(that.jdk_version))
+        return false;
+    }
+
     return true;
   }
 
@@ -689,6 +801,16 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
     if (present_dependency_artifacts)
       list.add(dependency_artifacts);
 
+    boolean present_storm_version = true && (is_set_storm_version());
+    list.add(present_storm_version);
+    if (present_storm_version)
+      list.add(storm_version);
+
+    boolean present_jdk_version = true && (is_set_jdk_version());
+    list.add(present_jdk_version);
+    if (present_jdk_version)
+      list.add(jdk_version);
+
     return list.hashCode();
   }
 
@@ -760,6 +882,26 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(is_set_storm_version()).compareTo(other.is_set_storm_version());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_storm_version()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.storm_version, other.storm_version);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(is_set_jdk_version()).compareTo(other.is_set_jdk_version());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_jdk_version()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.jdk_version, other.jdk_version);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -833,6 +975,26 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
       }
       first = false;
     }
+    if (is_set_storm_version()) {
+      if (!first) sb.append(", ");
+      sb.append("storm_version:");
+      if (this.storm_version == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.storm_version);
+      }
+      first = false;
+    }
+    if (is_set_jdk_version()) {
+      if (!first) sb.append(", ");
+      sb.append("jdk_version:");
+      if (this.jdk_version == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.jdk_version);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -1005,6 +1167,22 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 7: // STORM_VERSION
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.storm_version = iprot.readString();
+              struct.set_storm_version_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 8: // JDK_VERSION
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.jdk_version = iprot.readString();
+              struct.set_jdk_version_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -1099,6 +1277,20 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
           oprot.writeFieldEnd();
         }
       }
+      if (struct.storm_version != null) {
+        if (struct.is_set_storm_version()) {
+          oprot.writeFieldBegin(STORM_VERSION_FIELD_DESC);
+          oprot.writeString(struct.storm_version);
+          oprot.writeFieldEnd();
+        }
+      }
+      if (struct.jdk_version != null) {
+        if (struct.is_set_jdk_version()) {
+          oprot.writeFieldBegin(JDK_VERSION_FIELD_DESC);
+          oprot.writeString(struct.jdk_version);
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -1150,7 +1342,13 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
       if (struct.is_set_dependency_artifacts()) {
         optionals.set(2);
       }
-      oprot.writeBitSet(optionals, 3);
+      if (struct.is_set_storm_version()) {
+        optionals.set(3);
+      }
+      if (struct.is_set_jdk_version()) {
+        optionals.set(4);
+      }
+      oprot.writeBitSet(optionals, 5);
       if (struct.is_set_worker_hooks()) {
         {
           oprot.writeI32(struct.worker_hooks.size());
@@ -1178,6 +1376,12 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
           }
         }
       }
+      if (struct.is_set_storm_version()) {
+        oprot.writeString(struct.storm_version);
+      }
+      if (struct.is_set_jdk_version()) {
+        oprot.writeString(struct.jdk_version);
+      }
     }
 
     @Override
@@ -1225,7 +1429,7 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
         }
       }
       struct.set_state_spouts_isSet(true);
-      BitSet incoming = iprot.readBitSet(3);
+      BitSet incoming = iprot.readBitSet(5);
       if (incoming.get(0)) {
         {
           org.apache.thrift.protocol.TList _list89 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
@@ -1265,6 +1469,14 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
         }
         struct.set_dependency_artifacts_isSet(true);
       }
+      if (incoming.get(3)) {
+        struct.storm_version = iprot.readString();
+        struct.set_storm_version_isSet(true);
+      }
+      if (incoming.get(4)) {
+        struct.jdk_version = iprot.readString();
+        struct.set_jdk_version_isSet(true);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/23ed16ad/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
index 40ede4c..ea4488b 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
@@ -153,7 +153,7 @@ public class TopologyBuilder {
 
         stormTopology.set_worker_hooks(_workerHooks);
 
-        return stormTopology;
+        return Utils.addVersions(stormTopology);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/23ed16ad/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 1a13a6f..b9ced2c 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -2271,4 +2271,24 @@ public class Utils {
         return ret;
     }
 
+    /**
+     * Add Storm and JDK version information to the given topology; versions already set are kept.
+     * @param topology the topology being submitted (MIGHT BE MODIFIED)
+     * @return topology
+     */
+    public static StormTopology addVersions(StormTopology topology) {
+        String stormVersion = VersionInfo.getVersion();
+        LOG.debug("STORM-VERSION new {} old {}", stormVersion, topology.get_storm_version());
+        if (stormVersion != null &&
+                !"Unknown".equalsIgnoreCase(stormVersion) &&
+                !topology.is_set_storm_version()) {
+            topology.set_storm_version(stormVersion);
+        }
+
+        String jdkVersion = System.getProperty("java.version");
+        if (jdkVersion != null && !topology.is_set_jdk_version()) {
+            topology.set_jdk_version(jdkVersion);
+        }
+        return topology;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/23ed16ad/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py
index 4fac146..8262285 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -1394,6 +1394,8 @@ class StormTopology:
    - worker_hooks
    - dependency_jars
    - dependency_artifacts
+   - storm_version
+   - jdk_version
   """
 
   thrift_spec = (
@@ -1404,15 +1406,19 @@ class StormTopology:
     (4, TType.LIST, 'worker_hooks', (TType.STRING,None), None, ), # 4
     (5, TType.LIST, 'dependency_jars', (TType.STRING,None), None, ), # 5
     (6, TType.LIST, 'dependency_artifacts', (TType.STRING,None), None, ), # 6
+    (7, TType.STRING, 'storm_version', None, None, ), # 7
+    (8, TType.STRING, 'jdk_version', None, None, ), # 8
   )
 
-  def __init__(self, spouts=None, bolts=None, state_spouts=None, worker_hooks=None, dependency_jars=None, dependency_artifacts=None,):
+  def __init__(self, spouts=None, bolts=None, state_spouts=None, worker_hooks=None, dependency_jars=None, dependency_artifacts=None, storm_version=None, jdk_version=None,):
     self.spouts = spouts
     self.bolts = bolts
     self.state_spouts = state_spouts
     self.worker_hooks = worker_hooks
     self.dependency_jars = dependency_jars
     self.dependency_artifacts = dependency_artifacts
+    self.storm_version = storm_version
+    self.jdk_version = jdk_version
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -1489,6 +1495,16 @@ class StormTopology:
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
+      elif fid == 7:
+        if ftype == TType.STRING:
+          self.storm_version = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 8:
+        if ftype == TType.STRING:
+          self.jdk_version = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -1544,6 +1560,14 @@ class StormTopology:
         oprot.writeString(iter86.encode('utf-8'))
       oprot.writeListEnd()
       oprot.writeFieldEnd()
+    if self.storm_version is not None:
+      oprot.writeFieldBegin('storm_version', TType.STRING, 7)
+      oprot.writeString(self.storm_version.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.jdk_version is not None:
+      oprot.writeFieldBegin('jdk_version', TType.STRING, 8)
+      oprot.writeString(self.jdk_version.encode('utf-8'))
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -1565,6 +1589,8 @@ class StormTopology:
     value = (value * 31) ^ hash(self.worker_hooks)
     value = (value * 31) ^ hash(self.dependency_jars)
     value = (value * 31) ^ hash(self.dependency_artifacts)
+    value = (value * 31) ^ hash(self.storm_version)
+    value = (value * 31) ^ hash(self.jdk_version)
     return value
 
   def __repr__(self):

http://git-wip-us.apache.org/repos/asf/storm/blob/23ed16ad/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift
index 700e5a0..146591f 100644
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@ -120,6 +120,8 @@ struct StormTopology {
   4: optional list<binary> worker_hooks;
   5: optional list<string> dependency_jars;
   6: optional list<string> dependency_artifacts;
+  7: optional string storm_version;
+  8: optional string jdk_version;
 }
 
 exception AlreadyAliveException {


[2/3] storm git commit: Merge branch 'STORM-2448-1.x' of https://github.com/revans2/incubator-storm into STORM-2448-1.x

Posted by bo...@apache.org.
Merge branch 'STORM-2448-1.x' of https://github.com/revans2/incubator-storm into STORM-2448-1.x

STORM-2448: Add in Storm and JDK versions when submitting a topology


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bfffa7f7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bfffa7f7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bfffa7f7

Branch: refs/heads/1.x-branch
Commit: bfffa7f7f7accf54b2a1f1544e76819233c54ad5
Parents: 7d904f0 23ed16a
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Tue May 23 09:17:46 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Tue May 23 09:17:46 2017 -0500

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/common.clj  |  34 ++-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |   4 +-
 storm-core/src/clj/org/apache/storm/testing.clj |   4 +-
 .../jvm/org/apache/storm/StormSubmitter.java    |   2 +-
 .../apache/storm/generated/StormTopology.java   | 220 ++++++++++++++++++-
 .../apache/storm/topology/TopologyBuilder.java  |   2 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |  20 ++
 storm-core/src/py/storm/ttypes.py               |  28 ++-
 storm-core/src/storm.thrift                     |   2 +
 9 files changed, 287 insertions(+), 29 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: Added STORM-2448 to Changelog

Posted by bo...@apache.org.
Added STORM-2448 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9a4185c4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9a4185c4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9a4185c4

Branch: refs/heads/1.x-branch
Commit: 9a4185c4973a4b18696729da7f97cf1a4f499749
Parents: bfffa7f
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Tue May 23 09:37:26 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Tue May 23 09:37:26 2017 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9a4185c4/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1e1cdaa..4d5391d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.1
+ * STORM-2448: Add in Storm and JDK versions when submitting a topology
  * STORM-2413: Make new Kafka spout respect tuple retry limits
  * STORM-2518 Handles empty name for "USER type" ACL when normalizing
  * STORM-2501: Auto populate Hive Credentials