Posted to commits@storm.apache.org by bo...@apache.org on 2017/05/22 21:49:59 UTC

[1/4] storm git commit: STORM-2448: Add in Storm and JDK versions when submitting a topology

Repository: storm
Updated Branches:
  refs/heads/master d7fb0fbc4 -> 9e31509d4


http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 498edcb..45bd123 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -18,37 +18,6 @@
 
 package org.apache.storm.utils;
 
-import org.apache.storm.Config;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.input.ClassLoaderObjectInputStream;
-import org.apache.storm.blobstore.ClientBlobStore;
-import org.apache.storm.generated.ClusterSummary;
-import org.apache.storm.generated.ComponentCommon;
-import org.apache.storm.generated.ComponentObject;
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.generated.InvalidTopologyException;
-import org.apache.storm.generated.Nimbus;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.generated.TopologyInfo;
-import org.apache.storm.generated.TopologySummary;
-import org.apache.storm.serialization.DefaultSerializationDelegate;
-import org.apache.storm.serialization.SerializationDelegate;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Id;
-import org.json.simple.JSONValue;
-import org.json.simple.parser.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
-import org.yaml.snakeyaml.constructor.SafeConstructor;
-
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -74,6 +43,7 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Enumeration;
@@ -82,15 +52,50 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.input.ClassLoaderObjectInputStream;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.ComponentObject;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.serialization.DefaultSerializationDelegate;
+import org.apache.storm.serialization.SerializationDelegate;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.json.simple.JSONValue;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.SafeConstructor;
+
+import com.google.common.annotations.VisibleForTesting;
+
 public class Utils {
     public static final Logger LOG = LoggerFactory.getLogger(Utils.class);
     public static final String DEFAULT_STREAM_ID = "default";
@@ -1267,4 +1272,147 @@ public class Utils {
             return Time.deltaSecs(startTime);
         }
     }
+
+    /**
+     * Add version information to the given topology
+     * @param topology the topology being submitted (MIGHT BE MODIFIED)
+     * @return topology
+     */
+    public static StormTopology addVersions(StormTopology topology) {
+        String stormVersion = VersionInfo.getVersion();
+        if (stormVersion != null && 
+                !"Unknown".equalsIgnoreCase(stormVersion) && 
+                !topology.is_set_storm_version()) {
+            topology.set_storm_version(stormVersion);
+        }
+        
+        String jdkVersion = System.getProperty("java.version");
+        if (jdkVersion != null && !topology.is_set_jdk_version()) {
+            topology.set_jdk_version(jdkVersion);
+        }
+        return topology;
+    }
+    
+    /**
+     * Get a map of version to classpath from the conf key Config.SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP
+     * @param conf what to read it out of
+     * @param currentCP the current classpath for this version of storm (not included in the conf, but returned by this)
+     * @return the map
+     */
+    public static NavigableMap<SimpleVersion, List<String>> getConfiguredClasspathVersions(Map<String, Object> conf, List<String> currentCP) {
+        TreeMap<SimpleVersion, List<String>> ret = new TreeMap<>();
+        Map<String, String> fromConf = (Map<String, String>) conf.getOrDefault(Config.SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP, Collections.emptyMap());
+        for (Map.Entry<String, String> entry: fromConf.entrySet()) {
+            ret.put(new SimpleVersion(entry.getKey()), Arrays.asList(entry.getValue().split(File.pathSeparator)));
+        }
+        ret.put(VersionInfo.OUR_VERSION, currentCP);
+        return ret;
+    }
+    
+    /**
+     * Get a map of version to worker main class from the conf key Config.SUPERVISOR_WORKER_VERSION_MAIN_MAP
+     * @param conf what to read it out of
+     * @return the map
+     */
+    public static NavigableMap<SimpleVersion, String> getConfiguredWorkerMainVersions(Map<String, Object> conf) {
+        TreeMap<SimpleVersion, String> ret = new TreeMap<>();
+        Map<String, String> fromConf = (Map<String, String>) conf.getOrDefault(Config.SUPERVISOR_WORKER_VERSION_MAIN_MAP, Collections.emptyMap());
+        for (Map.Entry<String, String> entry: fromConf.entrySet()) {
+            ret.put(new SimpleVersion(entry.getKey()), entry.getValue());
+        }
+
+        ret.put(VersionInfo.OUR_VERSION, "org.apache.storm.daemon.worker.Worker");
+        return ret;
+    }
+    
+    
+    /**
+     * Get a map of version to worker log writer from the conf key Config.SUPERVISOR_WORKER_VERSION_LOGWRITER_MAP
+     * @param conf what to read it out of
+     * @return the map
+     */
+    public static NavigableMap<SimpleVersion, String> getConfiguredWorkerLogWriterVersions(Map<String, Object> conf) {
+        TreeMap<SimpleVersion, String> ret = new TreeMap<>();
+        Map<String, String> fromConf = (Map<String, String>) conf.getOrDefault(Config.SUPERVISOR_WORKER_VERSION_LOGWRITER_MAP, Collections.emptyMap());
+        for (Map.Entry<String, String> entry: fromConf.entrySet()) {
+            ret.put(new SimpleVersion(entry.getKey()), entry.getValue());
+        }
+
+        ret.put(VersionInfo.OUR_VERSION, "org.apache.storm.LogWriter");
+        return ret;
+    }
+    
+    
+    public static <T> T getCompatibleVersion(NavigableMap<SimpleVersion, T> versionedMap, SimpleVersion desiredVersion, String what, T defaultValue) {
+        Entry<SimpleVersion, T> ret = versionedMap.ceilingEntry(desiredVersion);
+        if (ret == null || ret.getKey().getMajor() != desiredVersion.getMajor()) {
+            //Could not find a "fully" compatible version.  Look to see if there is a possibly compatible version right below it
+            ret = versionedMap.floorEntry(desiredVersion);
+            if (ret == null || ret.getKey().getMajor() != desiredVersion.getMajor()) {
+                if (defaultValue != null) {
+                    LOG.warn("Could not find any compatible {} falling back to using {}", what, defaultValue);
+                }
+                return defaultValue;
+            }
+            LOG.warn("Could not find a higer compatible version for {} {}, using {} instead", what, desiredVersion, ret.getKey());
+        }
+        return ret.getValue();
+    }
+    
+    @SuppressWarnings("unchecked")
+    private static Map<String, Object> readConfIgnoreNotFound(Yaml yaml, File f) throws IOException {
+        Map<String, Object> ret = null;
+        if (f.exists()) {
+            try (FileReader fr = new FileReader(f)) {
+                ret = (Map<String, Object>) yaml.load(fr);
+            }
+        }
+        return ret;
+    }
+    
+    public static Map<String, Object> getConfigFromClasspath(List<String> cp, Map<String, Object> conf) throws IOException {
+        if (cp == null || cp.isEmpty()) {
+            return conf;
+        }
+        Yaml yaml = new Yaml(new SafeConstructor());
+        Map<String, Object> defaultsConf = null;
+        Map<String, Object> stormConf = null;
+        for (String part: cp) {
+            File f = new File(part);
+            if (f.isDirectory()) {
+                if (defaultsConf == null) {
+                    defaultsConf = readConfIgnoreNotFound(yaml, new File(f, "defaults.yaml"));
+                }
+                
+                if (stormConf == null) {
+                    stormConf = readConfIgnoreNotFound(yaml, new File(f, "storm.yaml"));
+                }
+            } else {
+                //Let's assume it is a jar file
+                try (JarFile jarFile = new JarFile(f)) {
+                    Enumeration<JarEntry> jarEnums = jarFile.entries();
+                    while (jarEnums.hasMoreElements()) {
+                        JarEntry entry = jarEnums.nextElement();
+                        if (!entry.isDirectory()) {
+                            if (defaultsConf == null && entry.getName().equals("defaults.yaml")) {
+                                try (InputStream in = jarFile.getInputStream(entry)) {
+                                    defaultsConf = (Map<String, Object>) yaml.load(new InputStreamReader(in));
+                                }
+                            }
+                            
+                            if (stormConf == null && entry.getName().equals("storm.yaml")) {
+                                try (InputStream in = jarFile.getInputStream(entry)) {
+                                    stormConf = (Map<String, Object>) yaml.load(new InputStreamReader(in));
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        if (stormConf != null) {
+            defaultsConf.putAll(stormConf);
+        }
+        return defaultsConf;
+    }
 }
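
A rough sketch of how the new Utils.getCompatibleVersion helper picks an entry, assuming SimpleVersion (not part of this diff) orders versions numerically by major/minor; the version strings and classpaths below are made-up placeholders:

    import java.util.Arrays;
    import java.util.List;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    import org.apache.storm.utils.SimpleVersion;
    import org.apache.storm.utils.Utils;

    public class CompatibleVersionSketch {
        public static void main(String[] args) {
            // Hypothetical map of configured storm versions to worker classpaths.
            NavigableMap<SimpleVersion, List<String>> cp = new TreeMap<>();
            cp.put(new SimpleVersion("0.10.0"), Arrays.asList("/opt/storm-0.10.0/lib/*"));
            cp.put(new SimpleVersion("1.0.6"), Arrays.asList("/opt/storm-1.0.6/lib/*"));
            cp.put(new SimpleVersion("2.0.0"), Arrays.asList("/opt/storm-2.0.0/lib-worker/*"));

            // ceilingEntry(1.0.2) -> the 1.0.6 entry, same major, so that classpath is used.
            System.out.println(Utils.getCompatibleVersion(cp, new SimpleVersion("1.0.2"), "classpath", null));

            // ceilingEntry(1.2.0) -> 2.0.0, a different major, so it falls back to
            // floorEntry(1.2.0) -> 1.0.6 and logs a warning about using the lower version.
            System.out.println(Utils.getCompatibleVersion(cp, new SimpleVersion("1.2.0"), "classpath", null));

            // No 3.x entry at all -> the supplied default (null here) is returned.
            System.out.println(Utils.getCompatibleVersion(cp, new SimpleVersion("3.0.0"), "classpath", null));
        }
    }

Utils.getConfiguredClasspathVersions builds the same kind of map from Config.SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP and always adds an entry for the running version itself (VersionInfo.OUR_VERSION).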

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java b/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java
index b3c21d5..f60586f 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java
@@ -23,7 +23,8 @@ import java.io.InputStream;
 import java.util.Properties;
 
 public class VersionInfo {
-
+  private static final VersionInfo COMMON_VERSION_INFO = new VersionInfo("storm-core");
+  public static final SimpleVersion OUR_VERSION = new SimpleVersion(COMMON_VERSION_INFO._getVersion());
   private Properties info;
 
   protected VersionInfo(String component) {
@@ -85,9 +86,6 @@ public class VersionInfo {
       " source checksum " + _getSrcChecksum();
   }
 
-
-  private static final VersionInfo COMMON_VERSION_INFO = new VersionInfo("storm-core");
-
   public static String getVersion() {
     return COMMON_VERSION_INFO._getVersion();
   }
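
The field move above is about static initialization order: OUR_VERSION is built from COMMON_VERSION_INFO, and static fields initialize in declaration order, so COMMON_VERSION_INFO has to come first. A minimal standalone illustration (names and values are made up):

    public class InitOrderSketch {
        // Initialized first, in textual order.
        private static final String RAW_VERSION = "2.0.0-SNAPSHOT";
        // Safe only because RAW_VERSION is declared, and therefore assigned, above.
        public static final int MAJOR = Integer.parseInt(RAW_VERSION.split("[.-]")[0]);

        public static void main(String[] args) {
            System.out.println(MAJOR); // prints 2
        }
    }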

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-client/src/py/storm/ttypes.py b/storm-client/src/py/storm/ttypes.py
index e9085e6..d47483f 100644
--- a/storm-client/src/py/storm/ttypes.py
+++ b/storm-client/src/py/storm/ttypes.py
@@ -1414,6 +1414,8 @@ class StormTopology:
    - worker_hooks
    - dependency_jars
    - dependency_artifacts
+   - storm_version
+   - jdk_version
   """
 
   thrift_spec = (
@@ -1424,15 +1426,19 @@ class StormTopology:
     (4, TType.LIST, 'worker_hooks', (TType.STRING,None), None, ), # 4
     (5, TType.LIST, 'dependency_jars', (TType.STRING,None), None, ), # 5
     (6, TType.LIST, 'dependency_artifacts', (TType.STRING,None), None, ), # 6
+    (7, TType.STRING, 'storm_version', None, None, ), # 7
+    (8, TType.STRING, 'jdk_version', None, None, ), # 8
   )
 
-  def __init__(self, spouts=None, bolts=None, state_spouts=None, worker_hooks=None, dependency_jars=None, dependency_artifacts=None,):
+  def __init__(self, spouts=None, bolts=None, state_spouts=None, worker_hooks=None, dependency_jars=None, dependency_artifacts=None, storm_version=None, jdk_version=None,):
     self.spouts = spouts
     self.bolts = bolts
     self.state_spouts = state_spouts
     self.worker_hooks = worker_hooks
     self.dependency_jars = dependency_jars
     self.dependency_artifacts = dependency_artifacts
+    self.storm_version = storm_version
+    self.jdk_version = jdk_version
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -1509,6 +1515,16 @@ class StormTopology:
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
+      elif fid == 7:
+        if ftype == TType.STRING:
+          self.storm_version = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 8:
+        if ftype == TType.STRING:
+          self.jdk_version = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -1564,6 +1580,14 @@ class StormTopology:
         oprot.writeString(iter86.encode('utf-8'))
       oprot.writeListEnd()
       oprot.writeFieldEnd()
+    if self.storm_version is not None:
+      oprot.writeFieldBegin('storm_version', TType.STRING, 7)
+      oprot.writeString(self.storm_version.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.jdk_version is not None:
+      oprot.writeFieldBegin('jdk_version', TType.STRING, 8)
+      oprot.writeString(self.jdk_version.encode('utf-8'))
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -1585,6 +1609,8 @@ class StormTopology:
     value = (value * 31) ^ hash(self.worker_hooks)
     value = (value * 31) ^ hash(self.dependency_jars)
     value = (value * 31) ^ hash(self.dependency_artifacts)
+    value = (value * 31) ^ hash(self.storm_version)
+    value = (value * 31) ^ hash(self.jdk_version)
     return value
 
   def __repr__(self):
@@ -2028,6 +2054,7 @@ class TopologySummary:
    - num_workers
    - uptime_secs
    - status
+   - storm_version
    - sched_status
    - owner
    - replication_count
@@ -2048,7 +2075,7 @@ class TopologySummary:
     (5, TType.I32, 'num_workers', None, None, ), # 5
     (6, TType.I32, 'uptime_secs', None, None, ), # 6
     (7, TType.STRING, 'status', None, None, ), # 7
-    None, # 8
+    (8, TType.STRING, 'storm_version', None, None, ), # 8
     None, # 9
     None, # 10
     None, # 11
@@ -2569,7 +2596,7 @@ class TopologySummary:
     (526, TType.DOUBLE, 'assigned_cpu', None, None, ), # 526
   )
 
-  def __init__(self, id=None, name=None, num_tasks=None, num_executors=None, num_workers=None, uptime_secs=None, status=None, sched_status=None, owner=None, replication_count=None, requested_memonheap=None, requested_memoffheap=None, requested_cpu=None, assigned_memonheap=None, assigned_memoffheap=None, assigned_cpu=None,):
+  def __init__(self, id=None, name=None, num_tasks=None, num_executors=None, num_workers=None, uptime_secs=None, status=None, storm_version=None, sched_status=None, owner=None, replication_count=None, requested_memonheap=None, requested_memoffheap=None, requested_cpu=None, assigned_memonheap=None, assigned_memoffheap=None, assigned_cpu=None,):
     self.id = id
     self.name = name
     self.num_tasks = num_tasks
@@ -2577,6 +2604,7 @@ class TopologySummary:
     self.num_workers = num_workers
     self.uptime_secs = uptime_secs
     self.status = status
+    self.storm_version = storm_version
     self.sched_status = sched_status
     self.owner = owner
     self.replication_count = replication_count
@@ -2631,6 +2659,11 @@ class TopologySummary:
           self.status = iprot.readString().decode('utf-8')
         else:
           iprot.skip(ftype)
+      elif fid == 8:
+        if ftype == TType.STRING:
+          self.storm_version = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
       elif fid == 513:
         if ftype == TType.STRING:
           self.sched_status = iprot.readString().decode('utf-8')
@@ -2714,6 +2747,10 @@ class TopologySummary:
       oprot.writeFieldBegin('status', TType.STRING, 7)
       oprot.writeString(self.status.encode('utf-8'))
       oprot.writeFieldEnd()
+    if self.storm_version is not None:
+      oprot.writeFieldBegin('storm_version', TType.STRING, 8)
+      oprot.writeString(self.storm_version.encode('utf-8'))
+      oprot.writeFieldEnd()
     if self.sched_status is not None:
       oprot.writeFieldBegin('sched_status', TType.STRING, 513)
       oprot.writeString(self.sched_status.encode('utf-8'))
@@ -2780,6 +2817,7 @@ class TopologySummary:
     value = (value * 31) ^ hash(self.num_workers)
     value = (value * 31) ^ hash(self.uptime_secs)
     value = (value * 31) ^ hash(self.status)
+    value = (value * 31) ^ hash(self.storm_version)
     value = (value * 31) ^ hash(self.sched_status)
     value = (value * 31) ^ hash(self.owner)
     value = (value * 31) ^ hash(self.replication_count)
@@ -4298,6 +4336,7 @@ class TopologyInfo:
    - status
    - errors
    - component_debug
+   - storm_version
    - sched_status
    - owner
    - replication_count
@@ -4318,7 +4357,7 @@ class TopologyInfo:
     (5, TType.STRING, 'status', None, None, ), # 5
     (6, TType.MAP, 'errors', (TType.STRING,None,TType.LIST,(TType.STRUCT,(ErrorInfo, ErrorInfo.thrift_spec))), None, ), # 6
     (7, TType.MAP, 'component_debug', (TType.STRING,None,TType.STRUCT,(DebugOptions, DebugOptions.thrift_spec)), None, ), # 7
-    None, # 8
+    (8, TType.STRING, 'storm_version', None, None, ), # 8
     None, # 9
     None, # 10
     None, # 11
@@ -4839,7 +4878,7 @@ class TopologyInfo:
     (526, TType.DOUBLE, 'assigned_cpu', None, None, ), # 526
   )
 
-  def __init__(self, id=None, name=None, uptime_secs=None, executors=None, status=None, errors=None, component_debug=None, sched_status=None, owner=None, replication_count=None, requested_memonheap=None, requested_memoffheap=None, requested_cpu=None, assigned_memonheap=None, assigned_memoffheap=None, assigned_cpu=None,):
+  def __init__(self, id=None, name=None, uptime_secs=None, executors=None, status=None, errors=None, component_debug=None, storm_version=None, sched_status=None, owner=None, replication_count=None, requested_memonheap=None, requested_memoffheap=None, requested_cpu=None, assigned_memonheap=None, assigned_memoffheap=None, assigned_cpu=None,):
     self.id = id
     self.name = name
     self.uptime_secs = uptime_secs
@@ -4847,6 +4886,7 @@ class TopologyInfo:
     self.status = status
     self.errors = errors
     self.component_debug = component_debug
+    self.storm_version = storm_version
     self.sched_status = sched_status
     self.owner = owner
     self.replication_count = replication_count
@@ -4926,6 +4966,11 @@ class TopologyInfo:
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
+      elif fid == 8:
+        if ftype == TType.STRING:
+          self.storm_version = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
       elif fid == 513:
         if ftype == TType.STRING:
           self.sched_status = iprot.readString().decode('utf-8')
@@ -5023,6 +5068,10 @@ class TopologyInfo:
         viter328.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
+    if self.storm_version is not None:
+      oprot.writeFieldBegin('storm_version', TType.STRING, 8)
+      oprot.writeString(self.storm_version.encode('utf-8'))
+      oprot.writeFieldEnd()
     if self.sched_status is not None:
       oprot.writeFieldBegin('sched_status', TType.STRING, 513)
       oprot.writeString(self.sched_status.encode('utf-8'))
@@ -5087,6 +5136,7 @@ class TopologyInfo:
     value = (value * 31) ^ hash(self.status)
     value = (value * 31) ^ hash(self.errors)
     value = (value * 31) ^ hash(self.component_debug)
+    value = (value * 31) ^ hash(self.storm_version)
     value = (value * 31) ^ hash(self.sched_status)
     value = (value * 31) ^ hash(self.owner)
     value = (value * 31) ^ hash(self.replication_count)
@@ -6668,6 +6718,7 @@ class TopologyPageInfo:
    - debug_options
    - replication_count
    - workers
+   - storm_version
    - requested_memonheap
    - requested_memoffheap
    - requested_cpu
@@ -6694,7 +6745,7 @@ class TopologyPageInfo:
     (14, TType.STRUCT, 'debug_options', (DebugOptions, DebugOptions.thrift_spec), None, ), # 14
     (15, TType.I32, 'replication_count', None, None, ), # 15
     (16, TType.LIST, 'workers', (TType.STRUCT,(WorkerSummary, WorkerSummary.thrift_spec)), None, ), # 16
-    None, # 17
+    (17, TType.STRING, 'storm_version', None, None, ), # 17
     None, # 18
     None, # 19
     None, # 20
@@ -7206,7 +7257,7 @@ class TopologyPageInfo:
     (526, TType.DOUBLE, 'assigned_cpu', None, None, ), # 526
   )
 
-  def __init__(self, id=None, name=None, uptime_secs=None, status=None, num_tasks=None, num_workers=None, num_executors=None, topology_conf=None, id_to_spout_agg_stats=None, id_to_bolt_agg_stats=None, sched_status=None, topology_stats=None, owner=None, debug_options=None, replication_count=None, workers=None, requested_memonheap=None, requested_memoffheap=None, requested_cpu=None, assigned_memonheap=None, assigned_memoffheap=None, assigned_cpu=None,):
+  def __init__(self, id=None, name=None, uptime_secs=None, status=None, num_tasks=None, num_workers=None, num_executors=None, topology_conf=None, id_to_spout_agg_stats=None, id_to_bolt_agg_stats=None, sched_status=None, topology_stats=None, owner=None, debug_options=None, replication_count=None, workers=None, storm_version=None, requested_memonheap=None, requested_memoffheap=None, requested_cpu=None, assigned_memonheap=None, assigned_memoffheap=None, assigned_cpu=None,):
     self.id = id
     self.name = name
     self.uptime_secs = uptime_secs
@@ -7223,6 +7274,7 @@ class TopologyPageInfo:
     self.debug_options = debug_options
     self.replication_count = replication_count
     self.workers = workers
+    self.storm_version = storm_version
     self.requested_memonheap = requested_memonheap
     self.requested_memoffheap = requested_memoffheap
     self.requested_cpu = requested_cpu
@@ -7341,6 +7393,11 @@ class TopologyPageInfo:
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
+      elif fid == 17:
+        if ftype == TType.STRING:
+          self.storm_version = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
       elif fid == 521:
         if ftype == TType.DOUBLE:
           self.requested_memonheap = iprot.readDouble()
@@ -7456,6 +7513,10 @@ class TopologyPageInfo:
         iter430.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
+    if self.storm_version is not None:
+      oprot.writeFieldBegin('storm_version', TType.STRING, 17)
+      oprot.writeString(self.storm_version.encode('utf-8'))
+      oprot.writeFieldEnd()
     if self.requested_memonheap is not None:
       oprot.writeFieldBegin('requested_memonheap', TType.DOUBLE, 521)
       oprot.writeDouble(self.requested_memonheap)
@@ -7507,6 +7568,7 @@ class TopologyPageInfo:
     value = (value * 31) ^ hash(self.debug_options)
     value = (value * 31) ^ hash(self.replication_count)
     value = (value * 31) ^ hash(self.workers)
+    value = (value * 31) ^ hash(self.storm_version)
     value = (value * 31) ^ hash(self.requested_memonheap)
     value = (value * 31) ^ hash(self.requested_memoffheap)
     value = (value * 31) ^ hash(self.requested_cpu)

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-client/src/storm.thrift b/storm-client/src/storm.thrift
index cb2239d..ee02d1b 100644
--- a/storm-client/src/storm.thrift
+++ b/storm-client/src/storm.thrift
@@ -120,6 +120,8 @@ struct StormTopology {
   4: optional list<binary> worker_hooks;
   5: optional list<string> dependency_jars;
   6: optional list<string> dependency_artifacts;
+  7: optional string storm_version;
+  8: optional string jdk_version;
 }
 
 exception AlreadyAliveException {
@@ -154,6 +156,7 @@ struct TopologySummary {
   5: required i32 num_workers;
   6: required i32 uptime_secs;
   7: required string status;
+  8: optional string storm_version;
 513: optional string sched_status;
 514: optional string owner;
 515: optional i32 replication_count;
@@ -255,6 +258,7 @@ struct TopologyInfo {
   5: required string status;
   6: required map<string, list<ErrorInfo>> errors;
   7: optional map<string, DebugOptions> component_debug;
+  8: optional string storm_version;
 513: optional string sched_status;
 514: optional string owner;
 515: optional i32 replication_count;
@@ -352,6 +356,7 @@ struct TopologyPageInfo {
 14: optional DebugOptions debug_options;
 15: optional i32 replication_count;
 16: optional list<WorkerSummary> workers;
+17: optional string storm_version;
 521: optional double requested_memonheap;
 522: optional double requested_memoffheap;
 523: optional double requested_cpu;
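
Because the new thrift fields are optional, topologies submitted by older clients simply leave them unset, and readers are expected to check is_set_* before using them, exactly as the Nimbus changes below do. A small sketch against the generated Java API (the version string is a placeholder):

    import org.apache.storm.generated.StormTopology;

    public class OptionalFieldSketch {
        public static void main(String[] args) {
            StormTopology topology = new StormTopology();

            // Optional fields start out unset rather than defaulting.
            System.out.println(topology.is_set_storm_version()); // false

            topology.set_storm_version("1.1.0");
            topology.set_jdk_version(System.getProperty("java.version"));

            // Guard reads with is_set_*, mirroring the checks added in Nimbus.
            if (topology.is_set_storm_version()) {
                System.out.println(topology.get_storm_version()); // 1.1.0
            }
        }
    }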

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 2a07fdc..d1bdee8 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -554,7 +554,8 @@
        "assignedMemOnHeap" (.get_assigned_memonheap t)
        "assignedMemOffHeap" (.get_assigned_memoffheap t)
        "assignedTotalMem" (+ (.get_assigned_memonheap t) (.get_assigned_memoffheap t))
-       "assignedCpu" (.get_assigned_cpu t)})
+       "assignedCpu" (.get_assigned_cpu t)
+       "stormVersion" (.get_storm_version t)})
     "schedulerDisplayResource" (*STORM-CONF* SCHEDULER-DISPLAY-RESOURCE)}))
 
 (defn topology-stats [window stats]
@@ -694,7 +695,8 @@
      "configuration" (.get_topology_conf topo-info)
      "debug" (or debugEnabled false)
      "samplingPct" (or samplingPct 10)
-     "replicationCount" (.get_replication_count topo-info)}))
+     "replicationCount" (.get_replication_count topo-info)
+     "stormVersion" (.get_storm_version topo-info)}))
 
 (defn exec-host-port
   [executors]

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-core/src/ui/public/templates/index-page-template.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/templates/index-page-template.html b/storm-core/src/ui/public/templates/index-page-template.html
index ed589d9..1802452 100644
--- a/storm-core/src/ui/public/templates/index-page-template.html
+++ b/storm-core/src/ui/public/templates/index-page-template.html
@@ -213,6 +213,11 @@
             Scheduler Info
           </span>
         </th>
+        <th>
+          <span data-toggle="tooltip" data-placement="left" title="The version of the storm client that this topology was launched with">
+            Storm Version
+          </span>
+        </th>
       </tr>
     </thead>
     <tbody>
@@ -231,6 +236,7 @@
         <td>{{assignedCpu}}</td>
         {{/schedulerDisplayResource}}
         <td>{{schedulerInfo}}</td>
+        <td>{{stormVersion}}</td>
       </tr>
       {{/topologies}}
     </tbody>

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-core/src/ui/public/templates/topology-page-template.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/templates/topology-page-template.html b/storm-core/src/ui/public/templates/topology-page-template.html
index 2106d6c..0d45de0 100644
--- a/storm-core/src/ui/public/templates/topology-page-template.html
+++ b/storm-core/src/ui/public/templates/topology-page-template.html
@@ -80,6 +80,11 @@
             Scheduler Info
           </span>
         </th>
+        <th>
+          <span data-toggle="tooltip" data-placement="left" title="The version of the storm client that this topology was launched with">
+            Storm Version
+          </span>
+        </th> 
       </tr>
     </thead>
     <tbody>
@@ -98,6 +103,7 @@
         <td>{{assignedCpu}}</td>
         {{/schedulerDisplayResource}}
         <td>{{schedulerInfo}}</td>
+        <td>{{stormVersion}}</td>
       </tr>
     </tbody>
   </table>

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index e91a9b0..bfa8bd6 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -37,7 +37,7 @@
             TopologyInitialStatus TopologyStatus AlreadyAliveException KillOptions RebalanceOptions
             InvalidTopologyException AuthorizationException
             LogConfig LogLevel LogLevelAction Assignment NodeInfo])
-  (:import [java.util HashMap HashSet Optional])
+  (:import [java.util Map HashMap HashSet Optional])
   (:import [java.io File])
   (:import [javax.security.auth Subject])
   (:import [org.apache.storm.utils Time Time$SimulatedTime IPredicate StormCommonInstaller Utils$UptimeComputer ReflectionUtils Utils ConfigUtils ServerConfigUtils]
@@ -1338,8 +1338,7 @@
       (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) expected-conf)
       (.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/anyObject))) nil)
       (testing "getTopologyConf calls check-authorization! with the correct parameters."
-      (let [expected-operation "getTopologyConf"
-            expected-conf-json (JSONValue/toJSONString expected-conf)]
+      (let [expected-operation "getTopologyConf"]
           (try
             (is (= expected-conf
                    (->> (.getTopologyConf nimbus topology-id)
@@ -1347,7 +1346,8 @@
                         clojurify-structure)))
             (catch NotAliveException e)
             (finally
-              (.checkAuthorization (Mockito/verify nimbus) topology-name expected-conf expected-operation)))))
+              (.checkAuthorization (Mockito/verify nimbus) nil nil "getClusterInfo")
+              (.checkAuthorization (Mockito/verify nimbus) (Mockito/eq topology-name) (Mockito/any Map) (Mockito/eq expected-operation))))))
 
       (testing "getTopology calls check-authorization! with the correct parameters."
         (let [expected-operation "getTopology"
@@ -1360,9 +1360,9 @@
               (.getTopology nimbus topology-id)
               (catch NotAliveException e)
               (finally
-                (.checkAuthorization (Mockito/verify nimbus) topology-name expected-conf expected-operation)
+                (.checkAuthorization (Mockito/verify nimbus) (Mockito/eq topology-name) (Mockito/any Map) (Mockito/eq expected-operation))
                 (. (Mockito/verify common-spy)
-                  (systemTopologyImpl (Matchers/eq expected-conf)
+                  (systemTopologyImpl (Matchers/any Map)
                                       (Matchers/any))))))))
 
       (testing "getUserTopology calls check-authorization with the correct parameters."
@@ -1371,7 +1371,7 @@
             (.getUserTopology nimbus topology-id)
             (catch NotAliveException e)
             (finally
-              (.checkAuthorization (Mockito/verify nimbus) topology-name expected-conf expected-operation)
+              (.checkAuthorization (Mockito/verify nimbus) (Mockito/eq topology-name) (Mockito/any Map) (Mockito/eq expected-operation))
               ;;One for this time and one for getTopology call
               (.readTopology (Mockito/verify blob-store (Mockito/times 2)) (Mockito/eq topology-id) (Mockito/anyObject))))))))))
 
@@ -1417,8 +1417,9 @@
       (.getSupervisorPageInfo nimbus "super1" nil true)
  
       ;; afterwards, it should get called twice
-      (.checkAuthorization (Mockito/verify nimbus) expected-name expected-conf "getSupervisorPageInfo")
-      (.checkAuthorization (Mockito/verify nimbus) expected-name expected-conf "getTopology")))))
+      (.checkAuthorization (Mockito/verify nimbus) (Mockito/eq expected-name) (Mockito/any Map) (Mockito/eq "getSupervisorPageInfo"))
+      (.checkAuthorization (Mockito/verify nimbus) nil nil "getClusterInfo")
+      (.checkAuthorization (Mockito/verify nimbus) (Mockito/eq expected-name) (Mockito/any Map) (Mockito/eq "getTopology"))))))
 
 (deftest test-nimbus-iface-getTopology-methods-throw-correctly
   (with-open [cluster (LocalCluster. )]
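
The switch to (Mockito/eq ...) next to (Mockito/any Map) follows Mockito's rule that once one argument of a verification uses a matcher, all of them must; mixing a raw value with matchers fails with InvalidUseOfMatchersException. The same rule in plain Java, using a hypothetical stand-in interface rather than the real Nimbus class:

    import static org.mockito.Mockito.any;
    import static org.mockito.Mockito.eq;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;

    import java.util.HashMap;
    import java.util.Map;

    public class MatcherRuleSketch {
        // Hypothetical stand-in for the nimbus authorization call exercised by the test.
        interface Authorizer {
            void checkAuthorization(String topoName, Map<String, Object> topoConf, String operation);
        }

        public static void main(String[] args) {
            Authorizer auth = mock(Authorizer.class);
            auth.checkAuthorization("wordcount", new HashMap<>(), "getTopology");

            // OK: every argument is a matcher.
            verify(auth).checkAuthorization(eq("wordcount"), any(Map.class), eq("getTopology"));

            // NOT OK: a raw value mixed with matchers would throw InvalidUseOfMatchersException.
            // verify(auth).checkAuthorization("wordcount", any(Map.class), eq("getTopology"));
        }
    }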

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 1330594..25a3ec6 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -531,7 +531,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
         if (!Utils.isValidConf(conf)) {
             throw new IllegalArgumentException("Topology conf is not json-serializable");
         }
-        getNimbus().submitTopology(topologyName, null, JSONValue.toJSONString(conf), topology);
+        getNimbus().submitTopology(topologyName, null, JSONValue.toJSONString(conf), Utils.addVersions(topology));
         
         ISubmitterHook hook = (ISubmitterHook) Utils.getConfiguredClass(conf, Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN);
         if (hook != null) {
@@ -551,22 +551,20 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
         if (!Utils.isValidConf(conf)) {
             throw new IllegalArgumentException("Topology conf is not json-serializable");
         }
-        getNimbus().submitTopologyWithOpts(topologyName, null, JSONValue.toJSONString(conf), topology, submitOpts);
+        getNimbus().submitTopologyWithOpts(topologyName, null, JSONValue.toJSONString(conf),  Utils.addVersions(topology), submitOpts);
         return new LocalTopology(topologyName, topology);
     }
 
     @Override
     public LocalTopology submitTopology(String topologyName, Map<String, Object> conf, TrackedTopology topology)
             throws TException {
-        submitTopology(topologyName, conf, topology.getTopology());
-        return new LocalTopology(topologyName, topology.getTopology());
+        return submitTopology(topologyName, conf, topology.getTopology());
     }
 
     @Override
     public LocalTopology submitTopologyWithOpts(String topologyName, Map<String, Object> conf, TrackedTopology topology, SubmitOptions submitOpts)
             throws TException {
-            submitTopologyWithOpts(topologyName, conf, topology.getTopology(), submitOpts);
-            return new LocalTopology(topologyName, topology.getTopology());
+            return submitTopologyWithOpts(topologyName, conf, topology.getTopology(), submitOpts);
     }
     
     @Override
@@ -611,25 +609,21 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
         }
     }
 
-
     @Override
     public String getTopologyConf(String id) throws TException {
         return getNimbus().getTopologyConf(id);
     }
 
-
     @Override
     public StormTopology getTopology(String id) throws TException {
         return getNimbus().getTopology(id);
     }
 
-
     @Override
     public ClusterSummary getClusterInfo() throws TException {
         return getNimbus().getClusterInfo();
     }
 
-
     @Override
     public TopologyInfo getTopologyInfo(String id) throws TException {
         return getNimbus().getTopologyInfo(id);
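
With the change above, LocalCluster stamps the submitting client's Storm and JDK versions onto every topology through Utils.addVersions, so callers get it for free. A hedged sketch of what addVersions records (an empty TopologyBuilder topology is used just for illustration):

    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.utils.Utils;

    public class AddVersionsSketch {
        public static void main(String[] args) {
            // A real topology would have spouts and bolts; an empty one is enough here.
            StormTopology topology = new TopologyBuilder().createTopology();

            Utils.addVersions(topology);

            // The storm version (when the build info is available) and the JDK version
            // are now set, which is what LocalCluster.submitTopology() relies on.
            System.out.println(topology.get_storm_version() + " / " + topology.get_jdk_version());
        }
    }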

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 032bf7c..f569e9b 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -39,6 +39,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NavigableMap;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -157,6 +158,7 @@ import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ReflectionUtils;
 import org.apache.storm.utils.ServerConfigUtils;
 import org.apache.storm.utils.ServerUtils;
+import org.apache.storm.utils.SimpleVersion;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.TimeCacheMap;
 import org.apache.storm.utils.TupleUtils;
@@ -1022,6 +1024,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     }
     
     private final Map<String, Object> conf;
+    private final NavigableMap<SimpleVersion, List<String>> supervisorClasspaths;
     private final NimbusInfo nimbusHostPortInfo;
     private final INimbus inimbus;
     private IAuthorizer authorizationHandler;
@@ -1128,6 +1131,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         }
         this.groupMapper = groupMapper;
         this.principalToLocal = AuthUtils.GetPrincipalToLocalPlugin(conf);
+        this.supervisorClasspaths = Collections.unmodifiableNavigableMap(Utils.getConfiguredClasspathVersions(conf, EMPTY_STRING_LIST)); // We only need the version keys here, so pass an empty classpath list
     }
 
     Map<String, Object> getConf() {
@@ -1975,6 +1979,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     
     private boolean isAuthorized(String operation, String topoId) throws NotAliveException, AuthorizationException, IOException {
         Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+        topoConf = merge(conf, topoConf);
         String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
         try {
             checkAuthorization(topoName, topoConf, operation);
@@ -2155,7 +2160,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         List<String> assignedIds = state.activeStorms();
         if (assignedIds != null) {
             for (String id: assignedIds) {
-                Map<String, Object> topoConf = Collections.unmodifiableMap(tryReadTopoConf(id, store));
+                Map<String, Object> topoConf = Collections.unmodifiableMap(merge(conf, tryReadTopoConf(id, store)));
                 synchronized(lock) {
                     Credentials origCreds = state.credentials(id, null);
                     if (origCreds != null) {
@@ -2268,7 +2273,14 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             
             TopologySummary summary = new TopologySummary(topoId, base.get_name(), numTasks, numExecutors, numWorkers,
                     Time.deltaSecs(base.get_launch_time_secs()), extractStatusStr(base));
-            
+            try {
+                StormTopology topo = tryReadTopology(topoId, blobStore);
+                if (topo != null && topo.is_set_storm_version()) {
+                    summary.set_storm_version(topo.get_storm_version());
+                }
+            } catch (NotAliveException e) {
+                //Ignored; the storm version simply will not be set
+            }
             if (base.is_set_owner()) {
                 summary.set_owner(base.get_owner());
             }
@@ -2520,7 +2532,22 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             if (!(Boolean)conf.getOrDefault(DaemonConfig.STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED, false)) {
                 topoConf.remove(Config.TOPOLOGY_CLASSPATH_BEGINNING);
             }
-            Map<String, Object> totalConf = merge(conf, topoConf);
+            String topoVersionString = topology.get_storm_version();
+            if (topoVersionString == null) {
+                topoVersionString = (String)conf.getOrDefault(Config.SUPERVISOR_WORKER_DEFAULT_VERSION, VersionInfo.getVersion());
+            }
+            //Check if we can run a topology with that version of storm.
+            SimpleVersion topoVersion = new SimpleVersion(topoVersionString);
+            List<String> cp = Utils.getCompatibleVersion(supervisorClasspaths, topoVersion, "classpath", null);
+            if (cp == null) {
+                throw new InvalidTopologyException("Topology submitted with storm version " + topoVersionString
+                        + " but no compatible version could be found among the configured versions " + supervisorClasspaths.keySet());
+            }
+            Map<String, Object> otherConf = Utils.getConfigFromClasspath(cp, conf);
+            Map<String, Object> totalConfToSave = merge(otherConf, topoConf);
+            Map<String, Object> totalConf = merge(totalConfToSave, conf);
+            //When reading the conf in nimbus we want to fall back to our own settings
+            // if the other config does not have it set.
             topology = normalizeTopology(totalConf, topology);
             IStormClusterState state = stormClusterState;
             
@@ -2541,7 +2568,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                     !Utils.isZkAuthenticationConfiguredTopology(topoConf)) {
                 throw new IllegalArgumentException("The cluster is configured for zookeeper authentication, but no payload was provided.");
             }
-            LOG.info("Received topology submission for {} with conf {}", topoName, Utils.redactValue(topoConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
+            LOG.info("Received topology submission for {} (storm-{} JDK-{}) with conf {}", topoName, 
+                    topoVersionString, topology.get_jdk_version(),
+                    Utils.redactValue(topoConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
+            
             // lock protects against multiple topologies being submitted at once and
             // cleanup thread killing topology in b/w assignment and starting the topology
             synchronized(submitLock) {
@@ -2551,7 +2581,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                     state.setCredentials(topoId, new Credentials(creds), topoConf);
                 }
                 LOG.info("uploadedJar {}", uploadedJarLocation);
-                setupStormCode(conf, topoId, uploadedJarLocation, totalConf, topology);
+                setupStormCode(conf, topoId, uploadedJarLocation, totalConfToSave, topology);
                 waitForDesiredCodeReplication(totalConf, topoId);
                 state.setupHeatbeats(topoId);
                 if (ObjectReader.getBoolean(totalConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false)) {
@@ -2594,6 +2624,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         assertTopoActive(topoName, true);
         try {
             Map<String, Object> topoConf = tryReadTopoConfFromName(topoName);
+            topoConf = merge(conf, topoConf);
             final String operation = "killTopology";
             checkAuthorization(topoName, topoConf, operation);
             Integer waitAmount = null;
@@ -2617,6 +2648,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         activateCalls.mark();
         try {
             Map<String, Object> topoConf = tryReadTopoConfFromName(topoName);
+            topoConf = merge(conf, topoConf);
             final String operation = "activate";
             checkAuthorization(topoName, topoConf, operation);
             transitionName(topoName, TopologyActions.ACTIVATE, null, true);
@@ -2635,6 +2667,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         deactivateCalls.mark();
         try {
             Map<String, Object> topoConf = tryReadTopoConfFromName(topoName);
+            topoConf = merge(conf, topoConf);
             final String operation = "deactivate";
             checkAuthorization(topoName, topoConf, operation);
             transitionName(topoName, TopologyActions.INACTIVATE, null, true);
@@ -2655,6 +2688,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         assertTopoActive(topoName, true);
         try {
             Map<String, Object> topoConf = tryReadTopoConfFromName(topoName);
+            topoConf = merge(conf, topoConf);
             final String operation = "rebalance";
             checkAuthorization(topoName, topoConf, operation);
             Map<String, Integer> execOverrides = options.is_set_num_executors() ? options.get_num_executors() : Collections.emptyMap();
@@ -2679,6 +2713,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         try {
             setLogConfigCalls.mark();
             Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+            topoConf = merge(conf, topoConf);
             String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
             checkAuthorization(topoName, topoConf, "setLogConfig");
             IStormClusterState state = stormClusterState;
@@ -2732,6 +2767,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         try {
             getLogConfigCalls.mark();
             Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+            topoConf = merge(conf, topoConf);
             String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
             checkAuthorization(topoName, topoConf, "getLogConfig");
             IStormClusterState state = stormClusterState;
@@ -2757,6 +2793,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             IStormClusterState state = stormClusterState;
             String topoId = toTopoId(topoName);
             Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+            topoConf = merge(conf, topoConf);
             // make sure samplingPct is within bounds.
             double spct = Math.max(Math.min(samplingPercentage, 100.0), 0.0);
             // while disabling we retain the sampling pct.
@@ -2797,6 +2834,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         try {
             setWorkerProfilerCalls.mark();
             Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+            topoConf = merge(conf, topoConf);
             String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
             checkAuthorization(topoName, topoConf, "setWorkerProfiler");
             IStormClusterState state = stormClusterState;
@@ -2868,6 +2906,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 throw new NotAliveException(topoName + " is not alive");
             }
             Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+            topoConf = merge(conf, topoConf);
             if (credentials == null) {
                 credentials = new Credentials(Collections.emptyMap());
             }
@@ -3364,6 +3403,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             }
             TopologyInfo topoInfo = new TopologyInfo(topoId, common.topoName, Time.deltaSecs(common.launchTimeSecs),
                     summaries, extractStatusStr(common.base), errors);
+            if (common.topology.is_set_storm_version()) {
+                topoInfo.set_storm_version(common.topology.get_storm_version());
+            }
             if (common.base.is_set_owner()) {
                 topoInfo.set_owner(common.base.get_owner());
             }
@@ -3407,7 +3449,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             Map<List<Integer>, Map<String, Object>> beats = common.beats;
             Map<Integer, String> taskToComp = common.taskToComponent;
             StormTopology topology = common.topology;
-            Map<String, Object> topoConf = common.topoConf;
+            Map<String, Object> topoConf = merge(conf, common.topoConf);
             StormBase base = common.base;
             if (base == null) {
                 throw new NotAliveException(topoId);
@@ -3444,6 +3486,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                     includeSys,
                     state);
             
+            if (topology.is_set_storm_version()) {
+                topoPageInfo.set_storm_version(topology.get_storm_version());
+            }
+            
             Map<String, Map<String, Double>> spoutResources = ResourceUtils.getSpoutsResources(topology, topoConf);
             for (Entry<String, ComponentAggregateStats> entry: topoPageInfo.get_id_to_spout_agg_stats().entrySet()) {
                 CommonAggregateStats commonStats = entry.getValue().get_common_stats();
@@ -3576,6 +3622,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             }
             StormTopology topology = info.topology;
             Map<String, Object> topoConf = info.topoConf;
+            topoConf = merge(conf, topoConf);
             Assignment assignment = info.assignment;
             Map<List<Long>, List<Object>> exec2NodePort = new HashMap<>();
             Map<String, String> nodeToHost;
@@ -3652,8 +3699,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         try {
             getTopologyConfCalls.mark();
             Map<String, Object> topoConf = tryReadTopoConf(id, blobStore);
-            String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
-            checkAuthorization(topoName, topoConf, "getTopologyConf");
+            Map<String, Object> checkConf = merge(conf, topoConf);
+            String topoName = (String) checkConf.get(Config.TOPOLOGY_NAME);
+            checkAuthorization(topoName, checkConf, "getTopologyConf");
             return JSONValue.toJSONString(topoConf);
         } catch (Exception e) {
             LOG.warn("Get topo conf exception. (topology id='{}')", id, e);
@@ -3669,6 +3717,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         try {
             getTopologyCalls.mark();
             Map<String, Object> topoConf = tryReadTopoConf(id, blobStore);
+            topoConf = merge(conf, topoConf);
             String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
             checkAuthorization(topoName, topoConf, "getTopology");
             return StormCommon.systemTopology(topoConf, tryReadTopology(id, blobStore));
@@ -3686,6 +3735,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         try {
             getUserTopologyCalls.mark();
             Map<String, Object> topoConf = tryReadTopoConf(id, blobStore);
+            topoConf = merge(conf, topoConf);
             String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
             checkAuthorization(topoName, topoConf, "getUserTopology");
             return tryReadTopology(id, blobStore);
@@ -3710,6 +3760,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             boolean isAdmin = adminUsers.contains(user);
             for (String topoId: assignedIds) {
                 Map<String, Object> topoConf = tryReadTopoConf(topoId, store);
+                topoConf = merge(conf, topoConf);
                 List<String> groups = ServerConfigUtils.getTopoLogsGroups(topoConf);
                 List<String> topoLogUsers = ServerConfigUtils.getTopoLogsUsers(topoConf);
                 if (user == null || isAdmin ||
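
A sketch of the supervisor-side configuration that feeds the compatibility check above: getConfiguredClasspathVersions expects Config.SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP to map a version string to a path-separated classpath, and Config.SUPERVISOR_WORKER_DEFAULT_VERSION is what submitTopology falls back to when a topology carries no version. The install paths below are placeholders:

    import java.io.File;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.NavigableMap;

    import org.apache.storm.Config;
    import org.apache.storm.utils.SimpleVersion;
    import org.apache.storm.utils.Utils;

    public class VersionedSubmitSketch {
        public static void main(String[] args) {
            Map<String, Object> conf = new HashMap<>();
            Map<String, String> cpMap = new HashMap<>();
            // Placeholder install location for an older worker runtime.
            cpMap.put("1.0.6", "/opt/storm-1.0.6/lib/*" + File.pathSeparator + "/opt/storm-1.0.6/conf");
            conf.put(Config.SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP, cpMap);
            conf.put(Config.SUPERVISOR_WORKER_DEFAULT_VERSION, "1.0.6");

            // Same call Nimbus now makes at construction time (it ignores the classpath part).
            NavigableMap<SimpleVersion, List<String>> known =
                    Utils.getConfiguredClasspathVersions(conf, Collections.emptyList());

            // A topology stamped with 1.0.2 resolves against the 1.0.6 entry; a version with
            // no matching major resolves to null, which is what triggers InvalidTopologyException.
            System.out.println(Utils.getCompatibleVersion(known, new SimpleVersion("1.0.2"), "classpath", null));
        }
    }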

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
index cd9ca97..0307baa 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
@@ -33,6 +33,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NavigableMap;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang.StringUtils;
@@ -45,11 +46,13 @@ import org.apache.storm.generated.ProfileRequest;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.generated.WorkerResources;
 import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ServerConfigUtils;
 import org.apache.storm.utils.ServerUtils;
+import org.apache.storm.utils.SimpleVersion;
 import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.ObjectReader;
-import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.VersionInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -344,7 +347,7 @@ public class BasicContainer extends Container {
         return Paths.get(dir.toString(), "*").toString();
     }
     
-    protected List<String> frameworkClasspath() {
+    protected List<String> frameworkClasspath(SimpleVersion topoVersion) {
         File stormWorkerLibDir = new File(_stormHome, "lib-worker");
         String topoConfDir =
                 System.getenv("STORM_CONF_DIR") != null ?
@@ -358,7 +361,32 @@ public class BasicContainer extends Container {
         pathElements.add(extcp);
         pathElements.add(topoConfDir);
 
-        return pathElements;
+        NavigableMap<SimpleVersion, List<String>> classpaths = Utils.getConfiguredClasspathVersions(_conf, pathElements);
+        
+        return Utils.getCompatibleVersion(classpaths, topoVersion, "classpath", pathElements);
+    }
+    
+    protected String getWorkerMain(SimpleVersion topoVersion) {
+        String defaultWorkerGuess = "org.apache.storm.daemon.worker.Worker";
+        if (topoVersion.getMajor() == 0) {
+            //Prior to the org.apache change
+            defaultWorkerGuess = "backtype.storm.daemon.worker";
+        } else if (topoVersion.getMajor() == 1) {
+            //Have not moved to a java worker yet
+            defaultWorkerGuess = "org.apache.storm.daemon.worker";
+        }
+        NavigableMap<SimpleVersion,String> mains = Utils.getConfiguredWorkerMainVersions(_conf);
+        return Utils.getCompatibleVersion(mains, topoVersion, "worker main class", defaultWorkerGuess);
+    }
+    
+    protected String getWorkerLogWriter(SimpleVersion topoVersion) {
+        String defaultGuess = "org.apache.storm.LogWriter";
+        if (topoVersion.getMajor() == 0) {
+            //Prior to the org.apache change
+            defaultGuess = "backtype.storm.LogWriter";
+        }
+        NavigableMap<SimpleVersion,String> mains = Utils.getConfiguredWorkerLogWriterVersions(_conf);
+        return Utils.getCompatibleVersion(mains, topoVersion, "worker log writer class", defaultGuess);
     }
     
     @SuppressWarnings("unchecked")
@@ -375,12 +403,13 @@ public class BasicContainer extends Container {
      * Compute the classpath for the worker process
      * @param stormJar the topology jar
      * @param dependencyLocations any dependencies from the topology
+     * @param topoVersion the version of the storm framework to use
      * @return the full classpath
      */
-    protected String getWorkerClassPath(String stormJar, List<String> dependencyLocations) {
+    protected String getWorkerClassPath(String stormJar, List<String> dependencyLocations, SimpleVersion topoVersion) {
         List<String> workercp = new ArrayList<>();
         workercp.addAll(asStringList(_topoConf.get(Config.TOPOLOGY_CLASSPATH_BEGINNING)));
-        workercp.addAll(frameworkClasspath());
+        workercp.addAll(frameworkClasspath(topoVersion));
         workercp.add(stormJar);
         workercp.addAll(dependencyLocations);
         workercp.addAll(asStringList(_topoConf.get(Config.TOPOLOGY_CLASSPATH)));
@@ -472,14 +501,16 @@ public class BasicContainer extends Container {
         return log4jConfigurationDir + Utils.FILE_PATH_SEPARATOR + "worker.xml";
     }
     
-    private static class DependencyLocations {
-        private List<String> _data = null;
+    private static class TopologyMetaData {
+        private boolean _dataCached = false;
+        private List<String> _depLocs = null;
+        private String _stormVersion = null;
         private final Map<String, Object> _conf;
         private final String _topologyId;
         private final AdvancedFSOps _ops;
         private final String _stormRoot;
         
-        public DependencyLocations(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, final String stormRoot) {
+        public TopologyMetaData(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, final String stormRoot) {
             _conf = conf;
             _topologyId = topologyId;
             _ops = ops;
@@ -488,16 +519,15 @@ public class BasicContainer extends Container {
         
         public String toString() {
             List<String> data;
+            String stormVersion;
             synchronized(this) {
-                data = _data;
+                data = _depLocs;
+                stormVersion = _stormVersion;
             }
-            return "DEP_LOCS for " + _topologyId +" => " + data;
+            return "META for " + _topologyId +" DEP_LOCS => " + data + " STORM_VERSION => " + stormVersion;
         }
         
-        public synchronized List<String> get() throws IOException {
-            if (_data != null) {
-                return _data;
-            }
+        private synchronized void readData() throws IOException {
             final StormTopology stormTopology = ConfigUtils.readSupervisorTopology(_conf, _topologyId, _ops);
             final List<String> dependencyLocations = new ArrayList<>();
             if (stormTopology.get_dependency_jars() != null) {
@@ -511,27 +541,42 @@ public class BasicContainer extends Container {
                     dependencyLocations.add(new File(_stormRoot, dependency).getAbsolutePath());
                 }
             }
-            _data = dependencyLocations;
-            return _data;
+            _depLocs = dependencyLocations;
+            _stormVersion = stormTopology.get_storm_version();
+            _dataCached = true;
+        }
+        
+        public synchronized List<String> getDepLocs() throws IOException {
+            if (!_dataCached) {
+                readData();
+            }
+            return _depLocs;
+        }
+
+        public synchronized String getStormVersion() throws IOException {
+            if (!_dataCached) {
+                readData();
+            }
+            return _stormVersion;
         }
     }
 
-    static class DepLRUCache {
+    static class TopoMetaLRUCache {
         public final int _maxSize = 100; //We could make this configurable in the future...
         
         @SuppressWarnings("serial")
-        private LinkedHashMap<String, DependencyLocations> _cache = new LinkedHashMap<String, DependencyLocations>() {
+        private LinkedHashMap<String, TopologyMetaData> _cache = new LinkedHashMap<String, TopologyMetaData>() {
             @Override
-            protected boolean removeEldestEntry(Map.Entry<String,DependencyLocations> eldest) {
+            protected boolean removeEldestEntry(Map.Entry<String,TopologyMetaData> eldest) {
                 return (size() > _maxSize);
             }
         };
         
-        public synchronized DependencyLocations get(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) {
+        public synchronized TopologyMetaData get(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) {
             //Only go off of the topology id for now.
-            DependencyLocations dl = _cache.get(topologyId);
+            TopologyMetaData dl = _cache.get(topologyId);
             if (dl == null) {
-                _cache.putIfAbsent(topologyId, new DependencyLocations(conf, topologyId, ops, stormRoot));
+                _cache.putIfAbsent(topologyId, new TopologyMetaData(conf, topologyId, ops, stormRoot));
                 dl = _cache.get(topologyId);
             }
             return dl;
@@ -542,10 +587,14 @@ public class BasicContainer extends Container {
         }
     }
     
-    static final DepLRUCache DEP_LOC_CACHE = new DepLRUCache();
+    static final TopoMetaLRUCache TOPO_META_CACHE = new TopoMetaLRUCache();
     
     public static List<String> getDependencyLocationsFor(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) throws IOException {
-        return DEP_LOC_CACHE.get(conf, topologyId, ops, stormRoot).get();
+        return TOPO_META_CACHE.get(conf, topologyId, ops, stormRoot).getDepLocs();
+    }
+    
+    public static String getStormVersionFor(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) throws IOException {
+        return TOPO_META_CACHE.get(conf, topologyId, ops, stormRoot).getStormVersion();
     }
     
     /**
@@ -555,10 +604,10 @@ public class BasicContainer extends Container {
      * @return the classpath for the topology as command line arguments.
      * @throws IOException on any error.
      */
-    private List<String> getClassPathParams(final String stormRoot) throws IOException {
+    private List<String> getClassPathParams(final String stormRoot, final SimpleVersion topoVersion) throws IOException {
         final String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
         final List<String> dependencyLocations = getDependencyLocationsFor(_conf, _topologyId, _ops, stormRoot);
-        final String workerClassPath = getWorkerClassPath(stormJar, dependencyLocations);
+        final String workerClassPath = getWorkerClassPath(stormJar, dependencyLocations, topoVersion);
         
         List<String> classPathParams = new ArrayList<>();
         classPathParams.add("-cp");
@@ -636,17 +685,25 @@ public class BasicContainer extends Container {
         final String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
         final String topoConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
         final String workerTmpDir = ConfigUtils.workerTmpRoot(_conf, _workerId);
+        String topoVersionString = getStormVersionFor(_conf, _topologyId, _ops, stormRoot);
+        if (topoVersionString == null) {
+            topoVersionString = (String)_conf.getOrDefault(Config.SUPERVISOR_WORKER_DEFAULT_VERSION, VersionInfo.getVersion());
+        }
+        final SimpleVersion topoVersion = new SimpleVersion(topoVersionString);
         
-        List<String> classPathParams = getClassPathParams(stormRoot);
+        List<String> classPathParams = getClassPathParams(stormRoot, topoVersion);
         List<String> commonParams = getCommonParams();
         
         List<String> commandList = new ArrayList<>();
-        //Log Writer Command...
-        commandList.add(javaCmd);
-        commandList.addAll(classPathParams);
-        commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS)));
-        commandList.addAll(commonParams);
-        commandList.add("org.apache.storm.LogWriter"); //The LogWriter in turn launches the actual worker.
+        String logWriter = getWorkerLogWriter(topoVersion);
+        if (logWriter != null) {
+            //Log Writer Command...
+            commandList.add(javaCmd);
+            commandList.addAll(classPathParams);
+            commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS)));
+            commandList.addAll(commonParams);
+            commandList.add(logWriter); //The LogWriter in turn launches the actual worker.
+        }
 
         //Worker Command...
         commandList.add(javaCmd);
@@ -663,7 +720,7 @@ public class BasicContainer extends Container {
         commandList.add("-Dstorm.options=" + stormOptions);
         commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
         commandList.addAll(classPathParams);
-        commandList.add("org.apache.storm.daemon.worker.Worker");
+        commandList.add(getWorkerMain(topoVersion));
         commandList.add(_topologyId);
         commandList.add(_supervisorId);
         commandList.add(String.valueOf(_port));
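
The new frameworkClasspath, getWorkerMain and getWorkerLogWriter methods above all defer to Utils.getCompatibleVersion(...) to pick an entry out of a NavigableMap keyed by SimpleVersion. That helper is not quoted in this hunk, so what follows is only a minimal, self-contained sketch of the selection behavior described in the Config javadoc further down (prefer an entry at or above the requested version within the same major line, otherwise fall back to the closest lower entry, otherwise the supplied default). MajorMinor and pickCompatible are hypothetical stand-ins, not Storm APIs.

    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    // Hypothetical stand-in for org.apache.storm.utils.SimpleVersion: only major.minor matter here.
    final class MajorMinor implements Comparable<MajorMinor> {
        final int major;
        final int minor;
        MajorMinor(String v) {
            String[] parts = v.split("\\.");
            major = Integer.parseInt(parts[0]);
            minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        }
        @Override public int compareTo(MajorMinor o) {
            return major != o.major ? Integer.compare(major, o.major) : Integer.compare(minor, o.minor);
        }
        @Override public String toString() { return major + "." + minor; }
    }

    public class VersionLookupSketch {
        // Prefer an entry at or above the requested version within the same major line,
        // otherwise fall back to the closest lower entry, otherwise the supplied default.
        static <T> T pickCompatible(NavigableMap<MajorMinor, T> configured, MajorMinor wanted, T dflt) {
            Map.Entry<MajorMinor, T> atOrAbove = configured.ceilingEntry(wanted);
            if (atOrAbove != null && atOrAbove.getKey().major == wanted.major) {
                return atOrAbove.getValue();
            }
            Map.Entry<MajorMinor, T> below = configured.floorEntry(wanted);
            return below != null ? below.getValue() : dflt;
        }

        public static void main(String[] args) {
            NavigableMap<MajorMinor, String> mains = new TreeMap<>();
            mains.put(new MajorMinor("0.10"), "backtype.storm.daemon.worker");
            mains.put(new MajorMinor("1.1"), "org.apache.storm.daemon.worker");
            mains.put(new MajorMinor("2.0"), "org.apache.storm.daemon.worker.Worker");
            // A 1.0 topology matches the 1.1 entry (same major, higher minor).
            System.out.println(pickCompatible(mains, new MajorMinor("1.0"), "org.apache.storm.daemon.worker.Worker"));
        }
    }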

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
index 51cca0e..dd267e3 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
@@ -37,6 +37,7 @@ import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.ProfileAction;
 import org.apache.storm.generated.ProfileRequest;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.SimpleVersion;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.LocalState;
 import org.junit.Test;
@@ -102,7 +103,7 @@ public class BasicContainerTest {
         }
         
         @Override
-        protected List<String> frameworkClasspath() {
+        protected List<String> frameworkClasspath(SimpleVersion version) {
             //We are not really running anything so make this
             // simple to check for
             return Arrays.asList("FRAMEWORK_CP");


[4/4] storm git commit: Added STORM-2448 to Changelog

Posted by bo...@apache.org.
Added STORM-2448 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9e31509d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9e31509d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9e31509d

Branch: refs/heads/master
Commit: 9e31509d47c4e91c1009f55c7ccf321d7d7e63aa
Parents: e50c907
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Mon May 22 16:40:16 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Mon May 22 16:40:16 2017 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9e31509d/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f3fba33..7fed540 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-2448: Add in Storm and JDK versions when submitting a topology.
  * STORM-2503: Fix lgtm.com alerts on equality and comparison operations.
  * STORM-2499: Add Serialization plugin for EventHub System Properties
  * STORM-2520: AutoHDFS should prefer cluster-wise hdfs kerberos principal


[3/4] storm git commit: Merge branch 'STORM-2448' of https://github.com/revans2/incubator-storm into STORM-2448

Posted by bo...@apache.org.
Merge branch 'STORM-2448' of https://github.com/revans2/incubator-storm into STORM-2448

STORM-2448: Add in Storm and JDK versions when submitting a topology


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e50c907e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e50c907e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e50c907e

Branch: refs/heads/master
Commit: e50c907e3ce17d9a8c71d43104ae36245d715ba4
Parents: d7fb0fb 22d1fe3
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Mon May 22 16:20:02 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Mon May 22 16:20:02 2017 -0500

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/Config.java        |  45 ++++
 .../jvm/org/apache/storm/StormSubmitter.java    |   2 +-
 .../org/apache/storm/daemon/StormCommon.java    |  75 +++----
 .../apache/storm/generated/StormTopology.java   | 220 ++++++++++++++++++-
 .../apache/storm/generated/TopologyInfo.java    | 146 ++++++++++--
 .../storm/generated/TopologyPageInfo.java       | 134 +++++++++--
 .../apache/storm/generated/TopologySummary.java | 146 ++++++++++--
 .../apache/storm/topology/TopologyBuilder.java  |   2 +-
 .../jvm/org/apache/storm/utils/LocalState.java  |   8 +-
 .../org/apache/storm/utils/SimpleVersion.java   |  88 ++++++++
 .../src/jvm/org/apache/storm/utils/Utils.java   | 210 +++++++++++++++---
 .../jvm/org/apache/storm/utils/VersionInfo.java |   6 +-
 storm-client/src/py/storm/ttypes.py             |  76 ++++++-
 storm-client/src/storm.thrift                   |   5 +
 storm-core/src/clj/org/apache/storm/ui/core.clj |   6 +-
 .../public/templates/index-page-template.html   |   6 +
 .../templates/topology-page-template.html       |   6 +
 .../test/clj/org/apache/storm/nimbus_test.clj   |  19 +-
 .../java/org/apache/storm/LocalCluster.java     |  14 +-
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |  67 +++++-
 .../storm/daemon/supervisor/BasicContainer.java | 127 ++++++++---
 .../daemon/supervisor/BasicContainerTest.java   |   3 +-
 22 files changed, 1203 insertions(+), 208 deletions(-)
----------------------------------------------------------------------



[2/4] storm git commit: STORM-2448: Add in Storm and JDK versions when submitting a topology

Posted by bo...@apache.org.
STORM-2448:  Add in Storm and JDK versions when submitting a topology


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/22d1fe38
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/22d1fe38
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/22d1fe38

Branch: refs/heads/master
Commit: 22d1fe3881f0d62feddcf1cc7b8fcd3e9fed0360
Parents: d7fb0fb
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Apr 4 12:34:49 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon May 22 16:00:03 2017 -0500

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/Config.java        |  45 ++++
 .../jvm/org/apache/storm/StormSubmitter.java    |   2 +-
 .../org/apache/storm/daemon/StormCommon.java    |  75 +++----
 .../apache/storm/generated/StormTopology.java   | 220 ++++++++++++++++++-
 .../apache/storm/generated/TopologyInfo.java    | 146 ++++++++++--
 .../storm/generated/TopologyPageInfo.java       | 134 +++++++++--
 .../apache/storm/generated/TopologySummary.java | 146 ++++++++++--
 .../apache/storm/topology/TopologyBuilder.java  |   2 +-
 .../jvm/org/apache/storm/utils/LocalState.java  |   8 +-
 .../org/apache/storm/utils/SimpleVersion.java   |  88 ++++++++
 .../src/jvm/org/apache/storm/utils/Utils.java   | 210 +++++++++++++++---
 .../jvm/org/apache/storm/utils/VersionInfo.java |   6 +-
 storm-client/src/py/storm/ttypes.py             |  76 ++++++-
 storm-client/src/storm.thrift                   |   5 +
 storm-core/src/clj/org/apache/storm/ui/core.clj |   6 +-
 .../public/templates/index-page-template.html   |   6 +
 .../templates/topology-page-template.html       |   6 +
 .../test/clj/org/apache/storm/nimbus_test.clj   |  19 +-
 .../java/org/apache/storm/LocalCluster.java     |  14 +-
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |  67 +++++-
 .../storm/daemon/supervisor/BasicContainer.java | 127 ++++++++---
 .../daemon/supervisor/BasicContainerTest.java   |   3 +-
 22 files changed, 1203 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index e85841c..b1f0381 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1321,6 +1321,51 @@ public class Config extends HashMap<String, Object> {
     public static final String SUPERVISOR_WORKER_LAUNCHER = "supervisor.worker.launcher";
 
     /**
+     * Map a version of storm to a worker classpath that can be used to run it.
+     * This allows the supervisor to select an available version of storm that is compatible with the
+     * version a topology was launched with.
+     *
+     * Only the major and minor version numbers are used, although this may change in the
+     * future.  The code first tries to find a version that is the same as or higher than the requested
+     * version, but with the same major version number.  If it cannot, it falls back to a version with a
+     * lower minor version, although in that case some features may be missing and the worker may fail.
+     * 
+     * Because of how this selection process works, please don't include two releases
+     * with the same major and minor versions, as it is undefined which of them will be selected.  It is
+     * also good practice to include just one release for each major version you want to support, unless
+     * the minor versions are truly incompatible with each other.  This keeps the
+     * maintenance and testing overhead down.
+     *
+     * This config needs to be set on all supervisors and on nimbus.  In general each entry can be the
+     * output of calling storm classpath for the version you want, plus an entry for the config directory
+     * of that release.  You should modify the storm.yaml of each of these versions to match the features
+     * and settings you want on the main version.
+     */
+    @isMapEntryType(keyType = String.class, valueType = String.class)
+    public static final String SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP = "supervisor.worker.version.classpath.map";
+
+    /**
+     * Map a version of storm to a worker's main class.  In most cases storm should have correct defaults and
+     * just setting SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP is enough.
+     */
+    @isMapEntryType(keyType = String.class, valueType = String.class)
+    public static final String SUPERVISOR_WORKER_VERSION_MAIN_MAP = "supervisor.worker.version.main.map";
+    
+    /**
+     * Map a version of storm to a worker's logwriter class. In most cases storm should have correct defaults and
+     * just setting SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP is enough.
+     */
+    @isMapEntryType(keyType = String.class, valueType = String.class)
+    public static final String SUPERVISOR_WORKER_VERSION_LOGWRITER_MAP = "supervisor.worker.version.logwriter.map";
+
+    /**
+     * The version of storm a topology should be assumed to run with when no version is given by the client
+     * submitting the topology.
+     */
+    @isString
+    public static final String SUPERVISOR_WORKER_DEFAULT_VERSION = "supervisor.worker.default.version";
+
+    /**
      * A directory on the local filesystem used by Storm for any local
      * filesystem usage it needs. The directory must exist and the Storm daemons must
      * have permission to read/write from this location.
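
The four new settings above would normally live in storm.yaml on nimbus and every supervisor. As a rough illustration only (the paths below are placeholders; real classpath values would come from running storm classpath for each installed release plus that release's conf directory, as the javadoc says), the same maps could be populated programmatically on a Config:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.storm.Config;

    public class MultiVersionSupervisorConf {
        public static void main(String[] args) {
            Config conf = new Config(); // Config extends HashMap<String, Object>

            // Placeholder classpaths; real values would be the output of storm classpath
            // for each installed release plus that release's conf directory.
            Map<String, String> classpaths = new HashMap<>();
            classpaths.put("1.1.1", "/opt/storm-1.1.1/lib/*:/opt/storm-1.1.1/conf");
            classpaths.put("2.0.0", "/opt/storm-2.0.0/lib-worker/*:/opt/storm-2.0.0/conf");
            conf.put(Config.SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP, classpaths);

            // Usually unnecessary: storm picks sensible worker main / logwriter defaults per version.
            Map<String, String> mains = new HashMap<>();
            mains.put("1.1.1", "org.apache.storm.daemon.worker");
            conf.put(Config.SUPERVISOR_WORKER_VERSION_MAIN_MAP, mains);

            // Version assumed for topologies submitted without a storm_version.
            conf.put(Config.SUPERVISOR_WORKER_DEFAULT_VERSION, "2.0.0");
        }
    }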

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
index 423c679..788b1a4 100644
--- a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
+++ b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
@@ -322,7 +322,7 @@ public class StormSubmitter {
         try {
             String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, client);
             LOG.info("Submitting topology {} in distributed mode with conf {}", name, serConf);
-
+            Utils.addVersions(topology);
             if (opts != null) {
                 client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
             } else {
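
The Utils.addVersions(topology) call added here is implemented in the Utils.java part of this commit, which is not quoted in full above. As a hedged guess at its intent rather than its actual implementation, it presumably stamps the new optional storm_version and jdk_version Thrift fields on the topology before it is submitted, along these lines:

    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.utils.VersionInfo;

    public class AddVersionsSketch {
        // Hypothetical equivalent of Utils.addVersions(topology); the real helper may differ.
        static StormTopology addVersions(StormTopology topology) {
            if (!topology.is_set_storm_version()) {
                topology.set_storm_version(VersionInfo.getVersion());
            }
            if (!topology.is_set_jdk_version()) {
                topology.set_jdk_version(System.getProperty("java.version"));
            }
            return topology;
        }
    }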

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
index 3b7167d..3f85a13 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -17,6 +17,16 @@
  */
 package org.apache.storm.daemon;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
@@ -42,6 +52,7 @@ import org.apache.storm.task.IBolt;
 import org.apache.storm.task.WorkerTopologyContext;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ThriftTopologyUtils;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ThriftTopologyUtils;
@@ -49,16 +60,6 @@ import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
 public class StormCommon {
     // A singleton instance allows us to mock delegated static methods in our
     // tests by subclassing.
@@ -105,32 +106,30 @@ public class StormCommon {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private static void validateIds(StormTopology topology) throws InvalidTopologyException {
-        List<String> componentIds = new ArrayList<>();
-
-        for (StormTopology._Fields field : Thrift.getTopologyFields()) {
-            if (!ThriftTopologyUtils.isWorkerHook(field) && !ThriftTopologyUtils.isDependencies(field)) {
-                Object value = topology.getFieldValue(field);
-                Map<String, Object> componentMap = (Map<String, Object>) value;
-                componentIds.addAll(componentMap.keySet());
-
-                for (String id : componentMap.keySet()) {
-                    if (Utils.isSystemId(id)) {
-                        throw new InvalidTopologyException(id + " is not a valid component id.");
-                    }
-                }
-                for (Object componentObj : componentMap.values()) {
-                    ComponentCommon common = getComponentCommon(componentObj);
-                    Set<String> streamIds = common.get_streams().keySet();
-                    for (String id : streamIds) {
-                        if (Utils.isSystemId(id)) {
-                            throw new InvalidTopologyException(id + " is not a valid stream id.");
-                        }
-                    }
+    private static Set<String> validateIds(Map<String, ? extends Object> componentMap) throws InvalidTopologyException {
+        Set<String> keys = componentMap.keySet();
+        for (String id : keys) {
+            if (Utils.isSystemId(id)) {
+                throw new InvalidTopologyException(id + " is not a valid component id.");
+            }
+        }
+        for (Object componentObj : componentMap.values()) {
+            ComponentCommon common = getComponentCommon(componentObj);
+            Set<String> streamIds = common.get_streams().keySet();
+            for (String id : streamIds) {
+                if (Utils.isSystemId(id)) {
+                    throw new InvalidTopologyException(id + " is not a valid stream id.");
                 }
             }
         }
+        return keys;
+    }
+    
+    private static void validateIds(StormTopology topology) throws InvalidTopologyException {
+        List<String> componentIds = new ArrayList<>();
+        componentIds.addAll(validateIds(topology.get_bolts()));
+        componentIds.addAll(validateIds(topology.get_spouts()));
+        componentIds.addAll(validateIds(topology.get_state_spouts()));
 
         List<String> offending = Utils.getRepeat(componentIds);
         if (!offending.isEmpty()) {
@@ -146,15 +145,11 @@ public class StormCommon {
         }
     }
 
-    @SuppressWarnings("unchecked")
     public static Map<String, Object> allComponents(StormTopology topology) {
         Map<String, Object> components = new HashMap<>();
-        List<StormTopology._Fields> topologyFields = Arrays.asList(Thrift.getTopologyFields());
-        for (StormTopology._Fields field : topologyFields) {
-            if (!ThriftTopologyUtils.isWorkerHook(field) && !ThriftTopologyUtils.isDependencies(field)) {
-                components.putAll(((Map) topology.getFieldValue(field)));
-            }
-        }
+        components.putAll(topology.get_bolts());
+        components.putAll(topology.get_spouts());
+        components.putAll(topology.get_state_spouts());
         return components;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/generated/StormTopology.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/StormTopology.java b/storm-client/src/jvm/org/apache/storm/generated/StormTopology.java
index caec6c6..6241d7b 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/StormTopology.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/StormTopology.java
@@ -61,6 +61,8 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
   private static final org.apache.thrift.protocol.TField WORKER_HOOKS_FIELD_DESC = new org.apache.thrift.protocol.TField("worker_hooks", org.apache.thrift.protocol.TType.LIST, (short)4);
   private static final org.apache.thrift.protocol.TField DEPENDENCY_JARS_FIELD_DESC = new org.apache.thrift.protocol.TField("dependency_jars", org.apache.thrift.protocol.TType.LIST, (short)5);
   private static final org.apache.thrift.protocol.TField DEPENDENCY_ARTIFACTS_FIELD_DESC = new org.apache.thrift.protocol.TField("dependency_artifacts", org.apache.thrift.protocol.TType.LIST, (short)6);
+  private static final org.apache.thrift.protocol.TField STORM_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("storm_version", org.apache.thrift.protocol.TType.STRING, (short)7);
+  private static final org.apache.thrift.protocol.TField JDK_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("jdk_version", org.apache.thrift.protocol.TType.STRING, (short)8);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -74,6 +76,8 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
   private List<ByteBuffer> worker_hooks; // optional
   private List<String> dependency_jars; // optional
   private List<String> dependency_artifacts; // optional
+  private String storm_version; // optional
+  private String jdk_version; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -82,7 +86,9 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
     STATE_SPOUTS((short)3, "state_spouts"),
     WORKER_HOOKS((short)4, "worker_hooks"),
     DEPENDENCY_JARS((short)5, "dependency_jars"),
-    DEPENDENCY_ARTIFACTS((short)6, "dependency_artifacts");
+    DEPENDENCY_ARTIFACTS((short)6, "dependency_artifacts"),
+    STORM_VERSION((short)7, "storm_version"),
+    JDK_VERSION((short)8, "jdk_version");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -109,6 +115,10 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
           return DEPENDENCY_JARS;
         case 6: // DEPENDENCY_ARTIFACTS
           return DEPENDENCY_ARTIFACTS;
+        case 7: // STORM_VERSION
+          return STORM_VERSION;
+        case 8: // JDK_VERSION
+          return JDK_VERSION;
         default:
           return null;
       }
@@ -149,7 +159,7 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
   }
 
   // isset id assignments
-  private static final _Fields optionals[] = {_Fields.WORKER_HOOKS,_Fields.DEPENDENCY_JARS,_Fields.DEPENDENCY_ARTIFACTS};
+  private static final _Fields optionals[] = {_Fields.WORKER_HOOKS,_Fields.DEPENDENCY_JARS,_Fields.DEPENDENCY_ARTIFACTS,_Fields.STORM_VERSION,_Fields.JDK_VERSION};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -174,6 +184,10 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
     tmpMap.put(_Fields.DEPENDENCY_ARTIFACTS, new org.apache.thrift.meta_data.FieldMetaData("dependency_artifacts", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, 
             new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
+    tmpMap.put(_Fields.STORM_VERSION, new org.apache.thrift.meta_data.FieldMetaData("storm_version", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.JDK_VERSION, new org.apache.thrift.meta_data.FieldMetaData("jdk_version", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(StormTopology.class, metaDataMap);
   }
@@ -253,6 +267,12 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
       List<String> __this__dependency_artifacts = new ArrayList<String>(other.dependency_artifacts);
       this.dependency_artifacts = __this__dependency_artifacts;
     }
+    if (other.is_set_storm_version()) {
+      this.storm_version = other.storm_version;
+    }
+    if (other.is_set_jdk_version()) {
+      this.jdk_version = other.jdk_version;
+    }
   }
 
   public StormTopology deepCopy() {
@@ -267,6 +287,8 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
     this.worker_hooks = null;
     this.dependency_jars = null;
     this.dependency_artifacts = null;
+    this.storm_version = null;
+    this.jdk_version = null;
   }
 
   public int get_spouts_size() {
@@ -485,6 +507,52 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
     }
   }
 
+  public String get_storm_version() {
+    return this.storm_version;
+  }
+
+  public void set_storm_version(String storm_version) {
+    this.storm_version = storm_version;
+  }
+
+  public void unset_storm_version() {
+    this.storm_version = null;
+  }
+
+  /** Returns true if field storm_version is set (has been assigned a value) and false otherwise */
+  public boolean is_set_storm_version() {
+    return this.storm_version != null;
+  }
+
+  public void set_storm_version_isSet(boolean value) {
+    if (!value) {
+      this.storm_version = null;
+    }
+  }
+
+  public String get_jdk_version() {
+    return this.jdk_version;
+  }
+
+  public void set_jdk_version(String jdk_version) {
+    this.jdk_version = jdk_version;
+  }
+
+  public void unset_jdk_version() {
+    this.jdk_version = null;
+  }
+
+  /** Returns true if field jdk_version is set (has been assigned a value) and false otherwise */
+  public boolean is_set_jdk_version() {
+    return this.jdk_version != null;
+  }
+
+  public void set_jdk_version_isSet(boolean value) {
+    if (!value) {
+      this.jdk_version = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case SPOUTS:
@@ -535,6 +603,22 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
       }
       break;
 
+    case STORM_VERSION:
+      if (value == null) {
+        unset_storm_version();
+      } else {
+        set_storm_version((String)value);
+      }
+      break;
+
+    case JDK_VERSION:
+      if (value == null) {
+        unset_jdk_version();
+      } else {
+        set_jdk_version((String)value);
+      }
+      break;
+
     }
   }
 
@@ -558,6 +642,12 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
     case DEPENDENCY_ARTIFACTS:
       return get_dependency_artifacts();
 
+    case STORM_VERSION:
+      return get_storm_version();
+
+    case JDK_VERSION:
+      return get_jdk_version();
+
     }
     throw new IllegalStateException();
   }
@@ -581,6 +671,10 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
       return is_set_dependency_jars();
     case DEPENDENCY_ARTIFACTS:
       return is_set_dependency_artifacts();
+    case STORM_VERSION:
+      return is_set_storm_version();
+    case JDK_VERSION:
+      return is_set_jdk_version();
     }
     throw new IllegalStateException();
   }
@@ -652,6 +746,24 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
         return false;
     }
 
+    boolean this_present_storm_version = true && this.is_set_storm_version();
+    boolean that_present_storm_version = true && that.is_set_storm_version();
+    if (this_present_storm_version || that_present_storm_version) {
+      if (!(this_present_storm_version && that_present_storm_version))
+        return false;
+      if (!this.storm_version.equals(that.storm_version))
+        return false;
+    }
+
+    boolean this_present_jdk_version = true && this.is_set_jdk_version();
+    boolean that_present_jdk_version = true && that.is_set_jdk_version();
+    if (this_present_jdk_version || that_present_jdk_version) {
+      if (!(this_present_jdk_version && that_present_jdk_version))
+        return false;
+      if (!this.jdk_version.equals(that.jdk_version))
+        return false;
+    }
+
     return true;
   }
 
@@ -689,6 +801,16 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
     if (present_dependency_artifacts)
       list.add(dependency_artifacts);
 
+    boolean present_storm_version = true && (is_set_storm_version());
+    list.add(present_storm_version);
+    if (present_storm_version)
+      list.add(storm_version);
+
+    boolean present_jdk_version = true && (is_set_jdk_version());
+    list.add(present_jdk_version);
+    if (present_jdk_version)
+      list.add(jdk_version);
+
     return list.hashCode();
   }
 
@@ -760,6 +882,26 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(is_set_storm_version()).compareTo(other.is_set_storm_version());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_storm_version()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.storm_version, other.storm_version);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(is_set_jdk_version()).compareTo(other.is_set_jdk_version());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_jdk_version()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.jdk_version, other.jdk_version);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -833,6 +975,26 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
       }
       first = false;
     }
+    if (is_set_storm_version()) {
+      if (!first) sb.append(", ");
+      sb.append("storm_version:");
+      if (this.storm_version == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.storm_version);
+      }
+      first = false;
+    }
+    if (is_set_jdk_version()) {
+      if (!first) sb.append(", ");
+      sb.append("jdk_version:");
+      if (this.jdk_version == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.jdk_version);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -1005,6 +1167,22 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 7: // STORM_VERSION
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.storm_version = iprot.readString();
+              struct.set_storm_version_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 8: // JDK_VERSION
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.jdk_version = iprot.readString();
+              struct.set_jdk_version_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -1099,6 +1277,20 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
           oprot.writeFieldEnd();
         }
       }
+      if (struct.storm_version != null) {
+        if (struct.is_set_storm_version()) {
+          oprot.writeFieldBegin(STORM_VERSION_FIELD_DESC);
+          oprot.writeString(struct.storm_version);
+          oprot.writeFieldEnd();
+        }
+      }
+      if (struct.jdk_version != null) {
+        if (struct.is_set_jdk_version()) {
+          oprot.writeFieldBegin(JDK_VERSION_FIELD_DESC);
+          oprot.writeString(struct.jdk_version);
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -1150,7 +1342,13 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
       if (struct.is_set_dependency_artifacts()) {
         optionals.set(2);
       }
-      oprot.writeBitSet(optionals, 3);
+      if (struct.is_set_storm_version()) {
+        optionals.set(3);
+      }
+      if (struct.is_set_jdk_version()) {
+        optionals.set(4);
+      }
+      oprot.writeBitSet(optionals, 5);
       if (struct.is_set_worker_hooks()) {
         {
           oprot.writeI32(struct.worker_hooks.size());
@@ -1178,6 +1376,12 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
           }
         }
       }
+      if (struct.is_set_storm_version()) {
+        oprot.writeString(struct.storm_version);
+      }
+      if (struct.is_set_jdk_version()) {
+        oprot.writeString(struct.jdk_version);
+      }
     }
 
     @Override
@@ -1225,7 +1429,7 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
         }
       }
       struct.set_state_spouts_isSet(true);
-      BitSet incoming = iprot.readBitSet(3);
+      BitSet incoming = iprot.readBitSet(5);
       if (incoming.get(0)) {
         {
           org.apache.thrift.protocol.TList _list89 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
@@ -1265,6 +1469,14 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
         }
         struct.set_dependency_artifacts_isSet(true);
       }
+      if (incoming.get(3)) {
+        struct.storm_version = iprot.readString();
+        struct.set_storm_version_isSet(true);
+      }
+      if (incoming.get(4)) {
+        struct.jdk_version = iprot.readString();
+        struct.set_jdk_version_isSet(true);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/generated/TopologyInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/TopologyInfo.java b/storm-client/src/jvm/org/apache/storm/generated/TopologyInfo.java
index fa95be7..622bd81 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/TopologyInfo.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/TopologyInfo.java
@@ -62,6 +62,7 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
   private static final org.apache.thrift.protocol.TField STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("status", org.apache.thrift.protocol.TType.STRING, (short)5);
   private static final org.apache.thrift.protocol.TField ERRORS_FIELD_DESC = new org.apache.thrift.protocol.TField("errors", org.apache.thrift.protocol.TType.MAP, (short)6);
   private static final org.apache.thrift.protocol.TField COMPONENT_DEBUG_FIELD_DESC = new org.apache.thrift.protocol.TField("component_debug", org.apache.thrift.protocol.TType.MAP, (short)7);
+  private static final org.apache.thrift.protocol.TField STORM_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("storm_version", org.apache.thrift.protocol.TType.STRING, (short)8);
   private static final org.apache.thrift.protocol.TField SCHED_STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("sched_status", org.apache.thrift.protocol.TType.STRING, (short)513);
   private static final org.apache.thrift.protocol.TField OWNER_FIELD_DESC = new org.apache.thrift.protocol.TField("owner", org.apache.thrift.protocol.TType.STRING, (short)514);
   private static final org.apache.thrift.protocol.TField REPLICATION_COUNT_FIELD_DESC = new org.apache.thrift.protocol.TField("replication_count", org.apache.thrift.protocol.TType.I32, (short)515);
@@ -85,6 +86,7 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
   private String status; // required
   private Map<String,List<ErrorInfo>> errors; // required
   private Map<String,DebugOptions> component_debug; // optional
+  private String storm_version; // optional
   private String sched_status; // optional
   private String owner; // optional
   private int replication_count; // optional
@@ -104,6 +106,7 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
     STATUS((short)5, "status"),
     ERRORS((short)6, "errors"),
     COMPONENT_DEBUG((short)7, "component_debug"),
+    STORM_VERSION((short)8, "storm_version"),
     SCHED_STATUS((short)513, "sched_status"),
     OWNER((short)514, "owner"),
     REPLICATION_COUNT((short)515, "replication_count"),
@@ -141,6 +144,8 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
           return ERRORS;
         case 7: // COMPONENT_DEBUG
           return COMPONENT_DEBUG;
+        case 8: // STORM_VERSION
+          return STORM_VERSION;
         case 513: // SCHED_STATUS
           return SCHED_STATUS;
         case 514: // OWNER
@@ -208,7 +213,7 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
   private static final int __ASSIGNED_MEMOFFHEAP_ISSET_ID = 6;
   private static final int __ASSIGNED_CPU_ISSET_ID = 7;
   private byte __isset_bitfield = 0;
-  private static final _Fields optionals[] = {_Fields.COMPONENT_DEBUG,_Fields.SCHED_STATUS,_Fields.OWNER,_Fields.REPLICATION_COUNT,_Fields.REQUESTED_MEMONHEAP,_Fields.REQUESTED_MEMOFFHEAP,_Fields.REQUESTED_CPU,_Fields.ASSIGNED_MEMONHEAP,_Fields.ASSIGNED_MEMOFFHEAP,_Fields.ASSIGNED_CPU};
+  private static final _Fields optionals[] = {_Fields.COMPONENT_DEBUG,_Fields.STORM_VERSION,_Fields.SCHED_STATUS,_Fields.OWNER,_Fields.REPLICATION_COUNT,_Fields.REQUESTED_MEMONHEAP,_Fields.REQUESTED_MEMOFFHEAP,_Fields.REQUESTED_CPU,_Fields.ASSIGNED_MEMONHEAP,_Fields.ASSIGNED_MEMOFFHEAP,_Fields.ASSIGNED_CPU};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -232,6 +237,8 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
         new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, 
             new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), 
             new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, DebugOptions.class))));
+    tmpMap.put(_Fields.STORM_VERSION, new org.apache.thrift.meta_data.FieldMetaData("storm_version", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     tmpMap.put(_Fields.SCHED_STATUS, new org.apache.thrift.meta_data.FieldMetaData("sched_status", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     tmpMap.put(_Fields.OWNER, new org.apache.thrift.meta_data.FieldMetaData("owner", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
@@ -330,6 +337,9 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
       }
       this.component_debug = __this__component_debug;
     }
+    if (other.is_set_storm_version()) {
+      this.storm_version = other.storm_version;
+    }
     if (other.is_set_sched_status()) {
       this.sched_status = other.sched_status;
     }
@@ -359,6 +369,7 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
     this.status = null;
     this.errors = null;
     this.component_debug = null;
+    this.storm_version = null;
     this.sched_status = null;
     this.owner = null;
     set_replication_count_isSet(false);
@@ -574,6 +585,29 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
     }
   }
 
+  public String get_storm_version() {
+    return this.storm_version;
+  }
+
+  public void set_storm_version(String storm_version) {
+    this.storm_version = storm_version;
+  }
+
+  public void unset_storm_version() {
+    this.storm_version = null;
+  }
+
+  /** Returns true if field storm_version is set (has been assigned a value) and false otherwise */
+  public boolean is_set_storm_version() {
+    return this.storm_version != null;
+  }
+
+  public void set_storm_version_isSet(boolean value) {
+    if (!value) {
+      this.storm_version = null;
+    }
+  }
+
   public String get_sched_status() {
     return this.sched_status;
   }
@@ -832,6 +866,14 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
       }
       break;
 
+    case STORM_VERSION:
+      if (value == null) {
+        unset_storm_version();
+      } else {
+        set_storm_version((String)value);
+      }
+      break;
+
     case SCHED_STATUS:
       if (value == null) {
         unset_sched_status();
@@ -930,6 +972,9 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
     case COMPONENT_DEBUG:
       return get_component_debug();
 
+    case STORM_VERSION:
+      return get_storm_version();
+
     case SCHED_STATUS:
       return get_sched_status();
 
@@ -982,6 +1027,8 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
       return is_set_errors();
     case COMPONENT_DEBUG:
       return is_set_component_debug();
+    case STORM_VERSION:
+      return is_set_storm_version();
     case SCHED_STATUS:
       return is_set_sched_status();
     case OWNER:
@@ -1080,6 +1127,15 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
         return false;
     }
 
+    boolean this_present_storm_version = true && this.is_set_storm_version();
+    boolean that_present_storm_version = true && that.is_set_storm_version();
+    if (this_present_storm_version || that_present_storm_version) {
+      if (!(this_present_storm_version && that_present_storm_version))
+        return false;
+      if (!this.storm_version.equals(that.storm_version))
+        return false;
+    }
+
     boolean this_present_sched_status = true && this.is_set_sched_status();
     boolean that_present_sched_status = true && that.is_set_sched_status();
     if (this_present_sched_status || that_present_sched_status) {
@@ -1203,6 +1259,11 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
     if (present_component_debug)
       list.add(component_debug);
 
+    boolean present_storm_version = true && (is_set_storm_version());
+    list.add(present_storm_version);
+    if (present_storm_version)
+      list.add(storm_version);
+
     boolean present_sched_status = true && (is_set_sched_status());
     list.add(present_sched_status);
     if (present_sched_status)
@@ -1329,6 +1390,16 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(is_set_storm_version()).compareTo(other.is_set_storm_version());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_storm_version()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.storm_version, other.storm_version);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     lastComparison = Boolean.valueOf(is_set_sched_status()).compareTo(other.is_set_sched_status());
     if (lastComparison != 0) {
       return lastComparison;
@@ -1492,6 +1563,16 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
       }
       first = false;
     }
+    if (is_set_storm_version()) {
+      if (!first) sb.append(", ");
+      sb.append("storm_version:");
+      if (this.storm_version == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.storm_version);
+      }
+      first = false;
+    }
     if (is_set_sched_status()) {
       if (!first) sb.append(", ");
       sb.append("sched_status:");
@@ -1726,6 +1807,14 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 8: // STORM_VERSION
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.storm_version = iprot.readString();
+              struct.set_storm_version_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           case 513: // SCHED_STATUS
             if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
               struct.sched_status = iprot.readString();
@@ -1876,6 +1965,13 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
           oprot.writeFieldEnd();
         }
       }
+      if (struct.storm_version != null) {
+        if (struct.is_set_storm_version()) {
+          oprot.writeFieldBegin(STORM_VERSION_FIELD_DESC);
+          oprot.writeString(struct.storm_version);
+          oprot.writeFieldEnd();
+        }
+      }
       if (struct.sched_status != null) {
         if (struct.is_set_sched_status()) {
           oprot.writeFieldBegin(SCHED_STATUS_FIELD_DESC);
@@ -1971,34 +2067,37 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
       if (struct.is_set_component_debug()) {
         optionals.set(0);
       }
-      if (struct.is_set_sched_status()) {
+      if (struct.is_set_storm_version()) {
         optionals.set(1);
       }
-      if (struct.is_set_owner()) {
+      if (struct.is_set_sched_status()) {
         optionals.set(2);
       }
-      if (struct.is_set_replication_count()) {
+      if (struct.is_set_owner()) {
         optionals.set(3);
       }
-      if (struct.is_set_requested_memonheap()) {
+      if (struct.is_set_replication_count()) {
         optionals.set(4);
       }
-      if (struct.is_set_requested_memoffheap()) {
+      if (struct.is_set_requested_memonheap()) {
         optionals.set(5);
       }
-      if (struct.is_set_requested_cpu()) {
+      if (struct.is_set_requested_memoffheap()) {
         optionals.set(6);
       }
-      if (struct.is_set_assigned_memonheap()) {
+      if (struct.is_set_requested_cpu()) {
         optionals.set(7);
       }
-      if (struct.is_set_assigned_memoffheap()) {
+      if (struct.is_set_assigned_memonheap()) {
         optionals.set(8);
       }
-      if (struct.is_set_assigned_cpu()) {
+      if (struct.is_set_assigned_memoffheap()) {
         optionals.set(9);
       }
-      oprot.writeBitSet(optionals, 10);
+      if (struct.is_set_assigned_cpu()) {
+        optionals.set(10);
+      }
+      oprot.writeBitSet(optionals, 11);
       if (struct.is_set_component_debug()) {
         {
           oprot.writeI32(struct.component_debug.size());
@@ -2009,6 +2108,9 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
           }
         }
       }
+      if (struct.is_set_storm_version()) {
+        oprot.writeString(struct.storm_version);
+      }
       if (struct.is_set_sched_status()) {
         oprot.writeString(struct.sched_status);
       }
@@ -2084,7 +2186,7 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
         }
       }
       struct.set_errors_isSet(true);
-      BitSet incoming = iprot.readBitSet(10);
+      BitSet incoming = iprot.readBitSet(11);
       if (incoming.get(0)) {
         {
           org.apache.thrift.protocol.TMap _map364 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
@@ -2102,38 +2204,42 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
         struct.set_component_debug_isSet(true);
       }
       if (incoming.get(1)) {
+        struct.storm_version = iprot.readString();
+        struct.set_storm_version_isSet(true);
+      }
+      if (incoming.get(2)) {
         struct.sched_status = iprot.readString();
         struct.set_sched_status_isSet(true);
       }
-      if (incoming.get(2)) {
+      if (incoming.get(3)) {
         struct.owner = iprot.readString();
         struct.set_owner_isSet(true);
       }
-      if (incoming.get(3)) {
+      if (incoming.get(4)) {
         struct.replication_count = iprot.readI32();
         struct.set_replication_count_isSet(true);
       }
-      if (incoming.get(4)) {
+      if (incoming.get(5)) {
         struct.requested_memonheap = iprot.readDouble();
         struct.set_requested_memonheap_isSet(true);
       }
-      if (incoming.get(5)) {
+      if (incoming.get(6)) {
         struct.requested_memoffheap = iprot.readDouble();
         struct.set_requested_memoffheap_isSet(true);
       }
-      if (incoming.get(6)) {
+      if (incoming.get(7)) {
         struct.requested_cpu = iprot.readDouble();
         struct.set_requested_cpu_isSet(true);
       }
-      if (incoming.get(7)) {
+      if (incoming.get(8)) {
         struct.assigned_memonheap = iprot.readDouble();
         struct.set_assigned_memonheap_isSet(true);
       }
-      if (incoming.get(8)) {
+      if (incoming.get(9)) {
         struct.assigned_memoffheap = iprot.readDouble();
         struct.set_assigned_memoffheap_isSet(true);
       }
-      if (incoming.get(9)) {
+      if (incoming.get(10)) {
         struct.assigned_cpu = iprot.readDouble();
         struct.set_assigned_cpu_isSet(true);
       }
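
A note on the TupleScheme hunks above: the tuple encoding writes a BitSet of "which optional
fields are present" followed by the present values in declaration order, so inserting
storm_version ahead of sched_status bumps every later ordinal by one and widens the BitSet
from 10 to 11 bits. The StandardScheme hunks keep addressing fields by their explicit Thrift
field ids, so an older reader simply skips the unknown field there. A condensed sketch of the
writer side, using the accessors added in this diff (the "info" and "oprot" names are
illustrative, not the generated ones):

    BitSet optionals = new BitSet();
    if (info.is_set_component_debug()) optionals.set(0);
    if (info.is_set_storm_version())   optionals.set(1);  // new field takes ordinal 1
    if (info.is_set_sched_status())    optionals.set(2);  // was ordinal 1 before this change
    // ... the remaining optional fields shift up by one in the same way ...
    oprot.writeBitSet(optionals, 11);                     // was 10
    if (info.is_set_storm_version()) {
        oprot.writeString(info.get_storm_version());      // values follow in BitSet order
    }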

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/generated/TopologyPageInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/TopologyPageInfo.java b/storm-client/src/jvm/org/apache/storm/generated/TopologyPageInfo.java
index 25bda95..04c2232 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/TopologyPageInfo.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/TopologyPageInfo.java
@@ -71,6 +71,7 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
   private static final org.apache.thrift.protocol.TField DEBUG_OPTIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("debug_options", org.apache.thrift.protocol.TType.STRUCT, (short)14);
   private static final org.apache.thrift.protocol.TField REPLICATION_COUNT_FIELD_DESC = new org.apache.thrift.protocol.TField("replication_count", org.apache.thrift.protocol.TType.I32, (short)15);
   private static final org.apache.thrift.protocol.TField WORKERS_FIELD_DESC = new org.apache.thrift.protocol.TField("workers", org.apache.thrift.protocol.TType.LIST, (short)16);
+  private static final org.apache.thrift.protocol.TField STORM_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("storm_version", org.apache.thrift.protocol.TType.STRING, (short)17);
   private static final org.apache.thrift.protocol.TField REQUESTED_MEMONHEAP_FIELD_DESC = new org.apache.thrift.protocol.TField("requested_memonheap", org.apache.thrift.protocol.TType.DOUBLE, (short)521);
   private static final org.apache.thrift.protocol.TField REQUESTED_MEMOFFHEAP_FIELD_DESC = new org.apache.thrift.protocol.TField("requested_memoffheap", org.apache.thrift.protocol.TType.DOUBLE, (short)522);
   private static final org.apache.thrift.protocol.TField REQUESTED_CPU_FIELD_DESC = new org.apache.thrift.protocol.TField("requested_cpu", org.apache.thrift.protocol.TType.DOUBLE, (short)523);
@@ -100,6 +101,7 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
   private DebugOptions debug_options; // optional
   private int replication_count; // optional
   private List<WorkerSummary> workers; // optional
+  private String storm_version; // optional
   private double requested_memonheap; // optional
   private double requested_memoffheap; // optional
   private double requested_cpu; // optional
@@ -125,6 +127,7 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
     DEBUG_OPTIONS((short)14, "debug_options"),
     REPLICATION_COUNT((short)15, "replication_count"),
     WORKERS((short)16, "workers"),
+    STORM_VERSION((short)17, "storm_version"),
     REQUESTED_MEMONHEAP((short)521, "requested_memonheap"),
     REQUESTED_MEMOFFHEAP((short)522, "requested_memoffheap"),
     REQUESTED_CPU((short)523, "requested_cpu"),
@@ -177,6 +180,8 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
           return REPLICATION_COUNT;
         case 16: // WORKERS
           return WORKERS;
+        case 17: // STORM_VERSION
+          return STORM_VERSION;
         case 521: // REQUESTED_MEMONHEAP
           return REQUESTED_MEMONHEAP;
         case 522: // REQUESTED_MEMOFFHEAP
@@ -241,7 +246,7 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
   private static final int __ASSIGNED_MEMOFFHEAP_ISSET_ID = 9;
   private static final int __ASSIGNED_CPU_ISSET_ID = 10;
   private short __isset_bitfield = 0;
-  private static final _Fields optionals[] = {_Fields.NAME,_Fields.UPTIME_SECS,_Fields.STATUS,_Fields.NUM_TASKS,_Fields.NUM_WORKERS,_Fields.NUM_EXECUTORS,_Fields.TOPOLOGY_CONF,_Fields.ID_TO_SPOUT_AGG_STATS,_Fields.ID_TO_BOLT_AGG_STATS,_Fields.SCHED_STATUS,_Fields.TOPOLOGY_STATS,_Fields.OWNER,_Fields.DEBUG_OPTIONS,_Fields.REPLICATION_COUNT,_Fields.WORKERS,_Fields.REQUESTED_MEMONHEAP,_Fields.REQUESTED_MEMOFFHEAP,_Fields.REQUESTED_CPU,_Fields.ASSIGNED_MEMONHEAP,_Fields.ASSIGNED_MEMOFFHEAP,_Fields.ASSIGNED_CPU};
+  private static final _Fields optionals[] = {_Fields.NAME,_Fields.UPTIME_SECS,_Fields.STATUS,_Fields.NUM_TASKS,_Fields.NUM_WORKERS,_Fields.NUM_EXECUTORS,_Fields.TOPOLOGY_CONF,_Fields.ID_TO_SPOUT_AGG_STATS,_Fields.ID_TO_BOLT_AGG_STATS,_Fields.SCHED_STATUS,_Fields.TOPOLOGY_STATS,_Fields.OWNER,_Fields.DEBUG_OPTIONS,_Fields.REPLICATION_COUNT,_Fields.WORKERS,_Fields.STORM_VERSION,_Fields.REQUESTED_MEMONHEAP,_Fields.REQUESTED_MEMOFFHEAP,_Fields.REQUESTED_CPU,_Fields.ASSIGNED_MEMONHEAP,_Fields.ASSIGNED_MEMOFFHEAP,_Fields.ASSIGNED_CPU};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -282,6 +287,8 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
     tmpMap.put(_Fields.WORKERS, new org.apache.thrift.meta_data.FieldMetaData("workers", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, 
             new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, WorkerSummary.class))));
+    tmpMap.put(_Fields.STORM_VERSION, new org.apache.thrift.meta_data.FieldMetaData("storm_version", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     tmpMap.put(_Fields.REQUESTED_MEMONHEAP, new org.apache.thrift.meta_data.FieldMetaData("requested_memonheap", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE)));
     tmpMap.put(_Fields.REQUESTED_MEMOFFHEAP, new org.apache.thrift.meta_data.FieldMetaData("requested_memoffheap", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
@@ -379,6 +386,9 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
       }
       this.workers = __this__workers;
     }
+    if (other.is_set_storm_version()) {
+      this.storm_version = other.storm_version;
+    }
     this.requested_memonheap = other.requested_memonheap;
     this.requested_memoffheap = other.requested_memoffheap;
     this.requested_cpu = other.requested_cpu;
@@ -414,6 +424,7 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
     set_replication_count_isSet(false);
     this.replication_count = 0;
     this.workers = null;
+    this.storm_version = null;
     set_requested_memonheap_isSet(false);
     this.requested_memonheap = 0.0;
     set_requested_memoffheap_isSet(false);
@@ -828,6 +839,29 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
     }
   }
 
+  public String get_storm_version() {
+    return this.storm_version;
+  }
+
+  public void set_storm_version(String storm_version) {
+    this.storm_version = storm_version;
+  }
+
+  public void unset_storm_version() {
+    this.storm_version = null;
+  }
+
+  /** Returns true if field storm_version is set (has been assigned a value) and false otherwise */
+  public boolean is_set_storm_version() {
+    return this.storm_version != null;
+  }
+
+  public void set_storm_version_isSet(boolean value) {
+    if (!value) {
+      this.storm_version = null;
+    }
+  }
+
   public double get_requested_memonheap() {
     return this.requested_memonheap;
   }
@@ -1090,6 +1124,14 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
       }
       break;
 
+    case STORM_VERSION:
+      if (value == null) {
+        unset_storm_version();
+      } else {
+        set_storm_version((String)value);
+      }
+      break;
+
     case REQUESTED_MEMONHEAP:
       if (value == null) {
         unset_requested_memonheap();
@@ -1191,6 +1233,9 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
     case WORKERS:
       return get_workers();
 
+    case STORM_VERSION:
+      return get_storm_version();
+
     case REQUESTED_MEMONHEAP:
       return get_requested_memonheap();
 
@@ -1252,6 +1297,8 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
       return is_set_replication_count();
     case WORKERS:
       return is_set_workers();
+    case STORM_VERSION:
+      return is_set_storm_version();
     case REQUESTED_MEMONHEAP:
       return is_set_requested_memonheap();
     case REQUESTED_MEMOFFHEAP:
@@ -1425,6 +1472,15 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
         return false;
     }
 
+    boolean this_present_storm_version = true && this.is_set_storm_version();
+    boolean that_present_storm_version = true && that.is_set_storm_version();
+    if (this_present_storm_version || that_present_storm_version) {
+      if (!(this_present_storm_version && that_present_storm_version))
+        return false;
+      if (!this.storm_version.equals(that.storm_version))
+        return false;
+    }
+
     boolean this_present_requested_memonheap = true && this.is_set_requested_memonheap();
     boolean that_present_requested_memonheap = true && that.is_set_requested_memonheap();
     if (this_present_requested_memonheap || that_present_requested_memonheap) {
@@ -1566,6 +1622,11 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
     if (present_workers)
       list.add(workers);
 
+    boolean present_storm_version = true && (is_set_storm_version());
+    list.add(present_storm_version);
+    if (present_storm_version)
+      list.add(storm_version);
+
     boolean present_requested_memonheap = true && (is_set_requested_memonheap());
     list.add(present_requested_memonheap);
     if (present_requested_memonheap)
@@ -1767,6 +1828,16 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(is_set_storm_version()).compareTo(other.is_set_storm_version());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_storm_version()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.storm_version, other.storm_version);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     lastComparison = Boolean.valueOf(is_set_requested_memonheap()).compareTo(other.is_set_requested_memonheap());
     if (lastComparison != 0) {
       return lastComparison;
@@ -1984,6 +2055,16 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
       }
       first = false;
     }
+    if (is_set_storm_version()) {
+      if (!first) sb.append(", ");
+      sb.append("storm_version:");
+      if (this.storm_version == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.storm_version);
+      }
+      first = false;
+    }
     if (is_set_requested_memonheap()) {
       if (!first) sb.append(", ");
       sb.append("requested_memonheap:");
@@ -2242,6 +2323,14 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 17: // STORM_VERSION
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.storm_version = iprot.readString();
+              struct.set_storm_version_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           case 521: // REQUESTED_MEMONHEAP
             if (schemeField.type == org.apache.thrift.protocol.TType.DOUBLE) {
               struct.requested_memonheap = iprot.readDouble();
@@ -2426,6 +2515,13 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
           oprot.writeFieldEnd();
         }
       }
+      if (struct.storm_version != null) {
+        if (struct.is_set_storm_version()) {
+          oprot.writeFieldBegin(STORM_VERSION_FIELD_DESC);
+          oprot.writeString(struct.storm_version);
+          oprot.writeFieldEnd();
+        }
+      }
       if (struct.is_set_requested_memonheap()) {
         oprot.writeFieldBegin(REQUESTED_MEMONHEAP_FIELD_DESC);
         oprot.writeDouble(struct.requested_memonheap);
@@ -2520,25 +2616,28 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
       if (struct.is_set_workers()) {
         optionals.set(14);
       }
-      if (struct.is_set_requested_memonheap()) {
+      if (struct.is_set_storm_version()) {
         optionals.set(15);
       }
-      if (struct.is_set_requested_memoffheap()) {
+      if (struct.is_set_requested_memonheap()) {
         optionals.set(16);
       }
-      if (struct.is_set_requested_cpu()) {
+      if (struct.is_set_requested_memoffheap()) {
         optionals.set(17);
       }
-      if (struct.is_set_assigned_memonheap()) {
+      if (struct.is_set_requested_cpu()) {
         optionals.set(18);
       }
-      if (struct.is_set_assigned_memoffheap()) {
+      if (struct.is_set_assigned_memonheap()) {
         optionals.set(19);
       }
-      if (struct.is_set_assigned_cpu()) {
+      if (struct.is_set_assigned_memoffheap()) {
         optionals.set(20);
       }
-      oprot.writeBitSet(optionals, 21);
+      if (struct.is_set_assigned_cpu()) {
+        optionals.set(21);
+      }
+      oprot.writeBitSet(optionals, 22);
       if (struct.is_set_name()) {
         oprot.writeString(struct.name);
       }
@@ -2604,6 +2703,9 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
           }
         }
       }
+      if (struct.is_set_storm_version()) {
+        oprot.writeString(struct.storm_version);
+      }
       if (struct.is_set_requested_memonheap()) {
         oprot.writeDouble(struct.requested_memonheap);
       }
@@ -2629,7 +2731,7 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
       TTupleProtocol iprot = (TTupleProtocol) prot;
       struct.id = iprot.readString();
       struct.set_id_isSet(true);
-      BitSet incoming = iprot.readBitSet(21);
+      BitSet incoming = iprot.readBitSet(22);
       if (incoming.get(0)) {
         struct.name = iprot.readString();
         struct.set_name_isSet(true);
@@ -2727,26 +2829,30 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
         struct.set_workers_isSet(true);
       }
       if (incoming.get(15)) {
+        struct.storm_version = iprot.readString();
+        struct.set_storm_version_isSet(true);
+      }
+      if (incoming.get(16)) {
         struct.requested_memonheap = iprot.readDouble();
         struct.set_requested_memonheap_isSet(true);
       }
-      if (incoming.get(16)) {
+      if (incoming.get(17)) {
         struct.requested_memoffheap = iprot.readDouble();
         struct.set_requested_memoffheap_isSet(true);
       }
-      if (incoming.get(17)) {
+      if (incoming.get(18)) {
         struct.requested_cpu = iprot.readDouble();
         struct.set_requested_cpu_isSet(true);
       }
-      if (incoming.get(18)) {
+      if (incoming.get(19)) {
         struct.assigned_memonheap = iprot.readDouble();
         struct.set_assigned_memonheap_isSet(true);
       }
-      if (incoming.get(19)) {
+      if (incoming.get(20)) {
         struct.assigned_memoffheap = iprot.readDouble();
         struct.set_assigned_memoffheap_isSet(true);
       }
-      if (incoming.get(20)) {
+      if (incoming.get(21)) {
         struct.assigned_cpu = iprot.readDouble();
         struct.set_assigned_cpu_isSet(true);
       }
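
The new field follows the usual generated accessor pattern (set_/get_/is_set_/unset_). A
minimal usage sketch against the methods added above; the version string is made up:

    TopologyPageInfo page = new TopologyPageInfo();
    page.set_storm_version("1.1.0");                   // illustrative value
    if (page.is_set_storm_version()) {
        System.out.println("built with Storm " + page.get_storm_version());
    }
    page.unset_storm_version();                        // clears the field; is_set_storm_version() is false again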

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/generated/TopologySummary.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/TopologySummary.java b/storm-client/src/jvm/org/apache/storm/generated/TopologySummary.java
index 8b7cd75..39547e4 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/TopologySummary.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/TopologySummary.java
@@ -62,6 +62,7 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
   private static final org.apache.thrift.protocol.TField NUM_WORKERS_FIELD_DESC = new org.apache.thrift.protocol.TField("num_workers", org.apache.thrift.protocol.TType.I32, (short)5);
   private static final org.apache.thrift.protocol.TField UPTIME_SECS_FIELD_DESC = new org.apache.thrift.protocol.TField("uptime_secs", org.apache.thrift.protocol.TType.I32, (short)6);
   private static final org.apache.thrift.protocol.TField STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("status", org.apache.thrift.protocol.TType.STRING, (short)7);
+  private static final org.apache.thrift.protocol.TField STORM_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("storm_version", org.apache.thrift.protocol.TType.STRING, (short)8);
   private static final org.apache.thrift.protocol.TField SCHED_STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("sched_status", org.apache.thrift.protocol.TType.STRING, (short)513);
   private static final org.apache.thrift.protocol.TField OWNER_FIELD_DESC = new org.apache.thrift.protocol.TField("owner", org.apache.thrift.protocol.TType.STRING, (short)514);
   private static final org.apache.thrift.protocol.TField REPLICATION_COUNT_FIELD_DESC = new org.apache.thrift.protocol.TField("replication_count", org.apache.thrift.protocol.TType.I32, (short)515);
@@ -85,6 +86,7 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
   private int num_workers; // required
   private int uptime_secs; // required
   private String status; // required
+  private String storm_version; // optional
   private String sched_status; // optional
   private String owner; // optional
   private int replication_count; // optional
@@ -104,6 +106,7 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
     NUM_WORKERS((short)5, "num_workers"),
     UPTIME_SECS((short)6, "uptime_secs"),
     STATUS((short)7, "status"),
+    STORM_VERSION((short)8, "storm_version"),
     SCHED_STATUS((short)513, "sched_status"),
     OWNER((short)514, "owner"),
     REPLICATION_COUNT((short)515, "replication_count"),
@@ -141,6 +144,8 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
           return UPTIME_SECS;
         case 7: // STATUS
           return STATUS;
+        case 8: // STORM_VERSION
+          return STORM_VERSION;
         case 513: // SCHED_STATUS
           return SCHED_STATUS;
         case 514: // OWNER
@@ -211,7 +216,7 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
   private static final int __ASSIGNED_MEMOFFHEAP_ISSET_ID = 9;
   private static final int __ASSIGNED_CPU_ISSET_ID = 10;
   private short __isset_bitfield = 0;
-  private static final _Fields optionals[] = {_Fields.SCHED_STATUS,_Fields.OWNER,_Fields.REPLICATION_COUNT,_Fields.REQUESTED_MEMONHEAP,_Fields.REQUESTED_MEMOFFHEAP,_Fields.REQUESTED_CPU,_Fields.ASSIGNED_MEMONHEAP,_Fields.ASSIGNED_MEMOFFHEAP,_Fields.ASSIGNED_CPU};
+  private static final _Fields optionals[] = {_Fields.STORM_VERSION,_Fields.SCHED_STATUS,_Fields.OWNER,_Fields.REPLICATION_COUNT,_Fields.REQUESTED_MEMONHEAP,_Fields.REQUESTED_MEMOFFHEAP,_Fields.REQUESTED_CPU,_Fields.ASSIGNED_MEMONHEAP,_Fields.ASSIGNED_MEMOFFHEAP,_Fields.ASSIGNED_CPU};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -229,6 +234,8 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
     tmpMap.put(_Fields.STATUS, new org.apache.thrift.meta_data.FieldMetaData("status", org.apache.thrift.TFieldRequirementType.REQUIRED, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.STORM_VERSION, new org.apache.thrift.meta_data.FieldMetaData("storm_version", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     tmpMap.put(_Fields.SCHED_STATUS, new org.apache.thrift.meta_data.FieldMetaData("sched_status", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     tmpMap.put(_Fields.OWNER, new org.apache.thrift.meta_data.FieldMetaData("owner", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
@@ -295,6 +302,9 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
     if (other.is_set_status()) {
       this.status = other.status;
     }
+    if (other.is_set_storm_version()) {
+      this.storm_version = other.storm_version;
+    }
     if (other.is_set_sched_status()) {
       this.sched_status = other.sched_status;
     }
@@ -327,6 +337,7 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
     set_uptime_secs_isSet(false);
     this.uptime_secs = 0;
     this.status = null;
+    this.storm_version = null;
     this.sched_status = null;
     this.owner = null;
     set_replication_count_isSet(false);
@@ -502,6 +513,29 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
     }
   }
 
+  public String get_storm_version() {
+    return this.storm_version;
+  }
+
+  public void set_storm_version(String storm_version) {
+    this.storm_version = storm_version;
+  }
+
+  public void unset_storm_version() {
+    this.storm_version = null;
+  }
+
+  /** Returns true if field storm_version is set (has been assigned a value) and false otherwise */
+  public boolean is_set_storm_version() {
+    return this.storm_version != null;
+  }
+
+  public void set_storm_version_isSet(boolean value) {
+    if (!value) {
+      this.storm_version = null;
+    }
+  }
+
   public String get_sched_status() {
     return this.sched_status;
   }
@@ -760,6 +794,14 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
       }
       break;
 
+    case STORM_VERSION:
+      if (value == null) {
+        unset_storm_version();
+      } else {
+        set_storm_version((String)value);
+      }
+      break;
+
     case SCHED_STATUS:
       if (value == null) {
         unset_sched_status();
@@ -858,6 +900,9 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
     case STATUS:
       return get_status();
 
+    case STORM_VERSION:
+      return get_storm_version();
+
     case SCHED_STATUS:
       return get_sched_status();
 
@@ -910,6 +955,8 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
       return is_set_uptime_secs();
     case STATUS:
       return is_set_status();
+    case STORM_VERSION:
+      return is_set_storm_version();
     case SCHED_STATUS:
       return is_set_sched_status();
     case OWNER:
@@ -1008,6 +1055,15 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
         return false;
     }
 
+    boolean this_present_storm_version = true && this.is_set_storm_version();
+    boolean that_present_storm_version = true && that.is_set_storm_version();
+    if (this_present_storm_version || that_present_storm_version) {
+      if (!(this_present_storm_version && that_present_storm_version))
+        return false;
+      if (!this.storm_version.equals(that.storm_version))
+        return false;
+    }
+
     boolean this_present_sched_status = true && this.is_set_sched_status();
     boolean that_present_sched_status = true && that.is_set_sched_status();
     if (this_present_sched_status || that_present_sched_status) {
@@ -1131,6 +1187,11 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
     if (present_status)
       list.add(status);
 
+    boolean present_storm_version = true && (is_set_storm_version());
+    list.add(present_storm_version);
+    if (present_storm_version)
+      list.add(storm_version);
+
     boolean present_sched_status = true && (is_set_sched_status());
     list.add(present_sched_status);
     if (present_sched_status)
@@ -1257,6 +1318,16 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(is_set_storm_version()).compareTo(other.is_set_storm_version());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_storm_version()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.storm_version, other.storm_version);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     lastComparison = Boolean.valueOf(is_set_sched_status()).compareTo(other.is_set_sched_status());
     if (lastComparison != 0) {
       return lastComparison;
@@ -1406,6 +1477,16 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
       sb.append(this.status);
     }
     first = false;
+    if (is_set_storm_version()) {
+      if (!first) sb.append(", ");
+      sb.append("storm_version:");
+      if (this.storm_version == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.storm_version);
+      }
+      first = false;
+    }
     if (is_set_sched_status()) {
       if (!first) sb.append(", ");
       sb.append("sched_status:");
@@ -1597,6 +1678,14 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 8: // STORM_VERSION
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.storm_version = iprot.readString();
+              struct.set_storm_version_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           case 513: // SCHED_STATUS
             if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
               struct.sched_status = iprot.readString();
@@ -1709,6 +1798,13 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
         oprot.writeString(struct.status);
         oprot.writeFieldEnd();
       }
+      if (struct.storm_version != null) {
+        if (struct.is_set_storm_version()) {
+          oprot.writeFieldBegin(STORM_VERSION_FIELD_DESC);
+          oprot.writeString(struct.storm_version);
+          oprot.writeFieldEnd();
+        }
+      }
       if (struct.sched_status != null) {
         if (struct.is_set_sched_status()) {
           oprot.writeFieldBegin(SCHED_STATUS_FIELD_DESC);
@@ -1783,34 +1879,40 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
       oprot.writeI32(struct.uptime_secs);
       oprot.writeString(struct.status);
       BitSet optionals = new BitSet();
-      if (struct.is_set_sched_status()) {
+      if (struct.is_set_storm_version()) {
         optionals.set(0);
       }
-      if (struct.is_set_owner()) {
+      if (struct.is_set_sched_status()) {
         optionals.set(1);
       }
-      if (struct.is_set_replication_count()) {
+      if (struct.is_set_owner()) {
         optionals.set(2);
       }
-      if (struct.is_set_requested_memonheap()) {
+      if (struct.is_set_replication_count()) {
         optionals.set(3);
       }
-      if (struct.is_set_requested_memoffheap()) {
+      if (struct.is_set_requested_memonheap()) {
         optionals.set(4);
       }
-      if (struct.is_set_requested_cpu()) {
+      if (struct.is_set_requested_memoffheap()) {
         optionals.set(5);
       }
-      if (struct.is_set_assigned_memonheap()) {
+      if (struct.is_set_requested_cpu()) {
         optionals.set(6);
       }
-      if (struct.is_set_assigned_memoffheap()) {
+      if (struct.is_set_assigned_memonheap()) {
         optionals.set(7);
       }
-      if (struct.is_set_assigned_cpu()) {
+      if (struct.is_set_assigned_memoffheap()) {
         optionals.set(8);
       }
-      oprot.writeBitSet(optionals, 9);
+      if (struct.is_set_assigned_cpu()) {
+        optionals.set(9);
+      }
+      oprot.writeBitSet(optionals, 10);
+      if (struct.is_set_storm_version()) {
+        oprot.writeString(struct.storm_version);
+      }
       if (struct.is_set_sched_status()) {
         oprot.writeString(struct.sched_status);
       }
@@ -1857,40 +1959,44 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
       struct.set_uptime_secs_isSet(true);
       struct.status = iprot.readString();
       struct.set_status_isSet(true);
-      BitSet incoming = iprot.readBitSet(9);
+      BitSet incoming = iprot.readBitSet(10);
       if (incoming.get(0)) {
+        struct.storm_version = iprot.readString();
+        struct.set_storm_version_isSet(true);
+      }
+      if (incoming.get(1)) {
         struct.sched_status = iprot.readString();
         struct.set_sched_status_isSet(true);
       }
-      if (incoming.get(1)) {
+      if (incoming.get(2)) {
         struct.owner = iprot.readString();
         struct.set_owner_isSet(true);
       }
-      if (incoming.get(2)) {
+      if (incoming.get(3)) {
         struct.replication_count = iprot.readI32();
         struct.set_replication_count_isSet(true);
       }
-      if (incoming.get(3)) {
+      if (incoming.get(4)) {
         struct.requested_memonheap = iprot.readDouble();
         struct.set_requested_memonheap_isSet(true);
       }
-      if (incoming.get(4)) {
+      if (incoming.get(5)) {
         struct.requested_memoffheap = iprot.readDouble();
         struct.set_requested_memoffheap_isSet(true);
       }
-      if (incoming.get(5)) {
+      if (incoming.get(6)) {
         struct.requested_cpu = iprot.readDouble();
         struct.set_requested_cpu_isSet(true);
       }
-      if (incoming.get(6)) {
+      if (incoming.get(7)) {
         struct.assigned_memonheap = iprot.readDouble();
         struct.set_assigned_memonheap_isSet(true);
       }
-      if (incoming.get(7)) {
+      if (incoming.get(8)) {
         struct.assigned_memoffheap = iprot.readDouble();
         struct.set_assigned_memoffheap_isSet(true);
       }
-      if (incoming.get(8)) {
+      if (incoming.get(9)) {
         struct.assigned_cpu = iprot.readDouble();
         struct.set_assigned_cpu_isSet(true);
       }
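
As in the other structs, equals(), hashCode(), compareTo() and toString() all treat
storm_version as optional; in particular, two summaries differ if only one of them has the
field set, and compareTo() orders an unset field before a set one. A small sketch with
illustrative values:

    TopologySummary a = new TopologySummary();          // storm_version left unset
    TopologySummary b = new TopologySummary();
    b.set_storm_version("1.1.0");
    // a.equals(b) is false: only one side has storm_version set.
    // a.compareTo(b) is negative: an unset optional sorts before a set one.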

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
index 6542c7e..47cbd2d 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
@@ -158,7 +158,7 @@ public class TopologyBuilder {
 
         stormTopology.set_worker_hooks(_workerHooks);
 
-        return stormTopology;
+        return Utils.addVersions(stormTopology);
     }
 
     /**
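
The one-line TopologyBuilder change above is what wires the new fields in: createTopology()
now passes the assembled StormTopology through Utils.addVersions(), which, per the commit
subject, records the submitting client's Storm and JDK versions. Existing callers are
unaffected; a typical build still looks like this (the spout and bolt classes are
placeholders, not part of this change):

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("words", new RandomWordSpout());                        // placeholder spout
    builder.setBolt("count", new WordCountBolt()).shuffleGrouping("words");  // placeholder bolt
    StormTopology topology = builder.createTopology();  // now returns Utils.addVersions(stormTopology)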

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/LocalState.java b/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
index 2f0bb60..29310f5 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
@@ -84,7 +84,13 @@ public class LocalState {
 
     private TBase deserialize(ThriftSerializedObject obj, TDeserializer td) {
         try {
-            Class<?> clazz = Class.forName(obj.get_name());
+            Class<?> clazz;
+            try {
+                clazz = Class.forName(obj.get_name());
+            } catch (ClassNotFoundException ex) {
+                //Try to maintain rolling upgrade compatibility with 0.10 releases
+                clazz = Class.forName(obj.get_name().replaceAll("^backtype\\.storm\\.", "org.apache.storm."));
+            }
             TBase instance = (TBase) clazz.newInstance();
             td.deserialize(instance, obj.get_bits());
             return instance;
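
The LocalState change is a rolling-upgrade shim: state serialized by a pre-rename (0.10.x)
daemon records class names under the old backtype.storm package, so a failed Class.forName is
retried with the package prefix rewritten. The fallback is a plain prefix substitution; the
input below is illustrative:

    String legacy  = "backtype.storm.generated.LocalAssignment";
    String renamed = legacy.replaceAll("^backtype\\.storm\\.", "org.apache.storm.");
    // renamed == "org.apache.storm.generated.LocalAssignment"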

http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/utils/SimpleVersion.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/SimpleVersion.java b/storm-client/src/jvm/org/apache/storm/utils/SimpleVersion.java
new file mode 100644
index 0000000..471b049
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/utils/SimpleVersion.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.utils;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Takes a version string and parses out a Major.Minor version.
+ */
+public class SimpleVersion implements Comparable <SimpleVersion> {
+    private final int _major;
+    private final int _minor;
+    
+    private static final Pattern VERSION_PATTERN = Pattern.compile("(\\d+)[.-_]+(\\d+).*");
+    
+    public SimpleVersion(String version) {
+        Matcher m = VERSION_PATTERN.matcher(version);
+        int maj = -1;
+        int min = -1;
+        if (!m.matches()) {
+            //Unknown should only happen during compilation or some unit tests.
+            if (!"Unknown".equals(version)) {
+                throw new IllegalArgumentException("Cannot parse " + version);
+            }
+        } else {
+            maj = Integer.valueOf(m.group(1));
+            min = Integer.valueOf(m.group(2));
+        }
+        _major = maj;
+        _minor = min;
+    }
+    
+    public int getMajor() {
+        return _major;
+    }
+    
+    public int getMinor() {
+        return _minor;
+    }
+    
+    @Override
+    public int hashCode() {
+        return (Integer.hashCode(_major) * 17) & Integer.hashCode(_minor);
+    }
+    
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        
+        if (!(o instanceof SimpleVersion)) {
+            return false;
+        }
+        
+        return compareTo((SimpleVersion)o) == 0;
+    }
+    
+    @Override
+    public int compareTo(SimpleVersion o) {
+        int ret = Integer.compare(_major, o._major);
+        if (ret == 0) {
+            ret = Integer.compare(_minor, o._minor);
+        }
+        return ret;
+    }
+    
+    @Override
+    public String toString() {
+        return _major + "." + _minor;
+    }
+}
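
SimpleVersion reduces a version string to a major and a minor number, tolerates the literal
string "Unknown" (both components become -1 instead of throwing), and orders by major first,
then minor. A short usage sketch; the version strings are illustrative:

    SimpleVersion v1 = new SimpleVersion("1.1");             // major 1, minor 1
    SimpleVersion v2 = new SimpleVersion("2.0.0-SNAPSHOT");  // major 2, minor 0
    assert v1.compareTo(v2) < 0;                             // 1.1 sorts before 2.0
    assert "2.0".equals(v2.toString());
    SimpleVersion unknown = new SimpleVersion("Unknown");    // allowed: getMajor()/getMinor() return -1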