You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/05/23 15:13:40 UTC

[1/3] storm git commit: STORM-2448: Report storm and jdk versions to nimbus on topology submission

Repository: storm
Updated Branches:
  refs/heads/0.10.x-branch c56b7cf27 -> ecba328c8


STORM-2448: Report storm and jdk versions to nimbus on topology submission


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/77985a07
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/77985a07
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/77985a07

Branch: refs/heads/0.10.x-branch
Commit: 77985a0756d099c379059dbef021ade4af0d4e88
Parents: 4fa445f
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Apr 4 14:32:18 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Sat Apr 8 21:09:20 2017 -0500

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/common.clj    |  11 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |   4 +-
 storm-core/src/clj/backtype/storm/testing.clj   |   4 +-
 .../src/jvm/backtype/storm/StormSubmitter.java  |   1 +
 .../backtype/storm/generated/StormTopology.java | 222 ++++++++++++-
 .../storm/topology/TopologyBuilder.java         |   4 +-
 .../storm/utils/ThriftTopologyUtils.java        |  29 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |  21 ++
 storm-core/src/py/storm/DistributedRPC-remote   |   2 +-
 storm-core/src/py/storm/DistributedRPC.py       |  20 +-
 .../py/storm/DistributedRPCInvocations-remote   |   2 +-
 .../src/py/storm/DistributedRPCInvocations.py   |  41 ++-
 storm-core/src/py/storm/Nimbus-remote           |   2 +-
 storm-core/src/py/storm/Nimbus.py               | 318 +++++++++++++++----
 storm-core/src/py/storm/constants.py            |   2 +-
 storm-core/src/py/storm/ttypes.py               | 161 ++++++----
 storm-core/src/storm.thrift                     |   5 +
 17 files changed, 661 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/clj/backtype/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj
index e3a10ef..a592d8e 100644
--- a/storm-core/src/clj/backtype/storm/daemon/common.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/common.clj
@@ -102,13 +102,12 @@
         )))))
 
 (defn- validate-ids! [^StormTopology topology]
-  (let [sets (map #(.getFieldValue topology %) thrift/STORM-TOPOLOGY-FIELDS)
+  (let [sets [(.get_bolts topology) (.get_spouts topology) (.get_state_spouts topology)]
         offending (apply any-intersection sets)]
     (if-not (empty? offending)
       (throw (InvalidTopologyException.
               (str "Duplicate component ids: " offending))))
-    (doseq [f thrift/STORM-TOPOLOGY-FIELDS
-            :let [obj-map (.getFieldValue topology f)]]
+    (doseq [obj-map sets]
       (doseq [id (keys obj-map)]
         (if (system-id? id)
           (throw (InvalidTopologyException.
@@ -122,9 +121,9 @@
 
 (defn all-components [^StormTopology topology]
   (apply merge {}
-         (for [f thrift/STORM-TOPOLOGY-FIELDS]
-           (.getFieldValue topology f)
-           )))
+    (.get_bolts topology)
+    (.get_spouts topology)
+    (.get_state_spouts topology)))
 
 (defn component-conf [component]
   (->> component

http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 7757708..b8b17bc 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -1095,7 +1095,9 @@
                 (throw (IllegalArgumentException. "The cluster is configured for zookeeper authentication, but no payload was provided.")))
             (log-message "Received topology submission for "
                          storm-name
-                         " with conf "
+                         " (storm-" (.get_storm_version topology)
+                         " JDK-" (.get_jdk_version topology)
+                         ") with conf "
                          (redact-value storm-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
             ;; lock protects against multiple topologies being submitted at once and
             ;; cleanup thread killing topology in b/w assignment and starting the topology

http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
index b0c6637..76e1b3f 100644
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@ -273,13 +273,13 @@
   [nimbus storm-name conf topology]
   (when-not (Utils/isValidConf conf)
     (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
-  (.submitTopology nimbus storm-name nil (to-json conf) topology))
+  (.submitTopology nimbus storm-name nil (to-json conf) (Utils/addVersions topology)))
 
 (defn submit-local-topology-with-opts
   [nimbus storm-name conf topology submit-opts]
   (when-not (Utils/isValidConf conf)
     (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
-  (.submitTopologyWithOpts nimbus storm-name nil (to-json conf) topology submit-opts))
+  (.submitTopologyWithOpts nimbus storm-name nil (to-json conf) (Utils/addVersions topology) submit-opts))
 
 (defn mocked-compute-new-topology->executor->node+port [storm-name executor->node+port]
   (fn [nimbus existing-assignments topologies scratch-topology-id]

http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
index c30cc7d..cbb64c6 100644
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
@@ -201,6 +201,7 @@ public class StormSubmitter {
                 passedCreds = tmpCreds.get_creds();
             }
         }
+        topology = Utils.addVersions(topology);
         Map<String,String> fullCreds = populateCredentials(conf, passedCreds);
         if (!fullCreds.isEmpty()) {
             if (opts == null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/jvm/backtype/storm/generated/StormTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/StormTopology.java b/storm-core/src/jvm/backtype/storm/generated/StormTopology.java
index d022e95..0a1f356 100644
--- a/storm-core/src/jvm/backtype/storm/generated/StormTopology.java
+++ b/storm-core/src/jvm/backtype/storm/generated/StormTopology.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 /**
- * Autogenerated by Thrift Compiler (0.9.2)
+ * Autogenerated by Thrift Compiler (0.9.3)
  *
  * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
  *  @generated
@@ -51,13 +51,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-2-6")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2017-04-04")
 public class StormTopology implements org.apache.thrift.TBase<StormTopology, StormTopology._Fields>, java.io.Serializable, Cloneable, Comparable<StormTopology> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("StormTopology");
 
   private static final org.apache.thrift.protocol.TField SPOUTS_FIELD_DESC = new org.apache.thrift.protocol.TField("spouts", org.apache.thrift.protocol.TType.MAP, (short)1);
   private static final org.apache.thrift.protocol.TField BOLTS_FIELD_DESC = new org.apache.thrift.protocol.TField("bolts", org.apache.thrift.protocol.TType.MAP, (short)2);
   private static final org.apache.thrift.protocol.TField STATE_SPOUTS_FIELD_DESC = new org.apache.thrift.protocol.TField("state_spouts", org.apache.thrift.protocol.TType.MAP, (short)3);
+  private static final org.apache.thrift.protocol.TField STORM_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("storm_version", org.apache.thrift.protocol.TType.STRING, (short)7);
+  private static final org.apache.thrift.protocol.TField JDK_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("jdk_version", org.apache.thrift.protocol.TType.STRING, (short)8);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -68,12 +70,16 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
   private Map<String,SpoutSpec> spouts; // required
   private Map<String,Bolt> bolts; // required
   private Map<String,StateSpoutSpec> state_spouts; // required
+  private String storm_version; // optional
+  private String jdk_version; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
     SPOUTS((short)1, "spouts"),
     BOLTS((short)2, "bolts"),
-    STATE_SPOUTS((short)3, "state_spouts");
+    STATE_SPOUTS((short)3, "state_spouts"),
+    STORM_VERSION((short)7, "storm_version"),
+    JDK_VERSION((short)8, "jdk_version");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -94,6 +100,10 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
           return BOLTS;
         case 3: // STATE_SPOUTS
           return STATE_SPOUTS;
+        case 7: // STORM_VERSION
+          return STORM_VERSION;
+        case 8: // JDK_VERSION
+          return JDK_VERSION;
         default:
           return null;
       }
@@ -134,6 +144,7 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
   }
 
   // isset id assignments
+  private static final _Fields optionals[] = {_Fields.STORM_VERSION,_Fields.JDK_VERSION};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -149,6 +160,10 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
         new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, 
             new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), 
             new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, StateSpoutSpec.class))));
+    tmpMap.put(_Fields.STORM_VERSION, new org.apache.thrift.meta_data.FieldMetaData("storm_version", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.JDK_VERSION, new org.apache.thrift.meta_data.FieldMetaData("jdk_version", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(StormTopology.class, metaDataMap);
   }
@@ -216,6 +231,12 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
       }
       this.state_spouts = __this__state_spouts;
     }
+    if (other.is_set_storm_version()) {
+      this.storm_version = other.storm_version;
+    }
+    if (other.is_set_jdk_version()) {
+      this.jdk_version = other.jdk_version;
+    }
   }
 
   public StormTopology deepCopy() {
@@ -227,6 +248,8 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
     this.spouts = null;
     this.bolts = null;
     this.state_spouts = null;
+    this.storm_version = null;
+    this.jdk_version = null;
   }
 
   public int get_spouts_size() {
@@ -331,6 +354,52 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
     }
   }
 
+  public String get_storm_version() {
+    return this.storm_version;
+  }
+
+  public void set_storm_version(String storm_version) {
+    this.storm_version = storm_version;
+  }
+
+  public void unset_storm_version() {
+    this.storm_version = null;
+  }
+
+  /** Returns true if field storm_version is set (has been assigned a value) and false otherwise */
+  public boolean is_set_storm_version() {
+    return this.storm_version != null;
+  }
+
+  public void set_storm_version_isSet(boolean value) {
+    if (!value) {
+      this.storm_version = null;
+    }
+  }
+
+  public String get_jdk_version() {
+    return this.jdk_version;
+  }
+
+  public void set_jdk_version(String jdk_version) {
+    this.jdk_version = jdk_version;
+  }
+
+  public void unset_jdk_version() {
+    this.jdk_version = null;
+  }
+
+  /** Returns true if field jdk_version is set (has been assigned a value) and false otherwise */
+  public boolean is_set_jdk_version() {
+    return this.jdk_version != null;
+  }
+
+  public void set_jdk_version_isSet(boolean value) {
+    if (!value) {
+      this.jdk_version = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case SPOUTS:
@@ -357,6 +426,22 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
       }
       break;
 
+    case STORM_VERSION:
+      if (value == null) {
+        unset_storm_version();
+      } else {
+        set_storm_version((String)value);
+      }
+      break;
+
+    case JDK_VERSION:
+      if (value == null) {
+        unset_jdk_version();
+      } else {
+        set_jdk_version((String)value);
+      }
+      break;
+
     }
   }
 
@@ -371,6 +456,12 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
     case STATE_SPOUTS:
       return get_state_spouts();
 
+    case STORM_VERSION:
+      return get_storm_version();
+
+    case JDK_VERSION:
+      return get_jdk_version();
+
     }
     throw new IllegalStateException();
   }
@@ -388,6 +479,10 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
       return is_set_bolts();
     case STATE_SPOUTS:
       return is_set_state_spouts();
+    case STORM_VERSION:
+      return is_set_storm_version();
+    case JDK_VERSION:
+      return is_set_jdk_version();
     }
     throw new IllegalStateException();
   }
@@ -432,6 +527,24 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
         return false;
     }
 
+    boolean this_present_storm_version = true && this.is_set_storm_version();
+    boolean that_present_storm_version = true && that.is_set_storm_version();
+    if (this_present_storm_version || that_present_storm_version) {
+      if (!(this_present_storm_version && that_present_storm_version))
+        return false;
+      if (!this.storm_version.equals(that.storm_version))
+        return false;
+    }
+
+    boolean this_present_jdk_version = true && this.is_set_jdk_version();
+    boolean that_present_jdk_version = true && that.is_set_jdk_version();
+    if (this_present_jdk_version || that_present_jdk_version) {
+      if (!(this_present_jdk_version && that_present_jdk_version))
+        return false;
+      if (!this.jdk_version.equals(that.jdk_version))
+        return false;
+    }
+
     return true;
   }
 
@@ -454,6 +567,16 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
     if (present_state_spouts)
       list.add(state_spouts);
 
+    boolean present_storm_version = true && (is_set_storm_version());
+    list.add(present_storm_version);
+    if (present_storm_version)
+      list.add(storm_version);
+
+    boolean present_jdk_version = true && (is_set_jdk_version());
+    list.add(present_jdk_version);
+    if (present_jdk_version)
+      list.add(jdk_version);
+
     return list.hashCode();
   }
 
@@ -495,6 +618,26 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(is_set_storm_version()).compareTo(other.is_set_storm_version());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_storm_version()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.storm_version, other.storm_version);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(is_set_jdk_version()).compareTo(other.is_set_jdk_version());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_jdk_version()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.jdk_version, other.jdk_version);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -538,6 +681,26 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
       sb.append(this.state_spouts);
     }
     first = false;
+    if (is_set_storm_version()) {
+      if (!first) sb.append(", ");
+      sb.append("storm_version:");
+      if (this.storm_version == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.storm_version);
+      }
+      first = false;
+    }
+    if (is_set_jdk_version()) {
+      if (!first) sb.append(", ");
+      sb.append("jdk_version:");
+      if (this.jdk_version == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.jdk_version);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -656,6 +819,22 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 7: // STORM_VERSION
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.storm_version = iprot.readString();
+              struct.set_storm_version_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 8: // JDK_VERSION
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.jdk_version = iprot.readString();
+              struct.set_jdk_version_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -708,6 +887,20 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
         }
         oprot.writeFieldEnd();
       }
+      if (struct.storm_version != null) {
+        if (struct.is_set_storm_version()) {
+          oprot.writeFieldBegin(STORM_VERSION_FIELD_DESC);
+          oprot.writeString(struct.storm_version);
+          oprot.writeFieldEnd();
+        }
+      }
+      if (struct.jdk_version != null) {
+        if (struct.is_set_jdk_version()) {
+          oprot.writeFieldBegin(JDK_VERSION_FIELD_DESC);
+          oprot.writeString(struct.jdk_version);
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -749,6 +942,20 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
           _iter61.getValue().write(oprot);
         }
       }
+      BitSet optionals = new BitSet();
+      if (struct.is_set_storm_version()) {
+        optionals.set(0);
+      }
+      if (struct.is_set_jdk_version()) {
+        optionals.set(1);
+      }
+      oprot.writeBitSet(optionals, 2);
+      if (struct.is_set_storm_version()) {
+        oprot.writeString(struct.storm_version);
+      }
+      if (struct.is_set_jdk_version()) {
+        oprot.writeString(struct.jdk_version);
+      }
     }
 
     @Override
@@ -796,6 +1003,15 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
         }
       }
       struct.set_state_spouts_isSet(true);
+      BitSet incoming = iprot.readBitSet(2);
+      if (incoming.get(0)) {
+        struct.storm_version = iprot.readString();
+        struct.set_storm_version_isSet(true);
+      }
+      if (incoming.get(1)) {
+        struct.jdk_version = iprot.readString();
+        struct.set_jdk_version_isSet(true);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
index 4d5a0bd..9a4409d 100644
--- a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
@@ -112,9 +112,9 @@ public class TopologyBuilder {
             spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
             
         }
-        return new StormTopology(spoutSpecs,
+        return Utils.addVersions(new StormTopology(spoutSpecs,
                                  boltSpecs,
-                                 new HashMap<String, StateSpoutSpec>());
+                                 new HashMap<String, StateSpoutSpec>()));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/jvm/backtype/storm/utils/ThriftTopologyUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ThriftTopologyUtils.java b/storm-core/src/jvm/backtype/storm/utils/ThriftTopologyUtils.java
index 8306d9b..7b379c5 100644
--- a/storm-core/src/jvm/backtype/storm/utils/ThriftTopologyUtils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/ThriftTopologyUtils.java
@@ -29,30 +29,19 @@ import java.util.Set;
 public class ThriftTopologyUtils {
     public static Set<String> getComponentIds(StormTopology topology) {
         Set<String> ret = new HashSet<String>();
-        for(StormTopology._Fields f: StormTopology.metaDataMap.keySet()) {
-            Map<String, Object> componentMap = (Map<String, Object>) topology.getFieldValue(f);
-            ret.addAll(componentMap.keySet());
-        }
+        ret.addAll(topology.get_bolts().keySet());
+        ret.addAll(topology.get_spouts().keySet());
+        ret.addAll(topology.get_state_spouts().keySet());
         return ret;
     }
 
     public static ComponentCommon getComponentCommon(StormTopology topology, String componentId) {
-        for(StormTopology._Fields f: StormTopology.metaDataMap.keySet()) {
-            Map<String, Object> componentMap = (Map<String, Object>) topology.getFieldValue(f);
-            if(componentMap.containsKey(componentId)) {
-                Object component = componentMap.get(componentId);
-                if(component instanceof Bolt) {
-                    return ((Bolt) component).get_common();
-                }
-                if(component instanceof SpoutSpec) {
-                    return ((SpoutSpec) component).get_common();
-                }
-                if(component instanceof StateSpoutSpec) {
-                    return ((StateSpoutSpec) component).get_common();
-                }
-                throw new RuntimeException("Unreachable code! No get_common conversion for component " + component);
-            }
-        }
+        Bolt b = topology.get_bolts().get(componentId);
+        if (b != null) return b.get_common();
+        SpoutSpec s = topology.get_spouts().get(componentId);
+        if (s != null) return s.get_common();
+        StateSpoutSpec ss = topology.get_state_spouts().get(componentId);
+        if (ss != null) return ss.get_common();
         throw new IllegalArgumentException("Could not find component common for " + componentId);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index 15aae91..3f9b15c 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -674,5 +674,26 @@ public class Utils {
     public static int toPositive(int number) {
         return number & Integer.MAX_VALUE;
     }
+
+     /**
+      * Add version information to the given topology
+      * @param topology the topology being submitted (MIGHT BE MODIFIED)
+      * @return topology
+      */
+     public static StormTopology addVersions(StormTopology topology) {
+         String stormVersion = VersionInfo.getVersion();
+         LOG.warn("STORM-VERSION new {} old {}", stormVersion, topology.get_storm_version());
+         if (stormVersion != null &&
+                 !"Unknown".equalsIgnoreCase(stormVersion) &&
+                 !topology.is_set_storm_version()) {
+             topology.set_storm_version(stormVersion);
+         }
+
+         String jdkVersion = System.getProperty("java.version");
+         if (jdkVersion != null && !topology.is_set_jdk_version()) {
+             topology.set_jdk_version(jdkVersion);
+         }
+         return topology;
+     }
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/py/storm/DistributedRPC-remote
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPC-remote b/storm-core/src/py/storm/DistributedRPC-remote
index 3d06d07..90f894a 100644
--- a/storm-core/src/py/storm/DistributedRPC-remote
+++ b/storm-core/src/py/storm/DistributedRPC-remote
@@ -18,7 +18,7 @@
 
 #!/usr/bin/env python
 #
-# Autogenerated by Thrift Compiler (0.9.2)
+# Autogenerated by Thrift Compiler (0.9.3)
 #
 # DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
 #

http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/py/storm/DistributedRPC.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPC.py b/storm-core/src/py/storm/DistributedRPC.py
index 330499c..225c57e 100644
--- a/storm-core/src/py/storm/DistributedRPC.py
+++ b/storm-core/src/py/storm/DistributedRPC.py
@@ -17,7 +17,7 @@
 # limitations under the License.
 
 #
-# Autogenerated by Thrift Compiler (0.9.2)
+# Autogenerated by Thrift Compiler (0.9.3)
 #
 # DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
 #
@@ -25,6 +25,7 @@
 #
 
 from thrift.Thrift import TType, TMessageType, TException, TApplicationException
+import logging
 from ttypes import *
 from thrift.Thrift import TProcessor
 from thrift.transport import TTransport
@@ -87,7 +88,7 @@ class Client(Iface):
       raise result.e
     if result.aze is not None:
       raise result.aze
-    raise TApplicationException(TApplicationException.MISSING_RESULT, "execute failed: unknown result");
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "execute failed: unknown result")
 
 
 class Processor(Iface, TProcessor):
@@ -118,11 +119,20 @@ class Processor(Iface, TProcessor):
     result = execute_result()
     try:
       result.success = self._handler.execute(args.functionName, args.funcArgs)
-    except DRPCExecutionException, e:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except DRPCExecutionException as e:
+      msg_type = TMessageType.REPLY
       result.e = e
-    except AuthorizationException, aze:
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("execute", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("execute", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()

http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/py/storm/DistributedRPCInvocations-remote
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPCInvocations-remote b/storm-core/src/py/storm/DistributedRPCInvocations-remote
index 9dd50cd..01435b6 100644
--- a/storm-core/src/py/storm/DistributedRPCInvocations-remote
+++ b/storm-core/src/py/storm/DistributedRPCInvocations-remote
@@ -18,7 +18,7 @@
 
 #!/usr/bin/env python
 #
-# Autogenerated by Thrift Compiler (0.9.2)
+# Autogenerated by Thrift Compiler (0.9.3)
 #
 # DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
 #

http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/py/storm/DistributedRPCInvocations.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPCInvocations.py b/storm-core/src/py/storm/DistributedRPCInvocations.py
index 493fcc7..207fa9d 100644
--- a/storm-core/src/py/storm/DistributedRPCInvocations.py
+++ b/storm-core/src/py/storm/DistributedRPCInvocations.py
@@ -17,7 +17,7 @@
 # limitations under the License.
 
 #
-# Autogenerated by Thrift Compiler (0.9.2)
+# Autogenerated by Thrift Compiler (0.9.3)
 #
 # DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
 #
@@ -25,6 +25,7 @@
 #
 
 from thrift.Thrift import TType, TMessageType, TException, TApplicationException
+import logging
 from ttypes import *
 from thrift.Thrift import TProcessor
 from thrift.transport import TTransport
@@ -130,7 +131,7 @@ class Client(Iface):
       return result.success
     if result.aze is not None:
       raise result.aze
-    raise TApplicationException(TApplicationException.MISSING_RESULT, "fetchRequest failed: unknown result");
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "fetchRequest failed: unknown result")
 
   def failRequest(self, id):
     """
@@ -194,9 +195,17 @@ class Processor(Iface, TProcessor):
     result = result_result()
     try:
       self._handler.result(args.id, args.result)
-    except AuthorizationException, aze:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("result", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("result", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -208,9 +217,17 @@ class Processor(Iface, TProcessor):
     result = fetchRequest_result()
     try:
       result.success = self._handler.fetchRequest(args.functionName)
-    except AuthorizationException, aze:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("fetchRequest", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("fetchRequest", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -222,9 +239,17 @@ class Processor(Iface, TProcessor):
     result = failRequest_result()
     try:
       self._handler.failRequest(args.id)
-    except AuthorizationException, aze:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("failRequest", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("failRequest", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()

http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/py/storm/Nimbus-remote
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus-remote b/storm-core/src/py/storm/Nimbus-remote
index 9af5ead..c6c1514 100644
--- a/storm-core/src/py/storm/Nimbus-remote
+++ b/storm-core/src/py/storm/Nimbus-remote
@@ -18,7 +18,7 @@
 
 #!/usr/bin/env python
 #
-# Autogenerated by Thrift Compiler (0.9.2)
+# Autogenerated by Thrift Compiler (0.9.3)
 #
 # DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
 #

http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/py/storm/Nimbus.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus.py b/storm-core/src/py/storm/Nimbus.py
index e9636c2..95bad54 100644
--- a/storm-core/src/py/storm/Nimbus.py
+++ b/storm-core/src/py/storm/Nimbus.py
@@ -17,7 +17,7 @@
 # limitations under the License.
 
 #
-# Autogenerated by Thrift Compiler (0.9.2)
+# Autogenerated by Thrift Compiler (0.9.3)
 #
 # DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
 #
@@ -25,6 +25,7 @@
 #
 
 from thrift.Thrift import TType, TMessageType, TException, TApplicationException
+import logging
 from ttypes import *
 from thrift.Thrift import TProcessor
 from thrift.transport import TTransport
@@ -506,7 +507,7 @@ class Client(Iface):
       return result.success
     if result.aze is not None:
       raise result.aze
-    raise TApplicationException(TApplicationException.MISSING_RESULT, "beginFileUpload failed: unknown result");
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "beginFileUpload failed: unknown result")
 
   def uploadChunk(self, location, chunk):
     """
@@ -603,7 +604,7 @@ class Client(Iface):
       return result.success
     if result.aze is not None:
       raise result.aze
-    raise TApplicationException(TApplicationException.MISSING_RESULT, "beginFileDownload failed: unknown result");
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "beginFileDownload failed: unknown result")
 
   def downloadChunk(self, id):
     """
@@ -636,7 +637,7 @@ class Client(Iface):
       return result.success
     if result.aze is not None:
       raise result.aze
-    raise TApplicationException(TApplicationException.MISSING_RESULT, "downloadChunk failed: unknown result");
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "downloadChunk failed: unknown result")
 
   def getNimbusConf(self):
     self.send_getNimbusConf()
@@ -664,7 +665,7 @@ class Client(Iface):
       return result.success
     if result.aze is not None:
       raise result.aze
-    raise TApplicationException(TApplicationException.MISSING_RESULT, "getNimbusConf failed: unknown result");
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "getNimbusConf failed: unknown result")
 
   def getClusterInfo(self):
     self.send_getClusterInfo()
@@ -692,7 +693,7 @@ class Client(Iface):
       return result.success
     if result.aze is not None:
       raise result.aze
-    raise TApplicationException(TApplicationException.MISSING_RESULT, "getClusterInfo failed: unknown result");
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "getClusterInfo failed: unknown result")
 
   def getTopologyInfo(self, id):
     """
@@ -727,7 +728,7 @@ class Client(Iface):
       raise result.e
     if result.aze is not None:
       raise result.aze
-    raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopologyInfo failed: unknown result");
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopologyInfo failed: unknown result")
 
   def getTopologyInfoWithOpts(self, id, options):
     """
@@ -764,7 +765,7 @@ class Client(Iface):
       raise result.e
     if result.aze is not None:
       raise result.aze
-    raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopologyInfoWithOpts failed: unknown result");
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopologyInfoWithOpts failed: unknown result")
 
   def getTopologyConf(self, id):
     """
@@ -799,7 +800,7 @@ class Client(Iface):
       raise result.e
     if result.aze is not None:
       raise result.aze
-    raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopologyConf failed: unknown result");
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopologyConf failed: unknown result")
 
   def getTopology(self, id):
     """
@@ -836,7 +837,7 @@ class Client(Iface):
       raise result.e
     if result.aze is not None:
       raise result.aze
-    raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopology failed: unknown result");
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "getTopology failed: unknown result")
 
   def getUserTopology(self, id):
     """
@@ -873,7 +874,7 @@ class Client(Iface):
       raise result.e
     if result.aze is not None:
       raise result.aze
-    raise TApplicationException(TApplicationException.MISSING_RESULT, "getUserTopology failed: unknown result");
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "getUserTopology failed: unknown result")
 
 
 class Processor(Iface, TProcessor):
@@ -923,13 +924,23 @@ class Processor(Iface, TProcessor):
     result = submitTopology_result()
     try:
       self._handler.submitTopology(args.name, args.uploadedJarLocation, args.jsonConf, args.topology)
-    except AlreadyAliveException, e:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except AlreadyAliveException as e:
+      msg_type = TMessageType.REPLY
       result.e = e
-    except InvalidTopologyException, ite:
+    except InvalidTopologyException as ite:
+      msg_type = TMessageType.REPLY
       result.ite = ite
-    except AuthorizationException, aze:
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("submitTopology", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("submitTopology", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -941,13 +952,23 @@ class Processor(Iface, TProcessor):
     result = submitTopologyWithOpts_result()
     try:
       self._handler.submitTopologyWithOpts(args.name, args.uploadedJarLocation, args.jsonConf, args.topology, args.options)
-    except AlreadyAliveException, e:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except AlreadyAliveException as e:
+      msg_type = TMessageType.REPLY
       result.e = e
-    except InvalidTopologyException, ite:
+    except InvalidTopologyException as ite:
+      msg_type = TMessageType.REPLY
       result.ite = ite
-    except AuthorizationException, aze:
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("submitTopologyWithOpts", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("submitTopologyWithOpts", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -959,11 +980,20 @@ class Processor(Iface, TProcessor):
     result = killTopology_result()
     try:
       self._handler.killTopology(args.name)
-    except NotAliveException, e:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except NotAliveException as e:
+      msg_type = TMessageType.REPLY
       result.e = e
-    except AuthorizationException, aze:
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("killTopology", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("killTopology", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -975,11 +1005,20 @@ class Processor(Iface, TProcessor):
     result = killTopologyWithOpts_result()
     try:
       self._handler.killTopologyWithOpts(args.name, args.options)
-    except NotAliveException, e:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except NotAliveException as e:
+      msg_type = TMessageType.REPLY
       result.e = e
-    except AuthorizationException, aze:
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("killTopologyWithOpts", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("killTopologyWithOpts", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -991,11 +1030,20 @@ class Processor(Iface, TProcessor):
     result = activate_result()
     try:
       self._handler.activate(args.name)
-    except NotAliveException, e:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except NotAliveException as e:
+      msg_type = TMessageType.REPLY
       result.e = e
-    except AuthorizationException, aze:
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("activate", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("activate", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -1007,11 +1055,20 @@ class Processor(Iface, TProcessor):
     result = deactivate_result()
     try:
       self._handler.deactivate(args.name)
-    except NotAliveException, e:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except NotAliveException as e:
+      msg_type = TMessageType.REPLY
       result.e = e
-    except AuthorizationException, aze:
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("deactivate", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("deactivate", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -1023,13 +1080,23 @@ class Processor(Iface, TProcessor):
     result = rebalance_result()
     try:
       self._handler.rebalance(args.name, args.options)
-    except NotAliveException, e:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except NotAliveException as e:
+      msg_type = TMessageType.REPLY
       result.e = e
-    except InvalidTopologyException, ite:
+    except InvalidTopologyException as ite:
+      msg_type = TMessageType.REPLY
       result.ite = ite
-    except AuthorizationException, aze:
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("rebalance", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("rebalance", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -1041,13 +1108,23 @@ class Processor(Iface, TProcessor):
     result = uploadNewCredentials_result()
     try:
       self._handler.uploadNewCredentials(args.name, args.creds)
-    except NotAliveException, e:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except NotAliveException as e:
+      msg_type = TMessageType.REPLY
       result.e = e
-    except InvalidTopologyException, ite:
+    except InvalidTopologyException as ite:
+      msg_type = TMessageType.REPLY
       result.ite = ite
-    except AuthorizationException, aze:
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("uploadNewCredentials", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("uploadNewCredentials", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -1059,9 +1136,17 @@ class Processor(Iface, TProcessor):
     result = beginFileUpload_result()
     try:
       result.success = self._handler.beginFileUpload()
-    except AuthorizationException, aze:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("beginFileUpload", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("beginFileUpload", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -1073,9 +1158,17 @@ class Processor(Iface, TProcessor):
     result = uploadChunk_result()
     try:
       self._handler.uploadChunk(args.location, args.chunk)
-    except AuthorizationException, aze:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("uploadChunk", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("uploadChunk", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -1087,9 +1180,17 @@ class Processor(Iface, TProcessor):
     result = finishFileUpload_result()
     try:
       self._handler.finishFileUpload(args.location)
-    except AuthorizationException, aze:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("finishFileUpload", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("finishFileUpload", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -1101,9 +1202,17 @@ class Processor(Iface, TProcessor):
     result = beginFileDownload_result()
     try:
       result.success = self._handler.beginFileDownload(args.file)
-    except AuthorizationException, aze:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("beginFileDownload", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("beginFileDownload", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -1115,9 +1224,17 @@ class Processor(Iface, TProcessor):
     result = downloadChunk_result()
     try:
       result.success = self._handler.downloadChunk(args.id)
-    except AuthorizationException, aze:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("downloadChunk", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("downloadChunk", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -1129,9 +1246,17 @@ class Processor(Iface, TProcessor):
     result = getNimbusConf_result()
     try:
       result.success = self._handler.getNimbusConf()
-    except AuthorizationException, aze:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("getNimbusConf", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("getNimbusConf", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -1143,9 +1268,17 @@ class Processor(Iface, TProcessor):
     result = getClusterInfo_result()
     try:
       result.success = self._handler.getClusterInfo()
-    except AuthorizationException, aze:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("getClusterInfo", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("getClusterInfo", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -1157,11 +1290,20 @@ class Processor(Iface, TProcessor):
     result = getTopologyInfo_result()
     try:
       result.success = self._handler.getTopologyInfo(args.id)
-    except NotAliveException, e:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except NotAliveException as e:
+      msg_type = TMessageType.REPLY
       result.e = e
-    except AuthorizationException, aze:
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("getTopologyInfo", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("getTopologyInfo", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -1173,11 +1315,20 @@ class Processor(Iface, TProcessor):
     result = getTopologyInfoWithOpts_result()
     try:
       result.success = self._handler.getTopologyInfoWithOpts(args.id, args.options)
-    except NotAliveException, e:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except NotAliveException as e:
+      msg_type = TMessageType.REPLY
       result.e = e
-    except AuthorizationException, aze:
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("getTopologyInfoWithOpts", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("getTopologyInfoWithOpts", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -1189,11 +1340,20 @@ class Processor(Iface, TProcessor):
     result = getTopologyConf_result()
     try:
       result.success = self._handler.getTopologyConf(args.id)
-    except NotAliveException, e:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except NotAliveException as e:
+      msg_type = TMessageType.REPLY
       result.e = e
-    except AuthorizationException, aze:
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("getTopologyConf", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("getTopologyConf", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -1205,11 +1365,20 @@ class Processor(Iface, TProcessor):
     result = getTopology_result()
     try:
       result.success = self._handler.getTopology(args.id)
-    except NotAliveException, e:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except NotAliveException as e:
+      msg_type = TMessageType.REPLY
       result.e = e
-    except AuthorizationException, aze:
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("getTopology", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("getTopology", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -1221,11 +1390,20 @@ class Processor(Iface, TProcessor):
     result = getUserTopology_result()
     try:
       result.success = self._handler.getUserTopology(args.id)
-    except NotAliveException, e:
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except NotAliveException as e:
+      msg_type = TMessageType.REPLY
       result.e = e
-    except AuthorizationException, aze:
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
       result.aze = aze
-    oprot.writeMessageBegin("getUserTopology", TMessageType.REPLY, seqid)
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("getUserTopology", msg_type, seqid)
     result.write(oprot)
     oprot.writeMessageEnd()
     oprot.trans.flush()
@@ -2742,7 +2920,7 @@ class uploadChunk_args:
           iprot.skip(ftype)
       elif fid == 2:
         if ftype == TType.STRING:
-          self.chunk = iprot.readString();
+          self.chunk = iprot.readString()
         else:
           iprot.skip(ftype)
       else:
@@ -3219,7 +3397,7 @@ class downloadChunk_result:
         break
       if fid == 0:
         if ftype == TType.STRING:
-          self.success = iprot.readString();
+          self.success = iprot.readString()
         else:
           iprot.skip(ftype)
       elif fid == 1:

http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/py/storm/constants.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/constants.py b/storm-core/src/py/storm/constants.py
index 3f0c64a..b403f97 100644
--- a/storm-core/src/py/storm/constants.py
+++ b/storm-core/src/py/storm/constants.py
@@ -17,7 +17,7 @@
 # limitations under the License.
 
 #
-# Autogenerated by Thrift Compiler (0.9.2)
+# Autogenerated by Thrift Compiler (0.9.3)
 #
 # DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
 #

http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py
index a06af92..0661254 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -17,7 +17,7 @@
 # limitations under the License.
 
 #
-# Autogenerated by Thrift Compiler (0.9.2)
+# Autogenerated by Thrift Compiler (0.9.3)
 #
 # DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
 #
@@ -126,12 +126,12 @@ class JavaObjectArg:
         break
       if fid == 1:
         if ftype == TType.I32:
-          self.int_arg = iprot.readI32();
+          self.int_arg = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 2:
         if ftype == TType.I64:
-          self.long_arg = iprot.readI64();
+          self.long_arg = iprot.readI64()
         else:
           iprot.skip(ftype)
       elif fid == 3:
@@ -141,17 +141,17 @@ class JavaObjectArg:
           iprot.skip(ftype)
       elif fid == 4:
         if ftype == TType.BOOL:
-          self.bool_arg = iprot.readBool();
+          self.bool_arg = iprot.readBool()
         else:
           iprot.skip(ftype)
       elif fid == 5:
         if ftype == TType.STRING:
-          self.binary_arg = iprot.readString();
+          self.binary_arg = iprot.readString()
         else:
           iprot.skip(ftype)
       elif fid == 6:
         if ftype == TType.DOUBLE:
-          self.double_arg = iprot.readDouble();
+          self.double_arg = iprot.readDouble()
         else:
           iprot.skip(ftype)
       else:
@@ -521,7 +521,7 @@ class Grouping:
           iprot.skip(ftype)
       elif fid == 7:
         if ftype == TType.STRING:
-          self.custom_serialized = iprot.readString();
+          self.custom_serialized = iprot.readString()
         else:
           iprot.skip(ftype)
       elif fid == 8:
@@ -643,7 +643,7 @@ class StreamInfo:
           iprot.skip(ftype)
       elif fid == 2:
         if ftype == TType.BOOL:
-          self.direct = iprot.readBool();
+          self.direct = iprot.readBool()
         else:
           iprot.skip(ftype)
       else:
@@ -804,7 +804,7 @@ class ComponentObject:
         break
       if fid == 1:
         if ftype == TType.STRING:
-          self.serialized_java = iprot.readString();
+          self.serialized_java = iprot.readString()
         else:
           iprot.skip(ftype)
       elif fid == 2:
@@ -925,7 +925,7 @@ class ComponentCommon:
           iprot.skip(ftype)
       elif fid == 3:
         if ftype == TType.I32:
-          self.parallelism_hint = iprot.readI32();
+          self.parallelism_hint = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 4:
@@ -1255,6 +1255,8 @@ class StormTopology:
    - spouts
    - bolts
    - state_spouts
+   - storm_version
+   - jdk_version
   """
 
   thrift_spec = (
@@ -1262,12 +1264,19 @@ class StormTopology:
     (1, TType.MAP, 'spouts', (TType.STRING,None,TType.STRUCT,(SpoutSpec, SpoutSpec.thrift_spec)), None, ), # 1
     (2, TType.MAP, 'bolts', (TType.STRING,None,TType.STRUCT,(Bolt, Bolt.thrift_spec)), None, ), # 2
     (3, TType.MAP, 'state_spouts', (TType.STRING,None,TType.STRUCT,(StateSpoutSpec, StateSpoutSpec.thrift_spec)), None, ), # 3
+    None, # 4
+    None, # 5
+    None, # 6
+    (7, TType.STRING, 'storm_version', None, None, ), # 7
+    (8, TType.STRING, 'jdk_version', None, None, ), # 8
   )
 
-  def __init__(self, spouts=None, bolts=None, state_spouts=None,):
+  def __init__(self, spouts=None, bolts=None, state_spouts=None, storm_version=None, jdk_version=None,):
     self.spouts = spouts
     self.bolts = bolts
     self.state_spouts = state_spouts
+    self.storm_version = storm_version
+    self.jdk_version = jdk_version
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -1314,6 +1323,16 @@ class StormTopology:
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
+      elif fid == 7:
+        if ftype == TType.STRING:
+          self.storm_version = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 8:
+        if ftype == TType.STRING:
+          self.jdk_version = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -1348,6 +1367,14 @@ class StormTopology:
         viter65.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
+    if self.storm_version is not None:
+      oprot.writeFieldBegin('storm_version', TType.STRING, 7)
+      oprot.writeString(self.storm_version.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.jdk_version is not None:
+      oprot.writeFieldBegin('jdk_version', TType.STRING, 8)
+      oprot.writeString(self.jdk_version.encode('utf-8'))
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -1366,6 +1393,8 @@ class StormTopology:
     value = (value * 31) ^ hash(self.spouts)
     value = (value * 31) ^ hash(self.bolts)
     value = (value * 31) ^ hash(self.state_spouts)
+    value = (value * 31) ^ hash(self.storm_version)
+    value = (value * 31) ^ hash(self.jdk_version)
     return value
 
   def __repr__(self):
@@ -2223,22 +2252,22 @@ class TopologySummary:
           iprot.skip(ftype)
       elif fid == 3:
         if ftype == TType.I32:
-          self.num_tasks = iprot.readI32();
+          self.num_tasks = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 4:
         if ftype == TType.I32:
-          self.num_executors = iprot.readI32();
+          self.num_executors = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 5:
         if ftype == TType.I32:
-          self.num_workers = iprot.readI32();
+          self.num_workers = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 6:
         if ftype == TType.I32:
-          self.uptime_secs = iprot.readI32();
+          self.uptime_secs = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 7:
@@ -2365,10 +2394,10 @@ class SupervisorSummary:
     (3, TType.I32, 'num_workers', None, None, ), # 3
     (4, TType.I32, 'num_used_workers', None, None, ), # 4
     (5, TType.STRING, 'supervisor_id', None, None, ), # 5
-    (6, TType.STRING, 'version', None, None, ), # 6
+    (6, TType.STRING, 'version', None, "VERSION_NOT_PROVIDED", ), # 6
   )
 
-  def __init__(self, host=None, uptime_secs=None, num_workers=None, num_used_workers=None, supervisor_id=None, version=None,):
+  def __init__(self, host=None, uptime_secs=None, num_workers=None, num_used_workers=None, supervisor_id=None, version=thrift_spec[6][4],):
     self.host = host
     self.uptime_secs = uptime_secs
     self.num_workers = num_workers
@@ -2392,17 +2421,17 @@ class SupervisorSummary:
           iprot.skip(ftype)
       elif fid == 2:
         if ftype == TType.I32:
-          self.uptime_secs = iprot.readI32();
+          self.uptime_secs = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 3:
         if ftype == TType.I32:
-          self.num_workers = iprot.readI32();
+          self.num_workers = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 4:
         if ftype == TType.I32:
-          self.num_used_workers = iprot.readI32();
+          self.num_used_workers = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 5:
@@ -2463,8 +2492,6 @@ class SupervisorSummary:
       raise TProtocol.TProtocolException(message='Required field num_used_workers is unset!')
     if self.supervisor_id is None:
       raise TProtocol.TProtocolException(message='Required field supervisor_id is unset!')
-    if self.version is None:
-      raise TProtocol.TProtocolException(message='Required field version is unset!')
     return
 
 
@@ -2531,7 +2558,7 @@ class ClusterSummary:
           iprot.skip(ftype)
       elif fid == 2:
         if ftype == TType.I32:
-          self.nimbus_uptime_secs = iprot.readI32();
+          self.nimbus_uptime_secs = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 3:
@@ -2643,7 +2670,7 @@ class ErrorInfo:
           iprot.skip(ftype)
       elif fid == 2:
         if ftype == TType.I32:
-          self.error_time_secs = iprot.readI32();
+          self.error_time_secs = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 3:
@@ -2653,7 +2680,7 @@ class ErrorInfo:
           iprot.skip(ftype)
       elif fid == 4:
         if ftype == TType.I32:
-          self.port = iprot.readI32();
+          self.port = iprot.readI32()
         else:
           iprot.skip(ftype)
       else:
@@ -2758,7 +2785,7 @@ class BoltStats:
             for _i91 in xrange(_size87):
               _key92 = GlobalStreamId()
               _key92.read(iprot)
-              _val93 = iprot.readI64();
+              _val93 = iprot.readI64()
               _val86[_key92] = _val93
             iprot.readMapEnd()
             self.acked[_key85] = _val86
@@ -2776,7 +2803,7 @@ class BoltStats:
             for _i105 in xrange(_size101):
               _key106 = GlobalStreamId()
               _key106.read(iprot)
-              _val107 = iprot.readI64();
+              _val107 = iprot.readI64()
               _val100[_key106] = _val107
             iprot.readMapEnd()
             self.failed[_key99] = _val100
@@ -2794,7 +2821,7 @@ class BoltStats:
             for _i119 in xrange(_size115):
               _key120 = GlobalStreamId()
               _key120.read(iprot)
-              _val121 = iprot.readDouble();
+              _val121 = iprot.readDouble()
               _val114[_key120] = _val121
             iprot.readMapEnd()
             self.process_ms_avg[_key113] = _val114
@@ -2812,7 +2839,7 @@ class BoltStats:
             for _i133 in xrange(_size129):
               _key134 = GlobalStreamId()
               _key134.read(iprot)
-              _val135 = iprot.readI64();
+              _val135 = iprot.readI64()
               _val128[_key134] = _val135
             iprot.readMapEnd()
             self.executed[_key127] = _val128
@@ -2830,7 +2857,7 @@ class BoltStats:
             for _i147 in xrange(_size143):
               _key148 = GlobalStreamId()
               _key148.read(iprot)
-              _val149 = iprot.readDouble();
+              _val149 = iprot.readDouble()
               _val142[_key148] = _val149
             iprot.readMapEnd()
             self.execute_ms_avg[_key141] = _val142
@@ -2983,7 +3010,7 @@ class SpoutStats:
             (_ktype178, _vtype179, _size177 ) = iprot.readMapBegin()
             for _i181 in xrange(_size177):
               _key182 = iprot.readString().decode('utf-8')
-              _val183 = iprot.readI64();
+              _val183 = iprot.readI64()
               _val176[_key182] = _val183
             iprot.readMapEnd()
             self.acked[_key175] = _val176
@@ -3000,7 +3027,7 @@ class SpoutStats:
             (_ktype192, _vtype193, _size191 ) = iprot.readMapBegin()
             for _i195 in xrange(_size191):
               _key196 = iprot.readString().decode('utf-8')
-              _val197 = iprot.readI64();
+              _val197 = iprot.readI64()
               _val190[_key196] = _val197
             iprot.readMapEnd()
             self.failed[_key189] = _val190
@@ -3017,7 +3044,7 @@ class SpoutStats:
             (_ktype206, _vtype207, _size205 ) = iprot.readMapBegin()
             for _i209 in xrange(_size205):
               _key210 = iprot.readString().decode('utf-8')
-              _val211 = iprot.readDouble();
+              _val211 = iprot.readDouble()
               _val204[_key210] = _val211
             iprot.readMapEnd()
             self.complete_ms_avg[_key203] = _val204
@@ -3223,7 +3250,7 @@ class ExecutorStats:
             (_ktype232, _vtype233, _size231 ) = iprot.readMapBegin()
             for _i235 in xrange(_size231):
               _key236 = iprot.readString().decode('utf-8')
-              _val237 = iprot.readI64();
+              _val237 = iprot.readI64()
               _val230[_key236] = _val237
             iprot.readMapEnd()
             self.emitted[_key229] = _val230
@@ -3240,7 +3267,7 @@ class ExecutorStats:
             (_ktype246, _vtype247, _size245 ) = iprot.readMapBegin()
             for _i249 in xrange(_size245):
               _key250 = iprot.readString().decode('utf-8')
-              _val251 = iprot.readI64();
+              _val251 = iprot.readI64()
               _val244[_key250] = _val251
             iprot.readMapEnd()
             self.transferred[_key243] = _val244
@@ -3255,7 +3282,7 @@ class ExecutorStats:
           iprot.skip(ftype)
       elif fid == 4:
         if ftype == TType.DOUBLE:
-          self.rate = iprot.readDouble();
+          self.rate = iprot.readDouble()
         else:
           iprot.skip(ftype)
       else:
@@ -3362,12 +3389,12 @@ class ExecutorInfo:
         break
       if fid == 1:
         if ftype == TType.I32:
-          self.task_start = iprot.readI32();
+          self.task_start = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 2:
         if ftype == TType.I32:
-          self.task_end = iprot.readI32();
+          self.task_end = iprot.readI32()
         else:
           iprot.skip(ftype)
       else:
@@ -3473,12 +3500,12 @@ class ExecutorSummary:
           iprot.skip(ftype)
       elif fid == 4:
         if ftype == TType.I32:
-          self.port = iprot.readI32();
+          self.port = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 5:
         if ftype == TType.I32:
-          self.uptime_secs = iprot.readI32();
+          self.uptime_secs = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 7:
@@ -4121,7 +4148,7 @@ class TopologyInfo:
           iprot.skip(ftype)
       elif fid == 3:
         if ftype == TType.I32:
-          self.uptime_secs = iprot.readI32();
+          self.uptime_secs = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 4:
@@ -4286,7 +4313,7 @@ class KillOptions:
         break
       if fid == 1:
         if ftype == TType.I32:
-          self.wait_secs = iprot.readI32();
+          self.wait_secs = iprot.readI32()
         else:
           iprot.skip(ftype)
       else:
@@ -4357,12 +4384,12 @@ class RebalanceOptions:
         break
       if fid == 1:
         if ftype == TType.I32:
-          self.wait_secs = iprot.readI32();
+          self.wait_secs = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 2:
         if ftype == TType.I32:
-          self.num_workers = iprot.readI32();
+          self.num_workers = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 3:
@@ -4371,7 +4398,7 @@ class RebalanceOptions:
           (_ktype284, _vtype285, _size283 ) = iprot.readMapBegin()
           for _i287 in xrange(_size283):
             _key288 = iprot.readString().decode('utf-8')
-            _val289 = iprot.readI32();
+            _val289 = iprot.readI32()
             self.num_executors[_key288] = _val289
           iprot.readMapEnd()
         else:
@@ -4532,7 +4559,7 @@ class SubmitOptions:
         break
       if fid == 1:
         if ftype == TType.I32:
-          self.initial_status = iprot.readI32();
+          self.initial_status = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 2:
@@ -4631,7 +4658,7 @@ class SupervisorInfo:
         break
       if fid == 1:
         if ftype == TType.I64:
-          self.time_secs = iprot.readI64();
+          self.time_secs = iprot.readI64()
         else:
           iprot.skip(ftype)
       elif fid == 2:
@@ -4649,7 +4676,7 @@ class SupervisorInfo:
           self.used_ports = []
           (_etype304, _size301) = iprot.readListBegin()
           for _i305 in xrange(_size301):
-            _elem306 = iprot.readI64();
+            _elem306 = iprot.readI64()
             self.used_ports.append(_elem306)
           iprot.readListEnd()
         else:
@@ -4659,7 +4686,7 @@ class SupervisorInfo:
           self.meta = []
           (_etype310, _size307) = iprot.readListBegin()
           for _i311 in xrange(_size307):
-            _elem312 = iprot.readI64();
+            _elem312 = iprot.readI64()
             self.meta.append(_elem312)
           iprot.readListEnd()
         else:
@@ -4677,7 +4704,7 @@ class SupervisorInfo:
           iprot.skip(ftype)
       elif fid == 7:
         if ftype == TType.I64:
-          self.uptime_secs = iprot.readI64();
+          self.uptime_secs = iprot.readI64()
         else:
           iprot.skip(ftype)
       elif fid == 8:
@@ -4807,7 +4834,7 @@ class NodeInfo:
           self.port = set()
           (_etype327, _size324) = iprot.readSetBegin()
           for _i328 in xrange(_size324):
-            _elem329 = iprot.readI64();
+            _elem329 = iprot.readI64()
             self.port.add(_elem329)
           iprot.readSetEnd()
         else:
@@ -4929,7 +4956,7 @@ class Assignment:
             _key343 = []
             (_etype348, _size345) = iprot.readListBegin()
             for _i349 in xrange(_size345):
-              _elem350 = iprot.readI64();
+              _elem350 = iprot.readI64()
               _key343.append(_elem350)
             iprot.readListEnd()
             _val344 = NodeInfo()
@@ -4946,10 +4973,10 @@ class Assignment:
             _key356 = []
             (_etype361, _size358) = iprot.readListBegin()
             for _i362 in xrange(_size358):
-              _elem363 = iprot.readI64();
+              _elem363 = iprot.readI64()
               _key356.append(_elem363)
             iprot.readListEnd()
-            _val357 = iprot.readI64();
+            _val357 = iprot.readI64()
             self.executor_start_time_secs[_key356] = _val357
           iprot.readMapEnd()
         else:
@@ -5157,12 +5184,12 @@ class StormBase:
           iprot.skip(ftype)
       elif fid == 2:
         if ftype == TType.I32:
-          self.status = iprot.readI32();
+          self.status = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 3:
         if ftype == TType.I32:
-          self.num_workers = iprot.readI32();
+          self.num_workers = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 4:
@@ -5171,14 +5198,14 @@ class StormBase:
           (_ktype373, _vtype374, _size372 ) = iprot.readMapBegin()
           for _i376 in xrange(_size372):
             _key377 = iprot.readString().decode('utf-8')
-            _val378 = iprot.readI32();
+            _val378 = iprot.readI32()
             self.component_executors[_key377] = _val378
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
       elif fid == 5:
         if ftype == TType.I32:
-          self.launch_time_secs = iprot.readI32();
+          self.launch_time_secs = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 6:
@@ -5194,7 +5221,7 @@ class StormBase:
           iprot.skip(ftype)
       elif fid == 8:
         if ftype == TType.I32:
-          self.prev_status = iprot.readI32();
+          self.prev_status = iprot.readI32()
         else:
           iprot.skip(ftype)
       else:
@@ -5331,12 +5358,12 @@ class ClusterWorkerHeartbeat:
           iprot.skip(ftype)
       elif fid == 3:
         if ftype == TType.I32:
-          self.time_secs = iprot.readI32();
+          self.time_secs = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 4:
         if ftype == TType.I32:
-          self.uptime_secs = iprot.readI32();
+          self.uptime_secs = iprot.readI32()
         else:
           iprot.skip(ftype)
       else:
@@ -5436,7 +5463,7 @@ class ThriftSerializedObject:
           iprot.skip(ftype)
       elif fid == 2:
         if ftype == TType.STRING:
-          self.bits = iprot.readString();
+          self.bits = iprot.readString()
         else:
           iprot.skip(ftype)
       else:
@@ -5750,7 +5777,7 @@ class LSApprovedWorkers:
           (_ktype407, _vtype408, _size406 ) = iprot.readMapBegin()
           for _i410 in xrange(_size406):
             _key411 = iprot.readString().decode('utf-8')
-            _val412 = iprot.readI32();
+            _val412 = iprot.readI32()
             self.approved_workers[_key411] = _val412
           iprot.readMapEnd()
         else:
@@ -5826,7 +5853,7 @@ class LSSupervisorAssignments:
           self.assignments = {}
           (_ktype416, _vtype417, _size415 ) = iprot.readMapBegin()
           for _i419 in xrange(_size415):
-            _key420 = iprot.readI32();
+            _key420 = iprot.readI32()
             _val421 = LocalAssignment()
             _val421.read(iprot)
             self.assignments[_key420] = _val421
@@ -5910,7 +5937,7 @@ class LSWorkerHeartbeat:
         break
       if fid == 1:
         if ftype == TType.I32:
-          self.time_secs = iprot.readI32();
+          self.time_secs = iprot.readI32()
         else:
           iprot.skip(ftype)
       elif fid == 2:
@@ -5931,7 +5958,7 @@ class LSWorkerHeartbeat:
           iprot.skip(ftype)
       elif fid == 4:
         if ftype == TType.I32:
-          self.port = iprot.readI32();
+          self.port = iprot.readI32()
         else:
           iprot.skip(ftype)
       else:
@@ -6022,7 +6049,7 @@ class GetInfoOptions:
         break
       if fid == 1:
         if ftype == TType.I32:
-          self.num_err_choice = iprot.readI32();
+          self.num_err_choice = iprot.readI32()
         else:
           iprot.skip(ftype)
       else:

http://git-wip-us.apache.org/repos/asf/storm/blob/77985a07/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift
index a4b0b2a..4f81635 100644
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@ -117,6 +117,11 @@ struct StormTopology {
   1: required map<string, SpoutSpec> spouts;
   2: required map<string, Bolt> bolts;
   3: required map<string, StateSpoutSpec> state_spouts;
+  #reserved 4: optional list<binary> worker_hooks;
+  #reserved 5: optional list<string> dependency_jars;
+  #reserved 6: optional list<string> dependency_artifacts;
+  7: optional string storm_version;
+  8: optional string jdk_version;
 }
 
 exception AlreadyAliveException {


[2/3] storm git commit: Merge branch 'STORM-2448-0.10.x' of https://github.com/revans2/incubator-storm into STORM-2448-0.10.x

Posted by bo...@apache.org.
Merge branch 'STORM-2448-0.10.x' of https://github.com/revans2/incubator-storm into STORM-2448-0.10.x

STORM-2448: Report storm and jdk versions to nimbus on topology
submission


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b2f3f38f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b2f3f38f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b2f3f38f

Branch: refs/heads/0.10.x-branch
Commit: b2f3f38fcef87af01a339255c1e14bd2424734bf
Parents: c56b7cf 77985a0
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Tue May 23 10:03:13 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Tue May 23 10:03:13 2017 -0500

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/common.clj    |  11 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |   4 +-
 storm-core/src/clj/backtype/storm/testing.clj   |   4 +-
 .../src/jvm/backtype/storm/StormSubmitter.java  |   1 +
 .../backtype/storm/generated/StormTopology.java | 222 ++++++++++++-
 .../storm/topology/TopologyBuilder.java         |   4 +-
 .../storm/utils/ThriftTopologyUtils.java        |  29 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |  21 ++
 storm-core/src/py/storm/DistributedRPC-remote   |   2 +-
 storm-core/src/py/storm/DistributedRPC.py       |  20 +-
 .../py/storm/DistributedRPCInvocations-remote   |   2 +-
 .../src/py/storm/DistributedRPCInvocations.py   |  41 ++-
 storm-core/src/py/storm/Nimbus-remote           |   2 +-
 storm-core/src/py/storm/Nimbus.py               | 318 +++++++++++++++----
 storm-core/src/py/storm/constants.py            |   2 +-
 storm-core/src/py/storm/ttypes.py               | 161 ++++++----
 storm-core/src/storm.thrift                     |   5 +
 17 files changed, 661 insertions(+), 188 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: Added STORM-2448 to Changelog

Posted by bo...@apache.org.
Added STORM-2448 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ecba328c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ecba328c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ecba328c

Branch: refs/heads/0.10.x-branch
Commit: ecba328c8729d54372077c2f0b537aab6ee1cf91
Parents: b2f3f38
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Tue May 23 10:03:44 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Tue May 23 10:03:44 2017 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ecba328c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 211922f..9c8a182 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.10.3
+ * STORM-2448: Report storm and jdk versions to nimbus on topology submission
  * STORM-2486: Prevent cd from printing target directory to avoid breaking classpath
  * STORM-1114: Race condition in trident zookeeper zk-node create/delete
  * STORM-2158: Fix OutOfMemoryError in Nimbus' SimpleTransportPlugin