You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/05/22 21:49:59 UTC
[1/4] storm git commit: STORM-2448: Add in Storm and JDK versions
when submitting a topology
Repository: storm
Updated Branches:
refs/heads/master d7fb0fbc4 -> 9e31509d4
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 498edcb..45bd123 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -18,37 +18,6 @@
package org.apache.storm.utils;
-import org.apache.storm.Config;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.input.ClassLoaderObjectInputStream;
-import org.apache.storm.blobstore.ClientBlobStore;
-import org.apache.storm.generated.ClusterSummary;
-import org.apache.storm.generated.ComponentCommon;
-import org.apache.storm.generated.ComponentObject;
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.generated.InvalidTopologyException;
-import org.apache.storm.generated.Nimbus;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.generated.TopologyInfo;
-import org.apache.storm.generated.TopologySummary;
-import org.apache.storm.serialization.DefaultSerializationDelegate;
-import org.apache.storm.serialization.SerializationDelegate;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Id;
-import org.json.simple.JSONValue;
-import org.json.simple.parser.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
-import org.yaml.snakeyaml.constructor.SafeConstructor;
-
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -74,6 +43,7 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
@@ -82,15 +52,50 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.input.ClassLoaderObjectInputStream;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.ComponentObject;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.serialization.DefaultSerializationDelegate;
+import org.apache.storm.serialization.SerializationDelegate;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.json.simple.JSONValue;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.SafeConstructor;
+
+import com.google.common.annotations.VisibleForTesting;
+
public class Utils {
public static final Logger LOG = LoggerFactory.getLogger(Utils.class);
public static final String DEFAULT_STREAM_ID = "default";
@@ -1267,4 +1272,147 @@ public class Utils {
return Time.deltaSecs(startTime);
}
}
+
+ /**
+  * Stamp the submitting client's Storm and JDK versions onto a topology.
+  * A version that is already set on the topology is never overwritten.
+  * The Storm version is skipped when it is null or reported as "Unknown"
+  * (compared case-insensitively); the JDK version comes from the
+  * {@code java.version} system property and is skipped when that is null.
+  * @param topology the topology being submitted (MIGHT BE MODIFIED)
+  * @return the same topology instance that was passed in
+  */
+ public static StormTopology addVersions(StormTopology topology) {
+     final String clientVersion = VersionInfo.getVersion();
+     final boolean clientVersionKnown =
+         clientVersion != null && !"Unknown".equalsIgnoreCase(clientVersion);
+     if (clientVersionKnown && !topology.is_set_storm_version()) {
+         topology.set_storm_version(clientVersion);
+     }
+
+     final String javaVersion = System.getProperty("java.version");
+     if (javaVersion == null || topology.is_set_jdk_version()) {
+         // Nothing to record, or the submitter already recorded one.
+         return topology;
+     }
+     topology.set_jdk_version(javaVersion);
+     return topology;
+ }
+
+ /**
+ * Get a map of version to classpath from the conf Config.SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP
+ * @param conf what to read it out of
+ * @param currentCP the current classpath for this version of storm (not included in the conf, but returned by this)
+ * @return the map
+ */
+ public static NavigableMap<SimpleVersion, List<String>> getConfiguredClasspathVersions(Map<String, Object> conf, List<String> currentCP) {
+ TreeMap<SimpleVersion, List<String>> ret = new TreeMap<>();
+ Map<String, String> fromConf = (Map<String, String>) conf.getOrDefault(Config.SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP, Collections.emptyMap());
+ for (Map.Entry<String, String> entry: fromConf.entrySet()) {
+ ret.put(new SimpleVersion(entry.getKey()), Arrays.asList(entry.getValue().split(File.pathSeparator)));
+ }
+ ret.put(VersionInfo.OUR_VERSION, currentCP);
+ return ret;
+ }
+
+ /**
+  * Build a sorted map of Storm version to worker main class name from
+  * Config.SUPERVISOR_WORKER_VERSION_MAIN_MAP.
+  * The entry for VersionInfo.OUR_VERSION is added last and always maps to
+  * this release's worker main class, replacing any configured entry whose key
+  * compares equal to it.
+  * @param conf what to read it out of (a missing key is treated as an empty map)
+  * @return the map, keyed by parsed version
+  */
+ public static NavigableMap<SimpleVersion, String> getConfiguredWorkerMainVersions(Map<String, Object> conf) {
+     Object configured = conf.getOrDefault(Config.SUPERVISOR_WORKER_VERSION_MAIN_MAP, Collections.emptyMap());
+     Map<String, String> mainByVersionString = (Map<String, String>) configured;
+
+     TreeMap<SimpleVersion, String> mainByVersion = new TreeMap<>();
+     for (Entry<String, String> configuredEntry : mainByVersionString.entrySet()) {
+         SimpleVersion version = new SimpleVersion(configuredEntry.getKey());
+         mainByVersion.put(version, configuredEntry.getValue());
+     }
+     mainByVersion.put(VersionInfo.OUR_VERSION, "org.apache.storm.daemon.worker.Worker");
+     return mainByVersion;
+ }
+
+
+ /**
+  * Build a sorted map of Storm version to worker log writer class name from
+  * Config.SUPERVISOR_WORKER_VERSION_LOGWRITER_MAP.
+  * The entry for VersionInfo.OUR_VERSION is added last and always maps to
+  * this release's log writer class, replacing any configured entry whose key
+  * compares equal to it.
+  * @param conf what to read it out of (a missing key is treated as an empty map)
+  * @return the map, keyed by parsed version
+  */
+ public static NavigableMap<SimpleVersion, String> getConfiguredWorkerLogWriterVersions(Map<String, Object> conf) {
+     TreeMap<SimpleVersion, String> writerByVersion = new TreeMap<>();
+     Object configured = conf.getOrDefault(Config.SUPERVISOR_WORKER_VERSION_LOGWRITER_MAP, Collections.emptyMap());
+     for (Entry<String, String> configuredEntry : ((Map<String, String>) configured).entrySet()) {
+         String versionString = configuredEntry.getKey();
+         writerByVersion.put(new SimpleVersion(versionString), configuredEntry.getValue());
+     }
+
+     writerByVersion.put(VersionInfo.OUR_VERSION, "org.apache.storm.LogWriter");
+     return writerByVersion;
+ }
+
+
+ /**
+  * Pick the value registered for the version that best matches a desired version.
+  * Selection order:
+  * <ol>
+  * <li>the entry with the least key greater than or equal to {@code desiredVersion},
+  * provided it has the same major version;</li>
+  * <li>otherwise the entry with the greatest key less than or equal to
+  * {@code desiredVersion}, provided it has the same major version (a warning is logged);</li>
+  * <li>otherwise {@code defaultValue} (a warning is logged only when it is non-null).</li>
+  * </ol>
+  * "Greater" and "less" follow the ordering of {@code versionedMap}.
+  * @param versionedMap the candidate values keyed by version
+  * @param desiredVersion the version to find a match for
+  * @param what a human readable name of what is being looked up, used only in log messages
+  * @param defaultValue the value to return when no entry shares the desired major version (may be null)
+  * @return the selected value, or {@code defaultValue} when nothing compatible is registered
+  */
+ public static <T> T getCompatibleVersion(NavigableMap<SimpleVersion, T> versionedMap, SimpleVersion desiredVersion, String what, T defaultValue) {
+     Entry<SimpleVersion, T> ret = versionedMap.ceilingEntry(desiredVersion);
+     if (ret == null || ret.getKey().getMajor() != desiredVersion.getMajor()) {
+         //Could not find a "fully" compatible version. Look to see if there is a possibly compatible version right below it
+         ret = versionedMap.floorEntry(desiredVersion);
+         if (ret == null || ret.getKey().getMajor() != desiredVersion.getMajor()) {
+             if (defaultValue != null) {
+                 LOG.warn("Could not find any compatible {} falling back to using {}", what, defaultValue);
+             }
+             return defaultValue;
+         }
+         LOG.warn("Could not find a higher compatible version for {} {}, using {} instead", what, desiredVersion, ret.getKey());
+     }
+     return ret.getValue();
+ }
+
+ /**
+  * Parse a YAML file into a config map, treating a missing file as "no config".
+  * @param yaml the parser to load the file with
+  * @param f the file to read
+  * @return whatever the parser produced for the file, cast to a map, or null when the file does not exist
+  * @throws IOException if the file exists but cannot be opened or closed
+  */
+ @SuppressWarnings("unchecked")
+ private static Map<String, Object> readConfIgnoreNotFound(Yaml yaml, File f) throws IOException {
+     if (!f.exists()) {
+         return null;
+     }
+     try (FileReader reader = new FileReader(f)) {
+         return (Map<String, Object>) yaml.load(reader);
+     }
+ }
+
+ /**
+  * Build a config map from the defaults.yaml and storm.yaml found on a classpath.
+  * Classpath elements are visited in order. A directory is checked for the two
+  * files directly inside it; anything else is opened as a jar and checked for
+  * top-level entries with those names. For each of the two files the first
+  * element that yields a non-null map wins and later elements are ignored for
+  * that file. Values from storm.yaml override values from defaults.yaml.
+  * @param cp the classpath elements to search
+  * @param conf returned unchanged when {@code cp} is null or empty; otherwise unused
+  * @return the merged config; just the storm.yaml config when no defaults.yaml
+  *     was found (previously this case threw a NullPointerException); or null
+  *     when neither file was found
+  * @throws IOException if a file cannot be read, or a non-directory element cannot be opened as a jar
+  */
+ public static Map<String, Object> getConfigFromClasspath(List<String> cp, Map<String, Object> conf) throws IOException {
+     if (cp == null || cp.isEmpty()) {
+         return conf;
+     }
+     Yaml yaml = new Yaml(new SafeConstructor());
+     Map<String, Object> defaultsConf = null;
+     Map<String, Object> stormConf = null;
+     for (String part : cp) {
+         File f = new File(part);
+         if (f.isDirectory()) {
+             if (defaultsConf == null) {
+                 defaultsConf = readConfIgnoreNotFound(yaml, new File(f, "defaults.yaml"));
+             }
+
+             if (stormConf == null) {
+                 stormConf = readConfIgnoreNotFound(yaml, new File(f, "storm.yaml"));
+             }
+         } else {
+             //Lets assume it is a jar file
+             try (JarFile jarFile = new JarFile(f)) {
+                 if (defaultsConf == null) {
+                     defaultsConf = readJarConfIgnoreNotFound(yaml, jarFile, "defaults.yaml");
+                 }
+
+                 if (stormConf == null) {
+                     stormConf = readJarConfIgnoreNotFound(yaml, jarFile, "storm.yaml");
+                 }
+             }
+         }
+     }
+     if (defaultsConf == null) {
+         // No defaults to merge into; calling putAll on null would throw.
+         return stormConf;
+     }
+     if (stormConf != null) {
+         defaultsConf.putAll(stormConf);
+     }
+     return defaultsConf;
+ }
+
+ /**
+  * Parse a named top-level jar entry as YAML, treating a missing entry as "no config".
+  * Looks the entry up by name instead of scanning every entry in the jar.
+  * @return whatever the parser produced, cast to a map, or null when the entry is absent or is a directory
+  */
+ @SuppressWarnings("unchecked")
+ private static Map<String, Object> readJarConfIgnoreNotFound(Yaml yaml, JarFile jarFile, String name) throws IOException {
+     JarEntry entry = jarFile.getJarEntry(name);
+     if (entry == null || entry.isDirectory()) {
+         return null;
+     }
+     try (InputStream in = jarFile.getInputStream(entry)) {
+         // NOTE(review): decodes with the platform default charset, as before — confirm whether UTF-8 should be forced.
+         return (Map<String, Object>) yaml.load(new InputStreamReader(in));
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java b/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java
index b3c21d5..f60586f 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java
@@ -23,7 +23,8 @@ import java.io.InputStream;
import java.util.Properties;
public class VersionInfo {
-
+ private static final VersionInfo COMMON_VERSION_INFO = new VersionInfo("storm-core");
+ public static final SimpleVersion OUR_VERSION = new SimpleVersion(COMMON_VERSION_INFO._getVersion());
private Properties info;
protected VersionInfo(String component) {
@@ -85,9 +86,6 @@ public class VersionInfo {
" source checksum " + _getSrcChecksum();
}
-
- private static final VersionInfo COMMON_VERSION_INFO = new VersionInfo("storm-core");
-
public static String getVersion() {
return COMMON_VERSION_INFO._getVersion();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-client/src/py/storm/ttypes.py b/storm-client/src/py/storm/ttypes.py
index e9085e6..d47483f 100644
--- a/storm-client/src/py/storm/ttypes.py
+++ b/storm-client/src/py/storm/ttypes.py
@@ -1414,6 +1414,8 @@ class StormTopology:
- worker_hooks
- dependency_jars
- dependency_artifacts
+ - storm_version
+ - jdk_version
"""
thrift_spec = (
@@ -1424,15 +1426,19 @@ class StormTopology:
(4, TType.LIST, 'worker_hooks', (TType.STRING,None), None, ), # 4
(5, TType.LIST, 'dependency_jars', (TType.STRING,None), None, ), # 5
(6, TType.LIST, 'dependency_artifacts', (TType.STRING,None), None, ), # 6
+ (7, TType.STRING, 'storm_version', None, None, ), # 7
+ (8, TType.STRING, 'jdk_version', None, None, ), # 8
)
- def __init__(self, spouts=None, bolts=None, state_spouts=None, worker_hooks=None, dependency_jars=None, dependency_artifacts=None,):
+ def __init__(self, spouts=None, bolts=None, state_spouts=None, worker_hooks=None, dependency_jars=None, dependency_artifacts=None, storm_version=None, jdk_version=None,):
self.spouts = spouts
self.bolts = bolts
self.state_spouts = state_spouts
self.worker_hooks = worker_hooks
self.dependency_jars = dependency_jars
self.dependency_artifacts = dependency_artifacts
+ self.storm_version = storm_version
+ self.jdk_version = jdk_version
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -1509,6 +1515,16 @@ class StormTopology:
iprot.readListEnd()
else:
iprot.skip(ftype)
+ elif fid == 7:
+ if ftype == TType.STRING:
+ self.storm_version = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 8:
+ if ftype == TType.STRING:
+ self.jdk_version = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -1564,6 +1580,14 @@ class StormTopology:
oprot.writeString(iter86.encode('utf-8'))
oprot.writeListEnd()
oprot.writeFieldEnd()
+ if self.storm_version is not None:
+ oprot.writeFieldBegin('storm_version', TType.STRING, 7)
+ oprot.writeString(self.storm_version.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.jdk_version is not None:
+ oprot.writeFieldBegin('jdk_version', TType.STRING, 8)
+ oprot.writeString(self.jdk_version.encode('utf-8'))
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -1585,6 +1609,8 @@ class StormTopology:
value = (value * 31) ^ hash(self.worker_hooks)
value = (value * 31) ^ hash(self.dependency_jars)
value = (value * 31) ^ hash(self.dependency_artifacts)
+ value = (value * 31) ^ hash(self.storm_version)
+ value = (value * 31) ^ hash(self.jdk_version)
return value
def __repr__(self):
@@ -2028,6 +2054,7 @@ class TopologySummary:
- num_workers
- uptime_secs
- status
+ - storm_version
- sched_status
- owner
- replication_count
@@ -2048,7 +2075,7 @@ class TopologySummary:
(5, TType.I32, 'num_workers', None, None, ), # 5
(6, TType.I32, 'uptime_secs', None, None, ), # 6
(7, TType.STRING, 'status', None, None, ), # 7
- None, # 8
+ (8, TType.STRING, 'storm_version', None, None, ), # 8
None, # 9
None, # 10
None, # 11
@@ -2569,7 +2596,7 @@ class TopologySummary:
(526, TType.DOUBLE, 'assigned_cpu', None, None, ), # 526
)
- def __init__(self, id=None, name=None, num_tasks=None, num_executors=None, num_workers=None, uptime_secs=None, status=None, sched_status=None, owner=None, replication_count=None, requested_memonheap=None, requested_memoffheap=None, requested_cpu=None, assigned_memonheap=None, assigned_memoffheap=None, assigned_cpu=None,):
+ def __init__(self, id=None, name=None, num_tasks=None, num_executors=None, num_workers=None, uptime_secs=None, status=None, storm_version=None, sched_status=None, owner=None, replication_count=None, requested_memonheap=None, requested_memoffheap=None, requested_cpu=None, assigned_memonheap=None, assigned_memoffheap=None, assigned_cpu=None,):
self.id = id
self.name = name
self.num_tasks = num_tasks
@@ -2577,6 +2604,7 @@ class TopologySummary:
self.num_workers = num_workers
self.uptime_secs = uptime_secs
self.status = status
+ self.storm_version = storm_version
self.sched_status = sched_status
self.owner = owner
self.replication_count = replication_count
@@ -2631,6 +2659,11 @@ class TopologySummary:
self.status = iprot.readString().decode('utf-8')
else:
iprot.skip(ftype)
+ elif fid == 8:
+ if ftype == TType.STRING:
+ self.storm_version = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
elif fid == 513:
if ftype == TType.STRING:
self.sched_status = iprot.readString().decode('utf-8')
@@ -2714,6 +2747,10 @@ class TopologySummary:
oprot.writeFieldBegin('status', TType.STRING, 7)
oprot.writeString(self.status.encode('utf-8'))
oprot.writeFieldEnd()
+ if self.storm_version is not None:
+ oprot.writeFieldBegin('storm_version', TType.STRING, 8)
+ oprot.writeString(self.storm_version.encode('utf-8'))
+ oprot.writeFieldEnd()
if self.sched_status is not None:
oprot.writeFieldBegin('sched_status', TType.STRING, 513)
oprot.writeString(self.sched_status.encode('utf-8'))
@@ -2780,6 +2817,7 @@ class TopologySummary:
value = (value * 31) ^ hash(self.num_workers)
value = (value * 31) ^ hash(self.uptime_secs)
value = (value * 31) ^ hash(self.status)
+ value = (value * 31) ^ hash(self.storm_version)
value = (value * 31) ^ hash(self.sched_status)
value = (value * 31) ^ hash(self.owner)
value = (value * 31) ^ hash(self.replication_count)
@@ -4298,6 +4336,7 @@ class TopologyInfo:
- status
- errors
- component_debug
+ - storm_version
- sched_status
- owner
- replication_count
@@ -4318,7 +4357,7 @@ class TopologyInfo:
(5, TType.STRING, 'status', None, None, ), # 5
(6, TType.MAP, 'errors', (TType.STRING,None,TType.LIST,(TType.STRUCT,(ErrorInfo, ErrorInfo.thrift_spec))), None, ), # 6
(7, TType.MAP, 'component_debug', (TType.STRING,None,TType.STRUCT,(DebugOptions, DebugOptions.thrift_spec)), None, ), # 7
- None, # 8
+ (8, TType.STRING, 'storm_version', None, None, ), # 8
None, # 9
None, # 10
None, # 11
@@ -4839,7 +4878,7 @@ class TopologyInfo:
(526, TType.DOUBLE, 'assigned_cpu', None, None, ), # 526
)
- def __init__(self, id=None, name=None, uptime_secs=None, executors=None, status=None, errors=None, component_debug=None, sched_status=None, owner=None, replication_count=None, requested_memonheap=None, requested_memoffheap=None, requested_cpu=None, assigned_memonheap=None, assigned_memoffheap=None, assigned_cpu=None,):
+ def __init__(self, id=None, name=None, uptime_secs=None, executors=None, status=None, errors=None, component_debug=None, storm_version=None, sched_status=None, owner=None, replication_count=None, requested_memonheap=None, requested_memoffheap=None, requested_cpu=None, assigned_memonheap=None, assigned_memoffheap=None, assigned_cpu=None,):
self.id = id
self.name = name
self.uptime_secs = uptime_secs
@@ -4847,6 +4886,7 @@ class TopologyInfo:
self.status = status
self.errors = errors
self.component_debug = component_debug
+ self.storm_version = storm_version
self.sched_status = sched_status
self.owner = owner
self.replication_count = replication_count
@@ -4926,6 +4966,11 @@ class TopologyInfo:
iprot.readMapEnd()
else:
iprot.skip(ftype)
+ elif fid == 8:
+ if ftype == TType.STRING:
+ self.storm_version = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
elif fid == 513:
if ftype == TType.STRING:
self.sched_status = iprot.readString().decode('utf-8')
@@ -5023,6 +5068,10 @@ class TopologyInfo:
viter328.write(oprot)
oprot.writeMapEnd()
oprot.writeFieldEnd()
+ if self.storm_version is not None:
+ oprot.writeFieldBegin('storm_version', TType.STRING, 8)
+ oprot.writeString(self.storm_version.encode('utf-8'))
+ oprot.writeFieldEnd()
if self.sched_status is not None:
oprot.writeFieldBegin('sched_status', TType.STRING, 513)
oprot.writeString(self.sched_status.encode('utf-8'))
@@ -5087,6 +5136,7 @@ class TopologyInfo:
value = (value * 31) ^ hash(self.status)
value = (value * 31) ^ hash(self.errors)
value = (value * 31) ^ hash(self.component_debug)
+ value = (value * 31) ^ hash(self.storm_version)
value = (value * 31) ^ hash(self.sched_status)
value = (value * 31) ^ hash(self.owner)
value = (value * 31) ^ hash(self.replication_count)
@@ -6668,6 +6718,7 @@ class TopologyPageInfo:
- debug_options
- replication_count
- workers
+ - storm_version
- requested_memonheap
- requested_memoffheap
- requested_cpu
@@ -6694,7 +6745,7 @@ class TopologyPageInfo:
(14, TType.STRUCT, 'debug_options', (DebugOptions, DebugOptions.thrift_spec), None, ), # 14
(15, TType.I32, 'replication_count', None, None, ), # 15
(16, TType.LIST, 'workers', (TType.STRUCT,(WorkerSummary, WorkerSummary.thrift_spec)), None, ), # 16
- None, # 17
+ (17, TType.STRING, 'storm_version', None, None, ), # 17
None, # 18
None, # 19
None, # 20
@@ -7206,7 +7257,7 @@ class TopologyPageInfo:
(526, TType.DOUBLE, 'assigned_cpu', None, None, ), # 526
)
- def __init__(self, id=None, name=None, uptime_secs=None, status=None, num_tasks=None, num_workers=None, num_executors=None, topology_conf=None, id_to_spout_agg_stats=None, id_to_bolt_agg_stats=None, sched_status=None, topology_stats=None, owner=None, debug_options=None, replication_count=None, workers=None, requested_memonheap=None, requested_memoffheap=None, requested_cpu=None, assigned_memonheap=None, assigned_memoffheap=None, assigned_cpu=None,):
+ def __init__(self, id=None, name=None, uptime_secs=None, status=None, num_tasks=None, num_workers=None, num_executors=None, topology_conf=None, id_to_spout_agg_stats=None, id_to_bolt_agg_stats=None, sched_status=None, topology_stats=None, owner=None, debug_options=None, replication_count=None, workers=None, storm_version=None, requested_memonheap=None, requested_memoffheap=None, requested_cpu=None, assigned_memonheap=None, assigned_memoffheap=None, assigned_cpu=None,):
self.id = id
self.name = name
self.uptime_secs = uptime_secs
@@ -7223,6 +7274,7 @@ class TopologyPageInfo:
self.debug_options = debug_options
self.replication_count = replication_count
self.workers = workers
+ self.storm_version = storm_version
self.requested_memonheap = requested_memonheap
self.requested_memoffheap = requested_memoffheap
self.requested_cpu = requested_cpu
@@ -7341,6 +7393,11 @@ class TopologyPageInfo:
iprot.readListEnd()
else:
iprot.skip(ftype)
+ elif fid == 17:
+ if ftype == TType.STRING:
+ self.storm_version = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
elif fid == 521:
if ftype == TType.DOUBLE:
self.requested_memonheap = iprot.readDouble()
@@ -7456,6 +7513,10 @@ class TopologyPageInfo:
iter430.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
+ if self.storm_version is not None:
+ oprot.writeFieldBegin('storm_version', TType.STRING, 17)
+ oprot.writeString(self.storm_version.encode('utf-8'))
+ oprot.writeFieldEnd()
if self.requested_memonheap is not None:
oprot.writeFieldBegin('requested_memonheap', TType.DOUBLE, 521)
oprot.writeDouble(self.requested_memonheap)
@@ -7507,6 +7568,7 @@ class TopologyPageInfo:
value = (value * 31) ^ hash(self.debug_options)
value = (value * 31) ^ hash(self.replication_count)
value = (value * 31) ^ hash(self.workers)
+ value = (value * 31) ^ hash(self.storm_version)
value = (value * 31) ^ hash(self.requested_memonheap)
value = (value * 31) ^ hash(self.requested_memoffheap)
value = (value * 31) ^ hash(self.requested_cpu)
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-client/src/storm.thrift b/storm-client/src/storm.thrift
index cb2239d..ee02d1b 100644
--- a/storm-client/src/storm.thrift
+++ b/storm-client/src/storm.thrift
@@ -120,6 +120,8 @@ struct StormTopology {
4: optional list<binary> worker_hooks;
5: optional list<string> dependency_jars;
6: optional list<string> dependency_artifacts;
+ 7: optional string storm_version;
+ 8: optional string jdk_version;
}
exception AlreadyAliveException {
@@ -154,6 +156,7 @@ struct TopologySummary {
5: required i32 num_workers;
6: required i32 uptime_secs;
7: required string status;
+ 8: optional string storm_version;
513: optional string sched_status;
514: optional string owner;
515: optional i32 replication_count;
@@ -255,6 +258,7 @@ struct TopologyInfo {
5: required string status;
6: required map<string, list<ErrorInfo>> errors;
7: optional map<string, DebugOptions> component_debug;
+ 8: optional string storm_version;
513: optional string sched_status;
514: optional string owner;
515: optional i32 replication_count;
@@ -352,6 +356,7 @@ struct TopologyPageInfo {
14: optional DebugOptions debug_options;
15: optional i32 replication_count;
16: optional list<WorkerSummary> workers;
+17: optional string storm_version;
521: optional double requested_memonheap;
522: optional double requested_memoffheap;
523: optional double requested_cpu;
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 2a07fdc..d1bdee8 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -554,7 +554,8 @@
"assignedMemOnHeap" (.get_assigned_memonheap t)
"assignedMemOffHeap" (.get_assigned_memoffheap t)
"assignedTotalMem" (+ (.get_assigned_memonheap t) (.get_assigned_memoffheap t))
- "assignedCpu" (.get_assigned_cpu t)})
+ "assignedCpu" (.get_assigned_cpu t)
+ "stormVersion" (.get_storm_version t)})
"schedulerDisplayResource" (*STORM-CONF* SCHEDULER-DISPLAY-RESOURCE)}))
(defn topology-stats [window stats]
@@ -694,7 +695,8 @@
"configuration" (.get_topology_conf topo-info)
"debug" (or debugEnabled false)
"samplingPct" (or samplingPct 10)
- "replicationCount" (.get_replication_count topo-info)}))
+ "replicationCount" (.get_replication_count topo-info)
+ "stormVersion" (.get_storm_version topo-info)}))
(defn exec-host-port
[executors]
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-core/src/ui/public/templates/index-page-template.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/templates/index-page-template.html b/storm-core/src/ui/public/templates/index-page-template.html
index ed589d9..1802452 100644
--- a/storm-core/src/ui/public/templates/index-page-template.html
+++ b/storm-core/src/ui/public/templates/index-page-template.html
@@ -213,6 +213,11 @@
Scheduler Info
</span>
</th>
+ <th>
+ <span data-toggle="tooltip" data-placement="left" title="The version of the storm client that this topology request (was launched with)">
+ Storm Version
+ </span>
+ </th>
</tr>
</thead>
<tbody>
@@ -231,6 +236,7 @@
<td>{{assignedCpu}}</td>
{{/schedulerDisplayResource}}
<td>{{schedulerInfo}}</td>
+ <td>{{stormVersion}}</td>
</tr>
{{/topologies}}
</tbody>
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-core/src/ui/public/templates/topology-page-template.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/templates/topology-page-template.html b/storm-core/src/ui/public/templates/topology-page-template.html
index 2106d6c..0d45de0 100644
--- a/storm-core/src/ui/public/templates/topology-page-template.html
+++ b/storm-core/src/ui/public/templates/topology-page-template.html
@@ -80,6 +80,11 @@
Scheduler Info
</span>
</th>
+ <th>
+ <span data-toggle="tooltip" data-placement="left" title="The version of the storm client that this topology request (was launched with)">
+ Storm Version
+ </span>
+ </th>
</tr>
</thead>
<tbody>
@@ -98,6 +103,7 @@
<td>{{assignedCpu}}</td>
{{/schedulerDisplayResource}}
<td>{{schedulerInfo}}</td>
+ <td>{{stormVersion}}</td>
</tr>
</tbody>
</table>
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index e91a9b0..bfa8bd6 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -37,7 +37,7 @@
TopologyInitialStatus TopologyStatus AlreadyAliveException KillOptions RebalanceOptions
InvalidTopologyException AuthorizationException
LogConfig LogLevel LogLevelAction Assignment NodeInfo])
- (:import [java.util HashMap HashSet Optional])
+ (:import [java.util Map HashMap HashSet Optional])
(:import [java.io File])
(:import [javax.security.auth Subject])
(:import [org.apache.storm.utils Time Time$SimulatedTime IPredicate StormCommonInstaller Utils$UptimeComputer ReflectionUtils Utils ConfigUtils ServerConfigUtils]
@@ -1338,8 +1338,7 @@
(.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) expected-conf)
(.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/anyObject))) nil)
(testing "getTopologyConf calls check-authorization! with the correct parameters."
- (let [expected-operation "getTopologyConf"
- expected-conf-json (JSONValue/toJSONString expected-conf)]
+ (let [expected-operation "getTopologyConf"]
(try
(is (= expected-conf
(->> (.getTopologyConf nimbus topology-id)
@@ -1347,7 +1346,8 @@
clojurify-structure)))
(catch NotAliveException e)
(finally
- (.checkAuthorization (Mockito/verify nimbus) topology-name expected-conf expected-operation)))))
+ (.checkAuthorization (Mockito/verify nimbus) nil nil "getClusterInfo")
+ (.checkAuthorization (Mockito/verify nimbus) (Mockito/eq topology-name) (Mockito/any Map) (Mockito/eq expected-operation))))))
(testing "getTopology calls check-authorization! with the correct parameters."
(let [expected-operation "getTopology"
@@ -1360,9 +1360,9 @@
(.getTopology nimbus topology-id)
(catch NotAliveException e)
(finally
- (.checkAuthorization (Mockito/verify nimbus) topology-name expected-conf expected-operation)
+ (.checkAuthorization (Mockito/verify nimbus) (Mockito/eq topology-name) (Mockito/any Map) (Mockito/eq expected-operation))
(. (Mockito/verify common-spy)
- (systemTopologyImpl (Matchers/eq expected-conf)
+ (systemTopologyImpl (Matchers/any Map)
(Matchers/any))))))))
(testing "getUserTopology calls check-authorization with the correct parameters."
@@ -1371,7 +1371,7 @@
(.getUserTopology nimbus topology-id)
(catch NotAliveException e)
(finally
- (.checkAuthorization (Mockito/verify nimbus) topology-name expected-conf expected-operation)
+ (.checkAuthorization (Mockito/verify nimbus) (Mockito/eq topology-name) (Mockito/any Map) (Mockito/eq expected-operation))
;;One for this time and one for getTopology call
(.readTopology (Mockito/verify blob-store (Mockito/times 2)) (Mockito/eq topology-id) (Mockito/anyObject))))))))))
@@ -1417,8 +1417,9 @@
(.getSupervisorPageInfo nimbus "super1" nil true)
;; afterwards, it should get called twice
- (.checkAuthorization (Mockito/verify nimbus) expected-name expected-conf "getSupervisorPageInfo")
- (.checkAuthorization (Mockito/verify nimbus) expected-name expected-conf "getTopology")))))
+ (.checkAuthorization (Mockito/verify nimbus) (Mockito/eq expected-name) (Mockito/any Map) (Mockito/eq "getSupervisorPageInfo"))
+ (.checkAuthorization (Mockito/verify nimbus) nil nil "getClusterInfo")
+ (.checkAuthorization (Mockito/verify nimbus) (Mockito/eq expected-name) (Mockito/any Map) (Mockito/eq "getTopology"))))))
(deftest test-nimbus-iface-getTopology-methods-throw-correctly
(with-open [cluster (LocalCluster. )]
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 1330594..25a3ec6 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -531,7 +531,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
if (!Utils.isValidConf(conf)) {
throw new IllegalArgumentException("Topology conf is not json-serializable");
}
- getNimbus().submitTopology(topologyName, null, JSONValue.toJSONString(conf), topology);
+ getNimbus().submitTopology(topologyName, null, JSONValue.toJSONString(conf), Utils.addVersions(topology));
ISubmitterHook hook = (ISubmitterHook) Utils.getConfiguredClass(conf, Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN);
if (hook != null) {
@@ -551,22 +551,20 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
if (!Utils.isValidConf(conf)) {
throw new IllegalArgumentException("Topology conf is not json-serializable");
}
- getNimbus().submitTopologyWithOpts(topologyName, null, JSONValue.toJSONString(conf), topology, submitOpts);
+ getNimbus().submitTopologyWithOpts(topologyName, null, JSONValue.toJSONString(conf), Utils.addVersions(topology), submitOpts);
return new LocalTopology(topologyName, topology);
}
@Override
public LocalTopology submitTopology(String topologyName, Map<String, Object> conf, TrackedTopology topology)
throws TException {
- submitTopology(topologyName, conf, topology.getTopology());
- return new LocalTopology(topologyName, topology.getTopology());
+ return submitTopology(topologyName, conf, topology.getTopology());
}
@Override
public LocalTopology submitTopologyWithOpts(String topologyName, Map<String, Object> conf, TrackedTopology topology, SubmitOptions submitOpts)
throws TException {
- submitTopologyWithOpts(topologyName, conf, topology.getTopology(), submitOpts);
- return new LocalTopology(topologyName, topology.getTopology());
+ return submitTopologyWithOpts(topologyName, conf, topology.getTopology(), submitOpts);
}
@Override
@@ -611,25 +609,21 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
}
}
-
@Override
public String getTopologyConf(String id) throws TException {
return getNimbus().getTopologyConf(id);
}
-
@Override
public StormTopology getTopology(String id) throws TException {
return getNimbus().getTopology(id);
}
-
@Override
public ClusterSummary getClusterInfo() throws TException {
return getNimbus().getClusterInfo();
}
-
@Override
public TopologyInfo getTopologyInfo(String id) throws TException {
return getNimbus().getTopologyInfo(id);
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 032bf7c..f569e9b 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -39,6 +39,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -157,6 +158,7 @@ import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.ServerUtils;
+import org.apache.storm.utils.SimpleVersion;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.TimeCacheMap;
import org.apache.storm.utils.TupleUtils;
@@ -1022,6 +1024,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
private final Map<String, Object> conf;
+ private final NavigableMap<SimpleVersion, List<String>> supervisorClasspaths;
private final NimbusInfo nimbusHostPortInfo;
private final INimbus inimbus;
private IAuthorizer authorizationHandler;
@@ -1128,6 +1131,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
this.groupMapper = groupMapper;
this.principalToLocal = AuthUtils.GetPrincipalToLocalPlugin(conf);
+ this.supervisorClasspaths = Collections.unmodifiableNavigableMap(Utils.getConfiguredClasspathVersions(conf, EMPTY_STRING_LIST));// We don't use the classpath part of this, so just an empty list
}
Map<String, Object> getConf() {
@@ -1975,6 +1979,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
private boolean isAuthorized(String operation, String topoId) throws NotAliveException, AuthorizationException, IOException {
Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+ topoConf = merge(conf, topoConf);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
try {
checkAuthorization(topoName, topoConf, operation);
@@ -2155,7 +2160,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
List<String> assignedIds = state.activeStorms();
if (assignedIds != null) {
for (String id: assignedIds) {
- Map<String, Object> topoConf = Collections.unmodifiableMap(tryReadTopoConf(id, store));
+ Map<String, Object> topoConf = Collections.unmodifiableMap(merge(conf, tryReadTopoConf(id, store)));
synchronized(lock) {
Credentials origCreds = state.credentials(id, null);
if (origCreds != null) {
@@ -2268,7 +2273,14 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
TopologySummary summary = new TopologySummary(topoId, base.get_name(), numTasks, numExecutors, numWorkers,
Time.deltaSecs(base.get_launch_time_secs()), extractStatusStr(base));
-
+ try {
+ StormTopology topo = tryReadTopology(topoId, blobStore);
+ if (topo != null && topo.is_set_storm_version()) {
+ summary.set_storm_version(topo.get_storm_version());
+ }
+ } catch (NotAliveException e) {
+ //Ignored: the topology is not alive, so the storm version is simply left unset
+ }
if (base.is_set_owner()) {
summary.set_owner(base.get_owner());
}
@@ -2520,7 +2532,22 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
if (!(Boolean)conf.getOrDefault(DaemonConfig.STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED, false)) {
topoConf.remove(Config.TOPOLOGY_CLASSPATH_BEGINNING);
}
- Map<String, Object> totalConf = merge(conf, topoConf);
+ String topoVersionString = topology.get_storm_version();
+ if (topoVersionString == null) {
+ topoVersionString = (String)conf.getOrDefault(Config.SUPERVISOR_WORKER_DEFAULT_VERSION, VersionInfo.getVersion());
+ }
+ //Check if we can run a topology with that version of storm.
+ SimpleVersion topoVersion = new SimpleVersion(topoVersionString);
+ List<String> cp = Utils.getCompatibleVersion(supervisorClasspaths, topoVersion, "classpath", null);
+ if (cp == null) {
+ throw new InvalidTopologyException("Topology submitted with storm version " + topoVersionString
+ + " but could not find a configured compatible version to use " + supervisorClasspaths.keySet());
+ }
+ Map<String, Object> otherConf = Utils.getConfigFromClasspath(cp, conf);
+ Map<String, Object> totalConfToSave = merge(otherConf, topoConf);
+ Map<String, Object> totalConf = merge(totalConfToSave, conf);
+ //When reading the conf in nimbus we want to fall back to our own settings
+ // if the other config does not have it set.
topology = normalizeTopology(totalConf, topology);
IStormClusterState state = stormClusterState;
@@ -2541,7 +2568,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
!Utils.isZkAuthenticationConfiguredTopology(topoConf)) {
throw new IllegalArgumentException("The cluster is configured for zookeeper authentication, but no payload was provided.");
}
- LOG.info("Received topology submission for {} with conf {}", topoName, Utils.redactValue(topoConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
+ LOG.info("Received topology submission for {} (storm-{} JDK-{}) with conf {}", topoName,
+ topoVersionString, topology.get_jdk_version(),
+ Utils.redactValue(topoConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
+
// lock protects against multiple topologies being submitted at once and
// cleanup thread killing topology in b/w assignment and starting the topology
synchronized(submitLock) {
@@ -2551,7 +2581,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
state.setCredentials(topoId, new Credentials(creds), topoConf);
}
LOG.info("uploadedJar {}", uploadedJarLocation);
- setupStormCode(conf, topoId, uploadedJarLocation, totalConf, topology);
+ setupStormCode(conf, topoId, uploadedJarLocation, totalConfToSave, topology);
waitForDesiredCodeReplication(totalConf, topoId);
state.setupHeatbeats(topoId);
if (ObjectReader.getBoolean(totalConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false)) {
@@ -2594,6 +2624,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
assertTopoActive(topoName, true);
try {
Map<String, Object> topoConf = tryReadTopoConfFromName(topoName);
+ topoConf = merge(conf, topoConf);
final String operation = "killTopology";
checkAuthorization(topoName, topoConf, operation);
Integer waitAmount = null;
@@ -2617,6 +2648,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
activateCalls.mark();
try {
Map<String, Object> topoConf = tryReadTopoConfFromName(topoName);
+ topoConf = merge(conf, topoConf);
final String operation = "activate";
checkAuthorization(topoName, topoConf, operation);
transitionName(topoName, TopologyActions.ACTIVATE, null, true);
@@ -2635,6 +2667,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
deactivateCalls.mark();
try {
Map<String, Object> topoConf = tryReadTopoConfFromName(topoName);
+ topoConf = merge(conf, topoConf);
final String operation = "deactivate";
checkAuthorization(topoName, topoConf, operation);
transitionName(topoName, TopologyActions.INACTIVATE, null, true);
@@ -2655,6 +2688,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
assertTopoActive(topoName, true);
try {
Map<String, Object> topoConf = tryReadTopoConfFromName(topoName);
+ topoConf = merge(conf, topoConf);
final String operation = "rebalance";
checkAuthorization(topoName, topoConf, operation);
Map<String, Integer> execOverrides = options.is_set_num_executors() ? options.get_num_executors() : Collections.emptyMap();
@@ -2679,6 +2713,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
try {
setLogConfigCalls.mark();
Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+ topoConf = merge(conf, topoConf);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
checkAuthorization(topoName, topoConf, "setLogConfig");
IStormClusterState state = stormClusterState;
@@ -2732,6 +2767,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
try {
getLogConfigCalls.mark();
Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+ topoConf = merge(conf, topoConf);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
checkAuthorization(topoName, topoConf, "getLogConfig");
IStormClusterState state = stormClusterState;
@@ -2757,6 +2793,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
IStormClusterState state = stormClusterState;
String topoId = toTopoId(topoName);
Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+ topoConf = merge(conf, topoConf);
// make sure samplingPct is within bounds.
double spct = Math.max(Math.min(samplingPercentage, 100.0), 0.0);
// while disabling we retain the sampling pct.
@@ -2797,6 +2834,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
try {
setWorkerProfilerCalls.mark();
Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+ topoConf = merge(conf, topoConf);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
checkAuthorization(topoName, topoConf, "setWorkerProfiler");
IStormClusterState state = stormClusterState;
@@ -2868,6 +2906,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
throw new NotAliveException(topoName + " is not alive");
}
Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+ topoConf = merge(conf, topoConf);
if (credentials == null) {
credentials = new Credentials(Collections.emptyMap());
}
@@ -3364,6 +3403,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
TopologyInfo topoInfo = new TopologyInfo(topoId, common.topoName, Time.deltaSecs(common.launchTimeSecs),
summaries, extractStatusStr(common.base), errors);
+ if (common.topology.is_set_storm_version()) {
+ topoInfo.set_storm_version(common.topology.get_storm_version());
+ }
if (common.base.is_set_owner()) {
topoInfo.set_owner(common.base.get_owner());
}
@@ -3407,7 +3449,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
Map<List<Integer>, Map<String, Object>> beats = common.beats;
Map<Integer, String> taskToComp = common.taskToComponent;
StormTopology topology = common.topology;
- Map<String, Object> topoConf = common.topoConf;
+ Map<String, Object> topoConf = merge(conf, common.topoConf);
StormBase base = common.base;
if (base == null) {
throw new NotAliveException(topoId);
@@ -3444,6 +3486,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
includeSys,
state);
+ if (topology.is_set_storm_version()) {
+ topoPageInfo.set_storm_version(topology.get_storm_version());
+ }
+
Map<String, Map<String, Double>> spoutResources = ResourceUtils.getSpoutsResources(topology, topoConf);
for (Entry<String, ComponentAggregateStats> entry: topoPageInfo.get_id_to_spout_agg_stats().entrySet()) {
CommonAggregateStats commonStats = entry.getValue().get_common_stats();
@@ -3576,6 +3622,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
StormTopology topology = info.topology;
Map<String, Object> topoConf = info.topoConf;
+ topoConf = merge(conf, topoConf);
Assignment assignment = info.assignment;
Map<List<Long>, List<Object>> exec2NodePort = new HashMap<>();
Map<String, String> nodeToHost;
@@ -3652,8 +3699,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
try {
getTopologyConfCalls.mark();
Map<String, Object> topoConf = tryReadTopoConf(id, blobStore);
- String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
- checkAuthorization(topoName, topoConf, "getTopologyConf");
+ Map<String, Object> checkConf = merge(conf, topoConf);
+ String topoName = (String) checkConf.get(Config.TOPOLOGY_NAME);
+ checkAuthorization(topoName, checkConf, "getTopologyConf");
return JSONValue.toJSONString(topoConf);
} catch (Exception e) {
LOG.warn("Get topo conf exception. (topology id='{}')", id, e);
@@ -3669,6 +3717,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
try {
getTopologyCalls.mark();
Map<String, Object> topoConf = tryReadTopoConf(id, blobStore);
+ topoConf = merge(conf, topoConf);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
checkAuthorization(topoName, topoConf, "getTopology");
return StormCommon.systemTopology(topoConf, tryReadTopology(id, blobStore));
@@ -3686,6 +3735,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
try {
getUserTopologyCalls.mark();
Map<String, Object> topoConf = tryReadTopoConf(id, blobStore);
+ topoConf = merge(conf, topoConf);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
checkAuthorization(topoName, topoConf, "getUserTopology");
return tryReadTopology(id, blobStore);
@@ -3710,6 +3760,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
boolean isAdmin = adminUsers.contains(user);
for (String topoId: assignedIds) {
Map<String, Object> topoConf = tryReadTopoConf(topoId, store);
+ topoConf = merge(conf, topoConf);
List<String> groups = ServerConfigUtils.getTopoLogsGroups(topoConf);
List<String> topoLogUsers = ServerConfigUtils.getTopoLogsUsers(topoConf);
if (user == null || isAdmin ||
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
index cd9ca97..0307baa 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
@@ -33,6 +33,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NavigableMap;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
@@ -45,11 +46,13 @@ import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.ServerUtils;
+import org.apache.storm.utils.SimpleVersion;
import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.ObjectReader;
-import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -344,7 +347,7 @@ public class BasicContainer extends Container {
return Paths.get(dir.toString(), "*").toString();
}
- protected List<String> frameworkClasspath() {
+ protected List<String> frameworkClasspath(SimpleVersion topoVersion) {
File stormWorkerLibDir = new File(_stormHome, "lib-worker");
String topoConfDir =
System.getenv("STORM_CONF_DIR") != null ?
@@ -358,7 +361,32 @@ public class BasicContainer extends Container {
pathElements.add(extcp);
pathElements.add(topoConfDir);
- return pathElements;
+ NavigableMap<SimpleVersion, List<String>> classpaths = Utils.getConfiguredClasspathVersions(_conf, pathElements);
+
+ return Utils.getCompatibleVersion(classpaths, topoVersion, "classpath", pathElements);
+ }
+
+ protected String getWorkerMain(SimpleVersion topoVersion) {
+ String defaultWorkerGuess = "org.apache.storm.daemon.worker.Worker";
+ if (topoVersion.getMajor() == 0) {
+ //Prior to the org.apache change
+ defaultWorkerGuess = "backtype.storm.daemon.worker";
+ } else if (topoVersion.getMajor() == 1) {
+ //Have not moved to a java worker yet
+ defaultWorkerGuess = "org.apache.storm.daemon.worker";
+ }
+ NavigableMap<SimpleVersion,String> mains = Utils.getConfiguredWorkerMainVersions(_conf);
+ return Utils.getCompatibleVersion(mains, topoVersion, "worker main class", defaultWorkerGuess);
+ }
+
+ protected String getWorkerLogWriter(SimpleVersion topoVersion) {
+ String defaultGuess = "org.apache.storm.LogWriter";
+ if (topoVersion.getMajor() == 0) {
+ //Prior to the org.apache change
+ defaultGuess = "backtype.storm.LogWriter";
+ }
+ NavigableMap<SimpleVersion,String> mains = Utils.getConfiguredWorkerLogWriterVersions(_conf);
+ return Utils.getCompatibleVersion(mains, topoVersion, "worker log writer class", defaultGuess);
}
@SuppressWarnings("unchecked")
@@ -375,12 +403,13 @@ public class BasicContainer extends Container {
* Compute the classpath for the worker process
* @param stormJar the topology jar
* @param dependencyLocations any dependencies from the topology
+ * @param topoVersion the version of the storm framework to use
* @return the full classpath
*/
- protected String getWorkerClassPath(String stormJar, List<String> dependencyLocations) {
+ protected String getWorkerClassPath(String stormJar, List<String> dependencyLocations, SimpleVersion topoVersion) {
List<String> workercp = new ArrayList<>();
workercp.addAll(asStringList(_topoConf.get(Config.TOPOLOGY_CLASSPATH_BEGINNING)));
- workercp.addAll(frameworkClasspath());
+ workercp.addAll(frameworkClasspath(topoVersion));
workercp.add(stormJar);
workercp.addAll(dependencyLocations);
workercp.addAll(asStringList(_topoConf.get(Config.TOPOLOGY_CLASSPATH)));
@@ -472,14 +501,16 @@ public class BasicContainer extends Container {
return log4jConfigurationDir + Utils.FILE_PATH_SEPARATOR + "worker.xml";
}
- private static class DependencyLocations {
- private List<String> _data = null;
+ private static class TopologyMetaData {
+ private boolean _dataCached = false;
+ private List<String> _depLocs = null;
+ private String _stormVersion = null;
private final Map<String, Object> _conf;
private final String _topologyId;
private final AdvancedFSOps _ops;
private final String _stormRoot;
- public DependencyLocations(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, final String stormRoot) {
+ public TopologyMetaData(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, final String stormRoot) {
_conf = conf;
_topologyId = topologyId;
_ops = ops;
@@ -488,16 +519,15 @@ public class BasicContainer extends Container {
public String toString() {
List<String> data;
+ String stormVersion;
synchronized(this) {
- data = _data;
+ data = _depLocs;
+ stormVersion = _stormVersion;
}
- return "DEP_LOCS for " + _topologyId +" => " + data;
+ return "META for " + _topologyId +" DEP_LOCS => " + data + " STORM_VERSION => " + stormVersion;
}
- public synchronized List<String> get() throws IOException {
- if (_data != null) {
- return _data;
- }
+ private synchronized void readData() throws IOException {
final StormTopology stormTopology = ConfigUtils.readSupervisorTopology(_conf, _topologyId, _ops);
final List<String> dependencyLocations = new ArrayList<>();
if (stormTopology.get_dependency_jars() != null) {
@@ -511,27 +541,42 @@ public class BasicContainer extends Container {
dependencyLocations.add(new File(_stormRoot, dependency).getAbsolutePath());
}
}
- _data = dependencyLocations;
- return _data;
+ _depLocs = dependencyLocations;
+ _stormVersion = stormTopology.get_storm_version();
+ _dataCached = true;
+ }
+
+ public synchronized List<String> getDepLocs() throws IOException {
+ if (!_dataCached) {
+ readData();
+ }
+ return _depLocs;
+ }
+
+ public synchronized String getStormVersion() throws IOException {
+ if (!_dataCached) {
+ readData();
+ }
+ return _stormVersion;
}
}
- static class DepLRUCache {
+ static class TopoMetaLRUCache {
public final int _maxSize = 100; //We could make this configurable in the future...
@SuppressWarnings("serial")
- private LinkedHashMap<String, DependencyLocations> _cache = new LinkedHashMap<String, DependencyLocations>() {
+ private LinkedHashMap<String, TopologyMetaData> _cache = new LinkedHashMap<String, TopologyMetaData>() {
@Override
- protected boolean removeEldestEntry(Map.Entry<String,DependencyLocations> eldest) {
+ protected boolean removeEldestEntry(Map.Entry<String,TopologyMetaData> eldest) {
return (size() > _maxSize);
}
};
- public synchronized DependencyLocations get(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) {
+ public synchronized TopologyMetaData get(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) {
//Only go off of the topology id for now.
- DependencyLocations dl = _cache.get(topologyId);
+ TopologyMetaData dl = _cache.get(topologyId);
if (dl == null) {
- _cache.putIfAbsent(topologyId, new DependencyLocations(conf, topologyId, ops, stormRoot));
+ _cache.putIfAbsent(topologyId, new TopologyMetaData(conf, topologyId, ops, stormRoot));
dl = _cache.get(topologyId);
}
return dl;
@@ -542,10 +587,14 @@ public class BasicContainer extends Container {
}
}
- static final DepLRUCache DEP_LOC_CACHE = new DepLRUCache();
+ static final TopoMetaLRUCache TOPO_META_CACHE = new TopoMetaLRUCache();
public static List<String> getDependencyLocationsFor(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) throws IOException {
- return DEP_LOC_CACHE.get(conf, topologyId, ops, stormRoot).get();
+ return TOPO_META_CACHE.get(conf, topologyId, ops, stormRoot).getDepLocs();
+ }
+
+ public static String getStormVersionFor(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) throws IOException {
+ return TOPO_META_CACHE.get(conf, topologyId, ops, stormRoot).getStormVersion();
}
/**
@@ -555,10 +604,10 @@ public class BasicContainer extends Container {
* @return the classpath for the topology as command line arguments.
* @throws IOException on any error.
*/
- private List<String> getClassPathParams(final String stormRoot) throws IOException {
+ private List<String> getClassPathParams(final String stormRoot, final SimpleVersion topoVersion) throws IOException {
final String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
final List<String> dependencyLocations = getDependencyLocationsFor(_conf, _topologyId, _ops, stormRoot);
- final String workerClassPath = getWorkerClassPath(stormJar, dependencyLocations);
+ final String workerClassPath = getWorkerClassPath(stormJar, dependencyLocations, topoVersion);
List<String> classPathParams = new ArrayList<>();
classPathParams.add("-cp");
@@ -636,17 +685,25 @@ public class BasicContainer extends Container {
final String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
final String topoConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
final String workerTmpDir = ConfigUtils.workerTmpRoot(_conf, _workerId);
+ String topoVersionString = getStormVersionFor(_conf, _topologyId, _ops, stormRoot);
+ if (topoVersionString == null) {
+ topoVersionString = (String)_conf.getOrDefault(Config.SUPERVISOR_WORKER_DEFAULT_VERSION, VersionInfo.getVersion());
+ }
+ final SimpleVersion topoVersion = new SimpleVersion(topoVersionString);
- List<String> classPathParams = getClassPathParams(stormRoot);
+ List<String> classPathParams = getClassPathParams(stormRoot, topoVersion);
List<String> commonParams = getCommonParams();
List<String> commandList = new ArrayList<>();
- //Log Writer Command...
- commandList.add(javaCmd);
- commandList.addAll(classPathParams);
- commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS)));
- commandList.addAll(commonParams);
- commandList.add("org.apache.storm.LogWriter"); //The LogWriter in turn launches the actual worker.
+ String logWriter = getWorkerLogWriter(topoVersion);
+ if (logWriter != null) {
+ //Log Writer Command...
+ commandList.add(javaCmd);
+ commandList.addAll(classPathParams);
+ commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS)));
+ commandList.addAll(commonParams);
+ commandList.add(logWriter); //The LogWriter in turn launches the actual worker.
+ }
//Worker Command...
commandList.add(javaCmd);
@@ -663,7 +720,7 @@ public class BasicContainer extends Container {
commandList.add("-Dstorm.options=" + stormOptions);
commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
commandList.addAll(classPathParams);
- commandList.add("org.apache.storm.daemon.worker.Worker");
+ commandList.add(getWorkerMain(topoVersion));
commandList.add(_topologyId);
commandList.add(_supervisorId);
commandList.add(String.valueOf(_port));
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
index 51cca0e..dd267e3 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
@@ -37,6 +37,7 @@ import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.ProfileAction;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.SimpleVersion;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.LocalState;
import org.junit.Test;
@@ -102,7 +103,7 @@ public class BasicContainerTest {
}
@Override
- protected List<String> frameworkClasspath() {
+ protected List<String> frameworkClasspath(SimpleVersion version) {
//We are not really running anything so make this
// simple to check for
return Arrays.asList("FRAMEWORK_CP");
[4/4] storm git commit: Added STORM-2448 to Changelog
Posted by bo...@apache.org.
Added STORM-2448 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9e31509d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9e31509d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9e31509d
Branch: refs/heads/master
Commit: 9e31509d47c4e91c1009f55c7ccf321d7d7e63aa
Parents: e50c907
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Mon May 22 16:40:16 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Mon May 22 16:40:16 2017 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9e31509d/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f3fba33..7fed540 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-2448: Add in Storm and JDK versions when submitting a topology.
* STORM-2503: Fix lgtm.com alerts on equality and comparison operations.
* STORM-2499: Add Serialization plugin for EventHub System Properties
* STORM-2520: AutoHDFS should prefer cluster-wise hdfs kerberos principal
[3/4] storm git commit: Merge branch 'STORM-2448' of
https://github.com/revans2/incubator-storm into STORM-2448
Posted by bo...@apache.org.
Merge branch 'STORM-2448' of https://github.com/revans2/incubator-storm into STORM-2448
STORM-2448: Add in Storm and JDK versions when submitting a topology
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e50c907e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e50c907e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e50c907e
Branch: refs/heads/master
Commit: e50c907e3ce17d9a8c71d43104ae36245d715ba4
Parents: d7fb0fb 22d1fe3
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Mon May 22 16:20:02 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Mon May 22 16:20:02 2017 -0500
----------------------------------------------------------------------
.../src/jvm/org/apache/storm/Config.java | 45 ++++
.../jvm/org/apache/storm/StormSubmitter.java | 2 +-
.../org/apache/storm/daemon/StormCommon.java | 75 +++----
.../apache/storm/generated/StormTopology.java | 220 ++++++++++++++++++-
.../apache/storm/generated/TopologyInfo.java | 146 ++++++++++--
.../storm/generated/TopologyPageInfo.java | 134 +++++++++--
.../apache/storm/generated/TopologySummary.java | 146 ++++++++++--
.../apache/storm/topology/TopologyBuilder.java | 2 +-
.../jvm/org/apache/storm/utils/LocalState.java | 8 +-
.../org/apache/storm/utils/SimpleVersion.java | 88 ++++++++
.../src/jvm/org/apache/storm/utils/Utils.java | 210 +++++++++++++++---
.../jvm/org/apache/storm/utils/VersionInfo.java | 6 +-
storm-client/src/py/storm/ttypes.py | 76 ++++++-
storm-client/src/storm.thrift | 5 +
storm-core/src/clj/org/apache/storm/ui/core.clj | 6 +-
.../public/templates/index-page-template.html | 6 +
.../templates/topology-page-template.html | 6 +
.../test/clj/org/apache/storm/nimbus_test.clj | 19 +-
.../java/org/apache/storm/LocalCluster.java | 14 +-
.../org/apache/storm/daemon/nimbus/Nimbus.java | 67 +++++-
.../storm/daemon/supervisor/BasicContainer.java | 127 ++++++++---
.../daemon/supervisor/BasicContainerTest.java | 3 +-
22 files changed, 1203 insertions(+), 208 deletions(-)
----------------------------------------------------------------------
[2/4] storm git commit: STORM-2448: Add in Storm and JDK versions
when submitting a topology
Posted by bo...@apache.org.
STORM-2448: Add in Storm and JDK versions when submitting a topology
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/22d1fe38
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/22d1fe38
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/22d1fe38
Branch: refs/heads/master
Commit: 22d1fe3881f0d62feddcf1cc7b8fcd3e9fed0360
Parents: d7fb0fb
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Apr 4 12:34:49 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon May 22 16:00:03 2017 -0500
----------------------------------------------------------------------
.../src/jvm/org/apache/storm/Config.java | 45 ++++
.../jvm/org/apache/storm/StormSubmitter.java | 2 +-
.../org/apache/storm/daemon/StormCommon.java | 75 +++----
.../apache/storm/generated/StormTopology.java | 220 ++++++++++++++++++-
.../apache/storm/generated/TopologyInfo.java | 146 ++++++++++--
.../storm/generated/TopologyPageInfo.java | 134 +++++++++--
.../apache/storm/generated/TopologySummary.java | 146 ++++++++++--
.../apache/storm/topology/TopologyBuilder.java | 2 +-
.../jvm/org/apache/storm/utils/LocalState.java | 8 +-
.../org/apache/storm/utils/SimpleVersion.java | 88 ++++++++
.../src/jvm/org/apache/storm/utils/Utils.java | 210 +++++++++++++++---
.../jvm/org/apache/storm/utils/VersionInfo.java | 6 +-
storm-client/src/py/storm/ttypes.py | 76 ++++++-
storm-client/src/storm.thrift | 5 +
storm-core/src/clj/org/apache/storm/ui/core.clj | 6 +-
.../public/templates/index-page-template.html | 6 +
.../templates/topology-page-template.html | 6 +
.../test/clj/org/apache/storm/nimbus_test.clj | 19 +-
.../java/org/apache/storm/LocalCluster.java | 14 +-
.../org/apache/storm/daemon/nimbus/Nimbus.java | 67 +++++-
.../storm/daemon/supervisor/BasicContainer.java | 127 ++++++++---
.../daemon/supervisor/BasicContainerTest.java | 3 +-
22 files changed, 1203 insertions(+), 208 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index e85841c..b1f0381 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1321,6 +1321,51 @@ public class Config extends HashMap<String, Object> {
public static final String SUPERVISOR_WORKER_LAUNCHER = "supervisor.worker.launcher";
/**
+ * Map a version of storm to a worker classpath that can be used to run it.
+ * This allows the supervisor to select an available version of storm that is compatible with what a
+ * topology was launched with.
+ *
+ * Only the major and minor version numbers are used, although this may change in the
+ * future. The code will first try to find a version that is the same or higher than the requested version,
+ * but with the same major version number. If it cannot, it will fall back to using one with a lower
+ * minor version, but in some cases this might fail as some features may be missing.
+ *
+ * Because of how this selection process works please don't include two releases
+ * with the same major and minor versions as it is undefined which will be selected. Also it is good
+ * practice to just include one release for each major version you want to support unless the
+ * minor versions are truly not compatible with each other. This is to avoid
+ * maintenance and testing overhead.
+ *
+ * This config needs to be set on all supervisors and on nimbus. In general this can be the output of
+ * calling {@code storm classpath} on the version you want and adding in an entry for the config directory for
+ * that release. You should modify the storm.yaml of each of these versions to match the features
+ * and settings you want on the main version.
+ */
+ @isMapEntryType(keyType = String.class, valueType = String.class)
+ public static final String SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP = "supervisor.worker.version.classpath.map";
+
+ /**
+ * Map a version of storm to a worker's main class. In most cases storm should have correct defaults and
+ * just setting SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP is enough.
+ */
+ @isMapEntryType(keyType = String.class, valueType = String.class)
+ public static final String SUPERVISOR_WORKER_VERSION_MAIN_MAP = "supervisor.worker.version.main.map";
+
+ /**
+ * Map a version of storm to a worker's logwriter class. In most cases storm should have correct defaults and
+ * just setting SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP is enough.
+ */
+ @isMapEntryType(keyType = String.class, valueType = String.class)
+ public static final String SUPERVISOR_WORKER_VERSION_LOGWRITER_MAP = "supervisor.worker.version.logwriter.map";
+
+ /**
+ * The version of storm to assume a topology should run as if no version is given by the client when
+ * submitting the topology.
+ */
+ @isString
+ public static final String SUPERVISOR_WORKER_DEFAULT_VERSION = "supervisor.worker.default.version";
+
+ /**
* A directory on the local filesystem used by Storm for any local
* filesystem usage it needs. The directory must exist and the Storm daemons must
* have permission to read/write from this location.
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
index 423c679..788b1a4 100644
--- a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
+++ b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
@@ -322,7 +322,7 @@ public class StormSubmitter {
try {
String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, client);
LOG.info("Submitting topology {} in distributed mode with conf {}", name, serConf);
-
+ Utils.addVersions(topology);
if (opts != null) {
client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
} else {
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
index 3b7167d..3f85a13 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -17,6 +17,16 @@
*/
package org.apache.storm.daemon;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
import org.apache.commons.lang.StringUtils;
import org.apache.storm.Config;
import org.apache.storm.Constants;
@@ -42,6 +52,7 @@ import org.apache.storm.task.IBolt;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ThriftTopologyUtils;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ThriftTopologyUtils;
@@ -49,16 +60,6 @@ import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
public class StormCommon {
// A singleton instance allows us to mock delegated static methods in our
// tests by subclassing.
@@ -105,32 +106,30 @@ public class StormCommon {
}
}
- @SuppressWarnings("unchecked")
- private static void validateIds(StormTopology topology) throws InvalidTopologyException {
- List<String> componentIds = new ArrayList<>();
-
- for (StormTopology._Fields field : Thrift.getTopologyFields()) {
- if (!ThriftTopologyUtils.isWorkerHook(field) && !ThriftTopologyUtils.isDependencies(field)) {
- Object value = topology.getFieldValue(field);
- Map<String, Object> componentMap = (Map<String, Object>) value;
- componentIds.addAll(componentMap.keySet());
-
- for (String id : componentMap.keySet()) {
- if (Utils.isSystemId(id)) {
- throw new InvalidTopologyException(id + " is not a valid component id.");
- }
- }
- for (Object componentObj : componentMap.values()) {
- ComponentCommon common = getComponentCommon(componentObj);
- Set<String> streamIds = common.get_streams().keySet();
- for (String id : streamIds) {
- if (Utils.isSystemId(id)) {
- throw new InvalidTopologyException(id + " is not a valid stream id.");
- }
- }
+ private static Set<String> validateIds(Map<String, ? extends Object> componentMap) throws InvalidTopologyException {
+ Set<String> keys = componentMap.keySet();
+ for (String id : keys) {
+ if (Utils.isSystemId(id)) {
+ throw new InvalidTopologyException(id + " is not a valid component id.");
+ }
+ }
+ for (Object componentObj : componentMap.values()) {
+ ComponentCommon common = getComponentCommon(componentObj);
+ Set<String> streamIds = common.get_streams().keySet();
+ for (String id : streamIds) {
+ if (Utils.isSystemId(id)) {
+ throw new InvalidTopologyException(id + " is not a valid stream id.");
}
}
}
+ return keys;
+ }
+
+ private static void validateIds(StormTopology topology) throws InvalidTopologyException {
+ List<String> componentIds = new ArrayList<>();
+ componentIds.addAll(validateIds(topology.get_bolts()));
+ componentIds.addAll(validateIds(topology.get_spouts()));
+ componentIds.addAll(validateIds(topology.get_state_spouts()));
List<String> offending = Utils.getRepeat(componentIds);
if (!offending.isEmpty()) {
@@ -146,15 +145,11 @@ public class StormCommon {
}
}
- @SuppressWarnings("unchecked")
public static Map<String, Object> allComponents(StormTopology topology) {
Map<String, Object> components = new HashMap<>();
- List<StormTopology._Fields> topologyFields = Arrays.asList(Thrift.getTopologyFields());
- for (StormTopology._Fields field : topologyFields) {
- if (!ThriftTopologyUtils.isWorkerHook(field) && !ThriftTopologyUtils.isDependencies(field)) {
- components.putAll(((Map) topology.getFieldValue(field)));
- }
- }
+ components.putAll(topology.get_bolts());
+ components.putAll(topology.get_spouts());
+ components.putAll(topology.get_state_spouts());
return components;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/generated/StormTopology.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/StormTopology.java b/storm-client/src/jvm/org/apache/storm/generated/StormTopology.java
index caec6c6..6241d7b 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/StormTopology.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/StormTopology.java
@@ -61,6 +61,8 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
private static final org.apache.thrift.protocol.TField WORKER_HOOKS_FIELD_DESC = new org.apache.thrift.protocol.TField("worker_hooks", org.apache.thrift.protocol.TType.LIST, (short)4);
private static final org.apache.thrift.protocol.TField DEPENDENCY_JARS_FIELD_DESC = new org.apache.thrift.protocol.TField("dependency_jars", org.apache.thrift.protocol.TType.LIST, (short)5);
private static final org.apache.thrift.protocol.TField DEPENDENCY_ARTIFACTS_FIELD_DESC = new org.apache.thrift.protocol.TField("dependency_artifacts", org.apache.thrift.protocol.TType.LIST, (short)6);
+ private static final org.apache.thrift.protocol.TField STORM_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("storm_version", org.apache.thrift.protocol.TType.STRING, (short)7);
+ private static final org.apache.thrift.protocol.TField JDK_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("jdk_version", org.apache.thrift.protocol.TType.STRING, (short)8);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -74,6 +76,8 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
private List<ByteBuffer> worker_hooks; // optional
private List<String> dependency_jars; // optional
private List<String> dependency_artifacts; // optional
+ private String storm_version; // optional
+ private String jdk_version; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -82,7 +86,9 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
STATE_SPOUTS((short)3, "state_spouts"),
WORKER_HOOKS((short)4, "worker_hooks"),
DEPENDENCY_JARS((short)5, "dependency_jars"),
- DEPENDENCY_ARTIFACTS((short)6, "dependency_artifacts");
+ DEPENDENCY_ARTIFACTS((short)6, "dependency_artifacts"),
+ STORM_VERSION((short)7, "storm_version"),
+ JDK_VERSION((short)8, "jdk_version");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -109,6 +115,10 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
return DEPENDENCY_JARS;
case 6: // DEPENDENCY_ARTIFACTS
return DEPENDENCY_ARTIFACTS;
+ case 7: // STORM_VERSION
+ return STORM_VERSION;
+ case 8: // JDK_VERSION
+ return JDK_VERSION;
default:
return null;
}
@@ -149,7 +159,7 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
// isset id assignments
- private static final _Fields optionals[] = {_Fields.WORKER_HOOKS,_Fields.DEPENDENCY_JARS,_Fields.DEPENDENCY_ARTIFACTS};
+ private static final _Fields optionals[] = {_Fields.WORKER_HOOKS,_Fields.DEPENDENCY_JARS,_Fields.DEPENDENCY_ARTIFACTS,_Fields.STORM_VERSION,_Fields.JDK_VERSION};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -174,6 +184,10 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
tmpMap.put(_Fields.DEPENDENCY_ARTIFACTS, new org.apache.thrift.meta_data.FieldMetaData("dependency_artifacts", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
+ tmpMap.put(_Fields.STORM_VERSION, new org.apache.thrift.meta_data.FieldMetaData("storm_version", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.JDK_VERSION, new org.apache.thrift.meta_data.FieldMetaData("jdk_version", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(StormTopology.class, metaDataMap);
}
@@ -253,6 +267,12 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
List<String> __this__dependency_artifacts = new ArrayList<String>(other.dependency_artifacts);
this.dependency_artifacts = __this__dependency_artifacts;
}
+ if (other.is_set_storm_version()) {
+ this.storm_version = other.storm_version;
+ }
+ if (other.is_set_jdk_version()) {
+ this.jdk_version = other.jdk_version;
+ }
}
public StormTopology deepCopy() {
@@ -267,6 +287,8 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
this.worker_hooks = null;
this.dependency_jars = null;
this.dependency_artifacts = null;
+ this.storm_version = null;
+ this.jdk_version = null;
}
public int get_spouts_size() {
@@ -485,6 +507,52 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
}
+ public String get_storm_version() {
+ return this.storm_version;
+ }
+
+ public void set_storm_version(String storm_version) {
+ this.storm_version = storm_version;
+ }
+
+ public void unset_storm_version() {
+ this.storm_version = null;
+ }
+
+ /** Returns true if field storm_version is set (has been assigned a value) and false otherwise */
+ public boolean is_set_storm_version() {
+ return this.storm_version != null;
+ }
+
+ public void set_storm_version_isSet(boolean value) {
+ if (!value) {
+ this.storm_version = null;
+ }
+ }
+
+ public String get_jdk_version() {
+ return this.jdk_version;
+ }
+
+ public void set_jdk_version(String jdk_version) {
+ this.jdk_version = jdk_version;
+ }
+
+ public void unset_jdk_version() {
+ this.jdk_version = null;
+ }
+
+ /** Returns true if field jdk_version is set (has been assigned a value) and false otherwise */
+ public boolean is_set_jdk_version() {
+ return this.jdk_version != null;
+ }
+
+ public void set_jdk_version_isSet(boolean value) {
+ if (!value) {
+ this.jdk_version = null;
+ }
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case SPOUTS:
@@ -535,6 +603,22 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
break;
+ case STORM_VERSION:
+ if (value == null) {
+ unset_storm_version();
+ } else {
+ set_storm_version((String)value);
+ }
+ break;
+
+ case JDK_VERSION:
+ if (value == null) {
+ unset_jdk_version();
+ } else {
+ set_jdk_version((String)value);
+ }
+ break;
+
}
}
@@ -558,6 +642,12 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
case DEPENDENCY_ARTIFACTS:
return get_dependency_artifacts();
+ case STORM_VERSION:
+ return get_storm_version();
+
+ case JDK_VERSION:
+ return get_jdk_version();
+
}
throw new IllegalStateException();
}
@@ -581,6 +671,10 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
return is_set_dependency_jars();
case DEPENDENCY_ARTIFACTS:
return is_set_dependency_artifacts();
+ case STORM_VERSION:
+ return is_set_storm_version();
+ case JDK_VERSION:
+ return is_set_jdk_version();
}
throw new IllegalStateException();
}
@@ -652,6 +746,24 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
return false;
}
+ boolean this_present_storm_version = true && this.is_set_storm_version();
+ boolean that_present_storm_version = true && that.is_set_storm_version();
+ if (this_present_storm_version || that_present_storm_version) {
+ if (!(this_present_storm_version && that_present_storm_version))
+ return false;
+ if (!this.storm_version.equals(that.storm_version))
+ return false;
+ }
+
+ boolean this_present_jdk_version = true && this.is_set_jdk_version();
+ boolean that_present_jdk_version = true && that.is_set_jdk_version();
+ if (this_present_jdk_version || that_present_jdk_version) {
+ if (!(this_present_jdk_version && that_present_jdk_version))
+ return false;
+ if (!this.jdk_version.equals(that.jdk_version))
+ return false;
+ }
+
return true;
}
@@ -689,6 +801,16 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
if (present_dependency_artifacts)
list.add(dependency_artifacts);
+ boolean present_storm_version = true && (is_set_storm_version());
+ list.add(present_storm_version);
+ if (present_storm_version)
+ list.add(storm_version);
+
+ boolean present_jdk_version = true && (is_set_jdk_version());
+ list.add(present_jdk_version);
+ if (present_jdk_version)
+ list.add(jdk_version);
+
return list.hashCode();
}
@@ -760,6 +882,26 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(is_set_storm_version()).compareTo(other.is_set_storm_version());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_storm_version()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.storm_version, other.storm_version);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_jdk_version()).compareTo(other.is_set_jdk_version());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_jdk_version()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.jdk_version, other.jdk_version);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -833,6 +975,26 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
first = false;
}
+ if (is_set_storm_version()) {
+ if (!first) sb.append(", ");
+ sb.append("storm_version:");
+ if (this.storm_version == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.storm_version);
+ }
+ first = false;
+ }
+ if (is_set_jdk_version()) {
+ if (!first) sb.append(", ");
+ sb.append("jdk_version:");
+ if (this.jdk_version == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.jdk_version);
+ }
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -1005,6 +1167,22 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 7: // STORM_VERSION
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.storm_version = iprot.readString();
+ struct.set_storm_version_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 8: // JDK_VERSION
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.jdk_version = iprot.readString();
+ struct.set_jdk_version_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -1099,6 +1277,20 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
oprot.writeFieldEnd();
}
}
+ if (struct.storm_version != null) {
+ if (struct.is_set_storm_version()) {
+ oprot.writeFieldBegin(STORM_VERSION_FIELD_DESC);
+ oprot.writeString(struct.storm_version);
+ oprot.writeFieldEnd();
+ }
+ }
+ if (struct.jdk_version != null) {
+ if (struct.is_set_jdk_version()) {
+ oprot.writeFieldBegin(JDK_VERSION_FIELD_DESC);
+ oprot.writeString(struct.jdk_version);
+ oprot.writeFieldEnd();
+ }
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -1150,7 +1342,13 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
if (struct.is_set_dependency_artifacts()) {
optionals.set(2);
}
- oprot.writeBitSet(optionals, 3);
+ if (struct.is_set_storm_version()) {
+ optionals.set(3);
+ }
+ if (struct.is_set_jdk_version()) {
+ optionals.set(4);
+ }
+ oprot.writeBitSet(optionals, 5);
if (struct.is_set_worker_hooks()) {
{
oprot.writeI32(struct.worker_hooks.size());
@@ -1178,6 +1376,12 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
}
}
+ if (struct.is_set_storm_version()) {
+ oprot.writeString(struct.storm_version);
+ }
+ if (struct.is_set_jdk_version()) {
+ oprot.writeString(struct.jdk_version);
+ }
}
@Override
@@ -1225,7 +1429,7 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
}
struct.set_state_spouts_isSet(true);
- BitSet incoming = iprot.readBitSet(3);
+ BitSet incoming = iprot.readBitSet(5);
if (incoming.get(0)) {
{
org.apache.thrift.protocol.TList _list89 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
@@ -1265,6 +1469,14 @@ public class StormTopology implements org.apache.thrift.TBase<StormTopology, Sto
}
struct.set_dependency_artifacts_isSet(true);
}
+ if (incoming.get(3)) {
+ struct.storm_version = iprot.readString();
+ struct.set_storm_version_isSet(true);
+ }
+ if (incoming.get(4)) {
+ struct.jdk_version = iprot.readString();
+ struct.set_jdk_version_isSet(true);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/generated/TopologyInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/TopologyInfo.java b/storm-client/src/jvm/org/apache/storm/generated/TopologyInfo.java
index fa95be7..622bd81 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/TopologyInfo.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/TopologyInfo.java
@@ -62,6 +62,7 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
private static final org.apache.thrift.protocol.TField STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("status", org.apache.thrift.protocol.TType.STRING, (short)5);
private static final org.apache.thrift.protocol.TField ERRORS_FIELD_DESC = new org.apache.thrift.protocol.TField("errors", org.apache.thrift.protocol.TType.MAP, (short)6);
private static final org.apache.thrift.protocol.TField COMPONENT_DEBUG_FIELD_DESC = new org.apache.thrift.protocol.TField("component_debug", org.apache.thrift.protocol.TType.MAP, (short)7);
+ private static final org.apache.thrift.protocol.TField STORM_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("storm_version", org.apache.thrift.protocol.TType.STRING, (short)8);
private static final org.apache.thrift.protocol.TField SCHED_STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("sched_status", org.apache.thrift.protocol.TType.STRING, (short)513);
private static final org.apache.thrift.protocol.TField OWNER_FIELD_DESC = new org.apache.thrift.protocol.TField("owner", org.apache.thrift.protocol.TType.STRING, (short)514);
private static final org.apache.thrift.protocol.TField REPLICATION_COUNT_FIELD_DESC = new org.apache.thrift.protocol.TField("replication_count", org.apache.thrift.protocol.TType.I32, (short)515);
@@ -85,6 +86,7 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
private String status; // required
private Map<String,List<ErrorInfo>> errors; // required
private Map<String,DebugOptions> component_debug; // optional
+ private String storm_version; // optional
private String sched_status; // optional
private String owner; // optional
private int replication_count; // optional
@@ -104,6 +106,7 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
STATUS((short)5, "status"),
ERRORS((short)6, "errors"),
COMPONENT_DEBUG((short)7, "component_debug"),
+ STORM_VERSION((short)8, "storm_version"),
SCHED_STATUS((short)513, "sched_status"),
OWNER((short)514, "owner"),
REPLICATION_COUNT((short)515, "replication_count"),
@@ -141,6 +144,8 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
return ERRORS;
case 7: // COMPONENT_DEBUG
return COMPONENT_DEBUG;
+ case 8: // STORM_VERSION
+ return STORM_VERSION;
case 513: // SCHED_STATUS
return SCHED_STATUS;
case 514: // OWNER
@@ -208,7 +213,7 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
private static final int __ASSIGNED_MEMOFFHEAP_ISSET_ID = 6;
private static final int __ASSIGNED_CPU_ISSET_ID = 7;
private byte __isset_bitfield = 0;
- private static final _Fields optionals[] = {_Fields.COMPONENT_DEBUG,_Fields.SCHED_STATUS,_Fields.OWNER,_Fields.REPLICATION_COUNT,_Fields.REQUESTED_MEMONHEAP,_Fields.REQUESTED_MEMOFFHEAP,_Fields.REQUESTED_CPU,_Fields.ASSIGNED_MEMONHEAP,_Fields.ASSIGNED_MEMOFFHEAP,_Fields.ASSIGNED_CPU};
+ private static final _Fields optionals[] = {_Fields.COMPONENT_DEBUG,_Fields.STORM_VERSION,_Fields.SCHED_STATUS,_Fields.OWNER,_Fields.REPLICATION_COUNT,_Fields.REQUESTED_MEMONHEAP,_Fields.REQUESTED_MEMOFFHEAP,_Fields.REQUESTED_CPU,_Fields.ASSIGNED_MEMONHEAP,_Fields.ASSIGNED_MEMOFFHEAP,_Fields.ASSIGNED_CPU};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -232,6 +237,8 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING),
new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, DebugOptions.class))));
+ tmpMap.put(_Fields.STORM_VERSION, new org.apache.thrift.meta_data.FieldMetaData("storm_version", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.SCHED_STATUS, new org.apache.thrift.meta_data.FieldMetaData("sched_status", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.OWNER, new org.apache.thrift.meta_data.FieldMetaData("owner", org.apache.thrift.TFieldRequirementType.OPTIONAL,
@@ -330,6 +337,9 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
}
this.component_debug = __this__component_debug;
}
+ if (other.is_set_storm_version()) {
+ this.storm_version = other.storm_version;
+ }
if (other.is_set_sched_status()) {
this.sched_status = other.sched_status;
}
@@ -359,6 +369,7 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
this.status = null;
this.errors = null;
this.component_debug = null;
+ this.storm_version = null;
this.sched_status = null;
this.owner = null;
set_replication_count_isSet(false);
@@ -574,6 +585,29 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
}
}
+ public String get_storm_version() {
+ return this.storm_version;
+ }
+
+ public void set_storm_version(String storm_version) {
+ this.storm_version = storm_version;
+ }
+
+ public void unset_storm_version() {
+ this.storm_version = null;
+ }
+
+ /** Returns true if field storm_version is set (has been assigned a value) and false otherwise */
+ public boolean is_set_storm_version() {
+ return this.storm_version != null;
+ }
+
+ public void set_storm_version_isSet(boolean value) {
+ if (!value) {
+ this.storm_version = null;
+ }
+ }
+
public String get_sched_status() {
return this.sched_status;
}
@@ -832,6 +866,14 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
}
break;
+ case STORM_VERSION:
+ if (value == null) {
+ unset_storm_version();
+ } else {
+ set_storm_version((String)value);
+ }
+ break;
+
case SCHED_STATUS:
if (value == null) {
unset_sched_status();
@@ -930,6 +972,9 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
case COMPONENT_DEBUG:
return get_component_debug();
+ case STORM_VERSION:
+ return get_storm_version();
+
case SCHED_STATUS:
return get_sched_status();
@@ -982,6 +1027,8 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
return is_set_errors();
case COMPONENT_DEBUG:
return is_set_component_debug();
+ case STORM_VERSION:
+ return is_set_storm_version();
case SCHED_STATUS:
return is_set_sched_status();
case OWNER:
@@ -1080,6 +1127,15 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
return false;
}
+ boolean this_present_storm_version = true && this.is_set_storm_version();
+ boolean that_present_storm_version = true && that.is_set_storm_version();
+ if (this_present_storm_version || that_present_storm_version) {
+ if (!(this_present_storm_version && that_present_storm_version))
+ return false;
+ if (!this.storm_version.equals(that.storm_version))
+ return false;
+ }
+
boolean this_present_sched_status = true && this.is_set_sched_status();
boolean that_present_sched_status = true && that.is_set_sched_status();
if (this_present_sched_status || that_present_sched_status) {
@@ -1203,6 +1259,11 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
if (present_component_debug)
list.add(component_debug);
+ boolean present_storm_version = true && (is_set_storm_version());
+ list.add(present_storm_version);
+ if (present_storm_version)
+ list.add(storm_version);
+
boolean present_sched_status = true && (is_set_sched_status());
list.add(present_sched_status);
if (present_sched_status)
@@ -1329,6 +1390,16 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(is_set_storm_version()).compareTo(other.is_set_storm_version());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_storm_version()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.storm_version, other.storm_version);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
lastComparison = Boolean.valueOf(is_set_sched_status()).compareTo(other.is_set_sched_status());
if (lastComparison != 0) {
return lastComparison;
@@ -1492,6 +1563,16 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
}
first = false;
}
+ if (is_set_storm_version()) {
+ if (!first) sb.append(", ");
+ sb.append("storm_version:");
+ if (this.storm_version == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.storm_version);
+ }
+ first = false;
+ }
if (is_set_sched_status()) {
if (!first) sb.append(", ");
sb.append("sched_status:");
@@ -1726,6 +1807,14 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 8: // STORM_VERSION
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.storm_version = iprot.readString();
+ struct.set_storm_version_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
case 513: // SCHED_STATUS
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.sched_status = iprot.readString();
@@ -1876,6 +1965,13 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
oprot.writeFieldEnd();
}
}
+ if (struct.storm_version != null) {
+ if (struct.is_set_storm_version()) {
+ oprot.writeFieldBegin(STORM_VERSION_FIELD_DESC);
+ oprot.writeString(struct.storm_version);
+ oprot.writeFieldEnd();
+ }
+ }
if (struct.sched_status != null) {
if (struct.is_set_sched_status()) {
oprot.writeFieldBegin(SCHED_STATUS_FIELD_DESC);
@@ -1971,34 +2067,37 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
if (struct.is_set_component_debug()) {
optionals.set(0);
}
- if (struct.is_set_sched_status()) {
+ if (struct.is_set_storm_version()) {
optionals.set(1);
}
- if (struct.is_set_owner()) {
+ if (struct.is_set_sched_status()) {
optionals.set(2);
}
- if (struct.is_set_replication_count()) {
+ if (struct.is_set_owner()) {
optionals.set(3);
}
- if (struct.is_set_requested_memonheap()) {
+ if (struct.is_set_replication_count()) {
optionals.set(4);
}
- if (struct.is_set_requested_memoffheap()) {
+ if (struct.is_set_requested_memonheap()) {
optionals.set(5);
}
- if (struct.is_set_requested_cpu()) {
+ if (struct.is_set_requested_memoffheap()) {
optionals.set(6);
}
- if (struct.is_set_assigned_memonheap()) {
+ if (struct.is_set_requested_cpu()) {
optionals.set(7);
}
- if (struct.is_set_assigned_memoffheap()) {
+ if (struct.is_set_assigned_memonheap()) {
optionals.set(8);
}
- if (struct.is_set_assigned_cpu()) {
+ if (struct.is_set_assigned_memoffheap()) {
optionals.set(9);
}
- oprot.writeBitSet(optionals, 10);
+ if (struct.is_set_assigned_cpu()) {
+ optionals.set(10);
+ }
+ oprot.writeBitSet(optionals, 11);
if (struct.is_set_component_debug()) {
{
oprot.writeI32(struct.component_debug.size());
@@ -2009,6 +2108,9 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
}
}
}
+ if (struct.is_set_storm_version()) {
+ oprot.writeString(struct.storm_version);
+ }
if (struct.is_set_sched_status()) {
oprot.writeString(struct.sched_status);
}
@@ -2084,7 +2186,7 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
}
}
struct.set_errors_isSet(true);
- BitSet incoming = iprot.readBitSet(10);
+ BitSet incoming = iprot.readBitSet(11);
if (incoming.get(0)) {
{
org.apache.thrift.protocol.TMap _map364 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
@@ -2102,38 +2204,42 @@ public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, Topol
struct.set_component_debug_isSet(true);
}
if (incoming.get(1)) {
+ struct.storm_version = iprot.readString();
+ struct.set_storm_version_isSet(true);
+ }
+ if (incoming.get(2)) {
struct.sched_status = iprot.readString();
struct.set_sched_status_isSet(true);
}
- if (incoming.get(2)) {
+ if (incoming.get(3)) {
struct.owner = iprot.readString();
struct.set_owner_isSet(true);
}
- if (incoming.get(3)) {
+ if (incoming.get(4)) {
struct.replication_count = iprot.readI32();
struct.set_replication_count_isSet(true);
}
- if (incoming.get(4)) {
+ if (incoming.get(5)) {
struct.requested_memonheap = iprot.readDouble();
struct.set_requested_memonheap_isSet(true);
}
- if (incoming.get(5)) {
+ if (incoming.get(6)) {
struct.requested_memoffheap = iprot.readDouble();
struct.set_requested_memoffheap_isSet(true);
}
- if (incoming.get(6)) {
+ if (incoming.get(7)) {
struct.requested_cpu = iprot.readDouble();
struct.set_requested_cpu_isSet(true);
}
- if (incoming.get(7)) {
+ if (incoming.get(8)) {
struct.assigned_memonheap = iprot.readDouble();
struct.set_assigned_memonheap_isSet(true);
}
- if (incoming.get(8)) {
+ if (incoming.get(9)) {
struct.assigned_memoffheap = iprot.readDouble();
struct.set_assigned_memoffheap_isSet(true);
}
- if (incoming.get(9)) {
+ if (incoming.get(10)) {
struct.assigned_cpu = iprot.readDouble();
struct.set_assigned_cpu_isSet(true);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/generated/TopologyPageInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/TopologyPageInfo.java b/storm-client/src/jvm/org/apache/storm/generated/TopologyPageInfo.java
index 25bda95..04c2232 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/TopologyPageInfo.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/TopologyPageInfo.java
@@ -71,6 +71,7 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
private static final org.apache.thrift.protocol.TField DEBUG_OPTIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("debug_options", org.apache.thrift.protocol.TType.STRUCT, (short)14);
private static final org.apache.thrift.protocol.TField REPLICATION_COUNT_FIELD_DESC = new org.apache.thrift.protocol.TField("replication_count", org.apache.thrift.protocol.TType.I32, (short)15);
private static final org.apache.thrift.protocol.TField WORKERS_FIELD_DESC = new org.apache.thrift.protocol.TField("workers", org.apache.thrift.protocol.TType.LIST, (short)16);
+ private static final org.apache.thrift.protocol.TField STORM_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("storm_version", org.apache.thrift.protocol.TType.STRING, (short)17);
private static final org.apache.thrift.protocol.TField REQUESTED_MEMONHEAP_FIELD_DESC = new org.apache.thrift.protocol.TField("requested_memonheap", org.apache.thrift.protocol.TType.DOUBLE, (short)521);
private static final org.apache.thrift.protocol.TField REQUESTED_MEMOFFHEAP_FIELD_DESC = new org.apache.thrift.protocol.TField("requested_memoffheap", org.apache.thrift.protocol.TType.DOUBLE, (short)522);
private static final org.apache.thrift.protocol.TField REQUESTED_CPU_FIELD_DESC = new org.apache.thrift.protocol.TField("requested_cpu", org.apache.thrift.protocol.TType.DOUBLE, (short)523);
@@ -100,6 +101,7 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
private DebugOptions debug_options; // optional
private int replication_count; // optional
private List<WorkerSummary> workers; // optional
+ private String storm_version; // optional
private double requested_memonheap; // optional
private double requested_memoffheap; // optional
private double requested_cpu; // optional
@@ -125,6 +127,7 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
DEBUG_OPTIONS((short)14, "debug_options"),
REPLICATION_COUNT((short)15, "replication_count"),
WORKERS((short)16, "workers"),
+ STORM_VERSION((short)17, "storm_version"),
REQUESTED_MEMONHEAP((short)521, "requested_memonheap"),
REQUESTED_MEMOFFHEAP((short)522, "requested_memoffheap"),
REQUESTED_CPU((short)523, "requested_cpu"),
@@ -177,6 +180,8 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
return REPLICATION_COUNT;
case 16: // WORKERS
return WORKERS;
+ case 17: // STORM_VERSION
+ return STORM_VERSION;
case 521: // REQUESTED_MEMONHEAP
return REQUESTED_MEMONHEAP;
case 522: // REQUESTED_MEMOFFHEAP
@@ -241,7 +246,7 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
private static final int __ASSIGNED_MEMOFFHEAP_ISSET_ID = 9;
private static final int __ASSIGNED_CPU_ISSET_ID = 10;
private short __isset_bitfield = 0;
- private static final _Fields optionals[] = {_Fields.NAME,_Fields.UPTIME_SECS,_Fields.STATUS,_Fields.NUM_TASKS,_Fields.NUM_WORKERS,_Fields.NUM_EXECUTORS,_Fields.TOPOLOGY_CONF,_Fields.ID_TO_SPOUT_AGG_STATS,_Fields.ID_TO_BOLT_AGG_STATS,_Fields.SCHED_STATUS,_Fields.TOPOLOGY_STATS,_Fields.OWNER,_Fields.DEBUG_OPTIONS,_Fields.REPLICATION_COUNT,_Fields.WORKERS,_Fields.REQUESTED_MEMONHEAP,_Fields.REQUESTED_MEMOFFHEAP,_Fields.REQUESTED_CPU,_Fields.ASSIGNED_MEMONHEAP,_Fields.ASSIGNED_MEMOFFHEAP,_Fields.ASSIGNED_CPU};
+ private static final _Fields optionals[] = {_Fields.NAME,_Fields.UPTIME_SECS,_Fields.STATUS,_Fields.NUM_TASKS,_Fields.NUM_WORKERS,_Fields.NUM_EXECUTORS,_Fields.TOPOLOGY_CONF,_Fields.ID_TO_SPOUT_AGG_STATS,_Fields.ID_TO_BOLT_AGG_STATS,_Fields.SCHED_STATUS,_Fields.TOPOLOGY_STATS,_Fields.OWNER,_Fields.DEBUG_OPTIONS,_Fields.REPLICATION_COUNT,_Fields.WORKERS,_Fields.STORM_VERSION,_Fields.REQUESTED_MEMONHEAP,_Fields.REQUESTED_MEMOFFHEAP,_Fields.REQUESTED_CPU,_Fields.ASSIGNED_MEMONHEAP,_Fields.ASSIGNED_MEMOFFHEAP,_Fields.ASSIGNED_CPU};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -282,6 +287,8 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
tmpMap.put(_Fields.WORKERS, new org.apache.thrift.meta_data.FieldMetaData("workers", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, WorkerSummary.class))));
+ tmpMap.put(_Fields.STORM_VERSION, new org.apache.thrift.meta_data.FieldMetaData("storm_version", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.REQUESTED_MEMONHEAP, new org.apache.thrift.meta_data.FieldMetaData("requested_memonheap", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE)));
tmpMap.put(_Fields.REQUESTED_MEMOFFHEAP, new org.apache.thrift.meta_data.FieldMetaData("requested_memoffheap", org.apache.thrift.TFieldRequirementType.OPTIONAL,
@@ -379,6 +386,9 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
}
this.workers = __this__workers;
}
+ if (other.is_set_storm_version()) {
+ this.storm_version = other.storm_version;
+ }
this.requested_memonheap = other.requested_memonheap;
this.requested_memoffheap = other.requested_memoffheap;
this.requested_cpu = other.requested_cpu;
@@ -414,6 +424,7 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
set_replication_count_isSet(false);
this.replication_count = 0;
this.workers = null;
+ this.storm_version = null;
set_requested_memonheap_isSet(false);
this.requested_memonheap = 0.0;
set_requested_memoffheap_isSet(false);
@@ -828,6 +839,29 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
}
}
+ public String get_storm_version() {
+ return this.storm_version; // raw field value; null means this optional field is unset
+ }
+
+ public void set_storm_version(String storm_version) {
+ this.storm_version = storm_version; // stored as-is with no validation; assigning null leaves the field unset
+ }
+
+ public void unset_storm_version() {
+ this.storm_version = null; // null is the "unset" marker that is_set_storm_version() tests for
+ }
+
+ /** Returns true if field storm_version is set (has been assigned a value) and false otherwise */
+ public boolean is_set_storm_version() {
+ return this.storm_version != null; // presence is encoded purely by non-null; a null assignment counts as unset
+ }
+
+ public void set_storm_version_isSet(boolean value) {
+ if (!value) { // only false has an effect; true is a no-op because presence is derived from the field being non-null
+ this.storm_version = null; // clearing the reference is how the field is marked unset
+ }
+ }
+
public double get_requested_memonheap() {
return this.requested_memonheap;
}
@@ -1090,6 +1124,14 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
}
break;
+ case STORM_VERSION:
+ if (value == null) {
+ unset_storm_version();
+ } else {
+ set_storm_version((String)value);
+ }
+ break;
+
case REQUESTED_MEMONHEAP:
if (value == null) {
unset_requested_memonheap();
@@ -1191,6 +1233,9 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
case WORKERS:
return get_workers();
+ case STORM_VERSION:
+ return get_storm_version();
+
case REQUESTED_MEMONHEAP:
return get_requested_memonheap();
@@ -1252,6 +1297,8 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
return is_set_replication_count();
case WORKERS:
return is_set_workers();
+ case STORM_VERSION:
+ return is_set_storm_version();
case REQUESTED_MEMONHEAP:
return is_set_requested_memonheap();
case REQUESTED_MEMOFFHEAP:
@@ -1425,6 +1472,15 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
return false;
}
+ boolean this_present_storm_version = true && this.is_set_storm_version();
+ boolean that_present_storm_version = true && that.is_set_storm_version();
+ if (this_present_storm_version || that_present_storm_version) {
+ if (!(this_present_storm_version && that_present_storm_version))
+ return false;
+ if (!this.storm_version.equals(that.storm_version))
+ return false;
+ }
+
boolean this_present_requested_memonheap = true && this.is_set_requested_memonheap();
boolean that_present_requested_memonheap = true && that.is_set_requested_memonheap();
if (this_present_requested_memonheap || that_present_requested_memonheap) {
@@ -1566,6 +1622,11 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
if (present_workers)
list.add(workers);
+ boolean present_storm_version = true && (is_set_storm_version());
+ list.add(present_storm_version);
+ if (present_storm_version)
+ list.add(storm_version);
+
boolean present_requested_memonheap = true && (is_set_requested_memonheap());
list.add(present_requested_memonheap);
if (present_requested_memonheap)
@@ -1767,6 +1828,16 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(is_set_storm_version()).compareTo(other.is_set_storm_version());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_storm_version()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.storm_version, other.storm_version);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
lastComparison = Boolean.valueOf(is_set_requested_memonheap()).compareTo(other.is_set_requested_memonheap());
if (lastComparison != 0) {
return lastComparison;
@@ -1984,6 +2055,16 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
}
first = false;
}
+ if (is_set_storm_version()) {
+ if (!first) sb.append(", ");
+ sb.append("storm_version:");
+ if (this.storm_version == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.storm_version);
+ }
+ first = false;
+ }
if (is_set_requested_memonheap()) {
if (!first) sb.append(", ");
sb.append("requested_memonheap:");
@@ -2242,6 +2323,14 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 17: // STORM_VERSION
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.storm_version = iprot.readString();
+ struct.set_storm_version_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
case 521: // REQUESTED_MEMONHEAP
if (schemeField.type == org.apache.thrift.protocol.TType.DOUBLE) {
struct.requested_memonheap = iprot.readDouble();
@@ -2426,6 +2515,13 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
oprot.writeFieldEnd();
}
}
+ if (struct.storm_version != null) {
+ if (struct.is_set_storm_version()) {
+ oprot.writeFieldBegin(STORM_VERSION_FIELD_DESC);
+ oprot.writeString(struct.storm_version);
+ oprot.writeFieldEnd();
+ }
+ }
if (struct.is_set_requested_memonheap()) {
oprot.writeFieldBegin(REQUESTED_MEMONHEAP_FIELD_DESC);
oprot.writeDouble(struct.requested_memonheap);
@@ -2520,25 +2616,28 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
if (struct.is_set_workers()) {
optionals.set(14);
}
- if (struct.is_set_requested_memonheap()) {
+ if (struct.is_set_storm_version()) {
optionals.set(15);
}
- if (struct.is_set_requested_memoffheap()) {
+ if (struct.is_set_requested_memonheap()) {
optionals.set(16);
}
- if (struct.is_set_requested_cpu()) {
+ if (struct.is_set_requested_memoffheap()) {
optionals.set(17);
}
- if (struct.is_set_assigned_memonheap()) {
+ if (struct.is_set_requested_cpu()) {
optionals.set(18);
}
- if (struct.is_set_assigned_memoffheap()) {
+ if (struct.is_set_assigned_memonheap()) {
optionals.set(19);
}
- if (struct.is_set_assigned_cpu()) {
+ if (struct.is_set_assigned_memoffheap()) {
optionals.set(20);
}
- oprot.writeBitSet(optionals, 21);
+ if (struct.is_set_assigned_cpu()) {
+ optionals.set(21);
+ }
+ oprot.writeBitSet(optionals, 22);
if (struct.is_set_name()) {
oprot.writeString(struct.name);
}
@@ -2604,6 +2703,9 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
}
}
}
+ if (struct.is_set_storm_version()) {
+ oprot.writeString(struct.storm_version);
+ }
if (struct.is_set_requested_memonheap()) {
oprot.writeDouble(struct.requested_memonheap);
}
@@ -2629,7 +2731,7 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
TTupleProtocol iprot = (TTupleProtocol) prot;
struct.id = iprot.readString();
struct.set_id_isSet(true);
- BitSet incoming = iprot.readBitSet(21);
+ BitSet incoming = iprot.readBitSet(22);
if (incoming.get(0)) {
struct.name = iprot.readString();
struct.set_name_isSet(true);
@@ -2727,26 +2829,30 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf
struct.set_workers_isSet(true);
}
if (incoming.get(15)) {
+ struct.storm_version = iprot.readString();
+ struct.set_storm_version_isSet(true);
+ }
+ if (incoming.get(16)) {
struct.requested_memonheap = iprot.readDouble();
struct.set_requested_memonheap_isSet(true);
}
- if (incoming.get(16)) {
+ if (incoming.get(17)) {
struct.requested_memoffheap = iprot.readDouble();
struct.set_requested_memoffheap_isSet(true);
}
- if (incoming.get(17)) {
+ if (incoming.get(18)) {
struct.requested_cpu = iprot.readDouble();
struct.set_requested_cpu_isSet(true);
}
- if (incoming.get(18)) {
+ if (incoming.get(19)) {
struct.assigned_memonheap = iprot.readDouble();
struct.set_assigned_memonheap_isSet(true);
}
- if (incoming.get(19)) {
+ if (incoming.get(20)) {
struct.assigned_memoffheap = iprot.readDouble();
struct.set_assigned_memoffheap_isSet(true);
}
- if (incoming.get(20)) {
+ if (incoming.get(21)) {
struct.assigned_cpu = iprot.readDouble();
struct.set_assigned_cpu_isSet(true);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/generated/TopologySummary.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/TopologySummary.java b/storm-client/src/jvm/org/apache/storm/generated/TopologySummary.java
index 8b7cd75..39547e4 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/TopologySummary.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/TopologySummary.java
@@ -62,6 +62,7 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
private static final org.apache.thrift.protocol.TField NUM_WORKERS_FIELD_DESC = new org.apache.thrift.protocol.TField("num_workers", org.apache.thrift.protocol.TType.I32, (short)5);
private static final org.apache.thrift.protocol.TField UPTIME_SECS_FIELD_DESC = new org.apache.thrift.protocol.TField("uptime_secs", org.apache.thrift.protocol.TType.I32, (short)6);
private static final org.apache.thrift.protocol.TField STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("status", org.apache.thrift.protocol.TType.STRING, (short)7);
+ private static final org.apache.thrift.protocol.TField STORM_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("storm_version", org.apache.thrift.protocol.TType.STRING, (short)8);
private static final org.apache.thrift.protocol.TField SCHED_STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("sched_status", org.apache.thrift.protocol.TType.STRING, (short)513);
private static final org.apache.thrift.protocol.TField OWNER_FIELD_DESC = new org.apache.thrift.protocol.TField("owner", org.apache.thrift.protocol.TType.STRING, (short)514);
private static final org.apache.thrift.protocol.TField REPLICATION_COUNT_FIELD_DESC = new org.apache.thrift.protocol.TField("replication_count", org.apache.thrift.protocol.TType.I32, (short)515);
@@ -85,6 +86,7 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
private int num_workers; // required
private int uptime_secs; // required
private String status; // required
+ private String storm_version; // optional
private String sched_status; // optional
private String owner; // optional
private int replication_count; // optional
@@ -104,6 +106,7 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
NUM_WORKERS((short)5, "num_workers"),
UPTIME_SECS((short)6, "uptime_secs"),
STATUS((short)7, "status"),
+ STORM_VERSION((short)8, "storm_version"),
SCHED_STATUS((short)513, "sched_status"),
OWNER((short)514, "owner"),
REPLICATION_COUNT((short)515, "replication_count"),
@@ -141,6 +144,8 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
return UPTIME_SECS;
case 7: // STATUS
return STATUS;
+ case 8: // STORM_VERSION
+ return STORM_VERSION;
case 513: // SCHED_STATUS
return SCHED_STATUS;
case 514: // OWNER
@@ -211,7 +216,7 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
private static final int __ASSIGNED_MEMOFFHEAP_ISSET_ID = 9;
private static final int __ASSIGNED_CPU_ISSET_ID = 10;
private short __isset_bitfield = 0;
- private static final _Fields optionals[] = {_Fields.SCHED_STATUS,_Fields.OWNER,_Fields.REPLICATION_COUNT,_Fields.REQUESTED_MEMONHEAP,_Fields.REQUESTED_MEMOFFHEAP,_Fields.REQUESTED_CPU,_Fields.ASSIGNED_MEMONHEAP,_Fields.ASSIGNED_MEMOFFHEAP,_Fields.ASSIGNED_CPU};
+ private static final _Fields optionals[] = {_Fields.STORM_VERSION,_Fields.SCHED_STATUS,_Fields.OWNER,_Fields.REPLICATION_COUNT,_Fields.REQUESTED_MEMONHEAP,_Fields.REQUESTED_MEMOFFHEAP,_Fields.REQUESTED_CPU,_Fields.ASSIGNED_MEMONHEAP,_Fields.ASSIGNED_MEMOFFHEAP,_Fields.ASSIGNED_CPU};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -229,6 +234,8 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
tmpMap.put(_Fields.STATUS, new org.apache.thrift.meta_data.FieldMetaData("status", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.STORM_VERSION, new org.apache.thrift.meta_data.FieldMetaData("storm_version", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.SCHED_STATUS, new org.apache.thrift.meta_data.FieldMetaData("sched_status", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.OWNER, new org.apache.thrift.meta_data.FieldMetaData("owner", org.apache.thrift.TFieldRequirementType.OPTIONAL,
@@ -295,6 +302,9 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
if (other.is_set_status()) {
this.status = other.status;
}
+ if (other.is_set_storm_version()) {
+ this.storm_version = other.storm_version;
+ }
if (other.is_set_sched_status()) {
this.sched_status = other.sched_status;
}
@@ -327,6 +337,7 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
set_uptime_secs_isSet(false);
this.uptime_secs = 0;
this.status = null;
+ this.storm_version = null;
this.sched_status = null;
this.owner = null;
set_replication_count_isSet(false);
@@ -502,6 +513,29 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
}
}
+ public String get_storm_version() {
+ return this.storm_version; // raw field value; null means this optional field is unset
+ }
+
+ public void set_storm_version(String storm_version) {
+ this.storm_version = storm_version; // stored as-is with no validation; assigning null leaves the field unset
+ }
+
+ public void unset_storm_version() {
+ this.storm_version = null; // null is the "unset" marker that is_set_storm_version() tests for
+ }
+
+ /** Returns true if field storm_version is set (has been assigned a value) and false otherwise */
+ public boolean is_set_storm_version() {
+ return this.storm_version != null; // presence is encoded purely by non-null; a null assignment counts as unset
+ }
+
+ public void set_storm_version_isSet(boolean value) {
+ if (!value) { // only false has an effect; true is a no-op because presence is derived from the field being non-null
+ this.storm_version = null; // clearing the reference is how the field is marked unset
+ }
+ }
+
public String get_sched_status() {
return this.sched_status;
}
@@ -760,6 +794,14 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
}
break;
+ case STORM_VERSION:
+ if (value == null) {
+ unset_storm_version();
+ } else {
+ set_storm_version((String)value);
+ }
+ break;
+
case SCHED_STATUS:
if (value == null) {
unset_sched_status();
@@ -858,6 +900,9 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
case STATUS:
return get_status();
+ case STORM_VERSION:
+ return get_storm_version();
+
case SCHED_STATUS:
return get_sched_status();
@@ -910,6 +955,8 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
return is_set_uptime_secs();
case STATUS:
return is_set_status();
+ case STORM_VERSION:
+ return is_set_storm_version();
case SCHED_STATUS:
return is_set_sched_status();
case OWNER:
@@ -1008,6 +1055,15 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
return false;
}
+ boolean this_present_storm_version = true && this.is_set_storm_version();
+ boolean that_present_storm_version = true && that.is_set_storm_version();
+ if (this_present_storm_version || that_present_storm_version) {
+ if (!(this_present_storm_version && that_present_storm_version))
+ return false;
+ if (!this.storm_version.equals(that.storm_version))
+ return false;
+ }
+
boolean this_present_sched_status = true && this.is_set_sched_status();
boolean that_present_sched_status = true && that.is_set_sched_status();
if (this_present_sched_status || that_present_sched_status) {
@@ -1131,6 +1187,11 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
if (present_status)
list.add(status);
+ boolean present_storm_version = true && (is_set_storm_version());
+ list.add(present_storm_version);
+ if (present_storm_version)
+ list.add(storm_version);
+
boolean present_sched_status = true && (is_set_sched_status());
list.add(present_sched_status);
if (present_sched_status)
@@ -1257,6 +1318,16 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(is_set_storm_version()).compareTo(other.is_set_storm_version());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_storm_version()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.storm_version, other.storm_version);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
lastComparison = Boolean.valueOf(is_set_sched_status()).compareTo(other.is_set_sched_status());
if (lastComparison != 0) {
return lastComparison;
@@ -1406,6 +1477,16 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
sb.append(this.status);
}
first = false;
+ if (is_set_storm_version()) {
+ if (!first) sb.append(", ");
+ sb.append("storm_version:");
+ if (this.storm_version == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.storm_version);
+ }
+ first = false;
+ }
if (is_set_sched_status()) {
if (!first) sb.append(", ");
sb.append("sched_status:");
@@ -1597,6 +1678,14 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 8: // STORM_VERSION
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.storm_version = iprot.readString();
+ struct.set_storm_version_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
case 513: // SCHED_STATUS
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.sched_status = iprot.readString();
@@ -1709,6 +1798,13 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
oprot.writeString(struct.status);
oprot.writeFieldEnd();
}
+ if (struct.storm_version != null) {
+ if (struct.is_set_storm_version()) {
+ oprot.writeFieldBegin(STORM_VERSION_FIELD_DESC);
+ oprot.writeString(struct.storm_version);
+ oprot.writeFieldEnd();
+ }
+ }
if (struct.sched_status != null) {
if (struct.is_set_sched_status()) {
oprot.writeFieldBegin(SCHED_STATUS_FIELD_DESC);
@@ -1783,34 +1879,40 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
oprot.writeI32(struct.uptime_secs);
oprot.writeString(struct.status);
BitSet optionals = new BitSet();
- if (struct.is_set_sched_status()) {
+ if (struct.is_set_storm_version()) {
optionals.set(0);
}
- if (struct.is_set_owner()) {
+ if (struct.is_set_sched_status()) {
optionals.set(1);
}
- if (struct.is_set_replication_count()) {
+ if (struct.is_set_owner()) {
optionals.set(2);
}
- if (struct.is_set_requested_memonheap()) {
+ if (struct.is_set_replication_count()) {
optionals.set(3);
}
- if (struct.is_set_requested_memoffheap()) {
+ if (struct.is_set_requested_memonheap()) {
optionals.set(4);
}
- if (struct.is_set_requested_cpu()) {
+ if (struct.is_set_requested_memoffheap()) {
optionals.set(5);
}
- if (struct.is_set_assigned_memonheap()) {
+ if (struct.is_set_requested_cpu()) {
optionals.set(6);
}
- if (struct.is_set_assigned_memoffheap()) {
+ if (struct.is_set_assigned_memonheap()) {
optionals.set(7);
}
- if (struct.is_set_assigned_cpu()) {
+ if (struct.is_set_assigned_memoffheap()) {
optionals.set(8);
}
- oprot.writeBitSet(optionals, 9);
+ if (struct.is_set_assigned_cpu()) {
+ optionals.set(9);
+ }
+ oprot.writeBitSet(optionals, 10);
+ if (struct.is_set_storm_version()) {
+ oprot.writeString(struct.storm_version);
+ }
if (struct.is_set_sched_status()) {
oprot.writeString(struct.sched_status);
}
@@ -1857,40 +1959,44 @@ public class TopologySummary implements org.apache.thrift.TBase<TopologySummary,
struct.set_uptime_secs_isSet(true);
struct.status = iprot.readString();
struct.set_status_isSet(true);
- BitSet incoming = iprot.readBitSet(9);
+ BitSet incoming = iprot.readBitSet(10);
if (incoming.get(0)) {
+ struct.storm_version = iprot.readString();
+ struct.set_storm_version_isSet(true);
+ }
+ if (incoming.get(1)) {
struct.sched_status = iprot.readString();
struct.set_sched_status_isSet(true);
}
- if (incoming.get(1)) {
+ if (incoming.get(2)) {
struct.owner = iprot.readString();
struct.set_owner_isSet(true);
}
- if (incoming.get(2)) {
+ if (incoming.get(3)) {
struct.replication_count = iprot.readI32();
struct.set_replication_count_isSet(true);
}
- if (incoming.get(3)) {
+ if (incoming.get(4)) {
struct.requested_memonheap = iprot.readDouble();
struct.set_requested_memonheap_isSet(true);
}
- if (incoming.get(4)) {
+ if (incoming.get(5)) {
struct.requested_memoffheap = iprot.readDouble();
struct.set_requested_memoffheap_isSet(true);
}
- if (incoming.get(5)) {
+ if (incoming.get(6)) {
struct.requested_cpu = iprot.readDouble();
struct.set_requested_cpu_isSet(true);
}
- if (incoming.get(6)) {
+ if (incoming.get(7)) {
struct.assigned_memonheap = iprot.readDouble();
struct.set_assigned_memonheap_isSet(true);
}
- if (incoming.get(7)) {
+ if (incoming.get(8)) {
struct.assigned_memoffheap = iprot.readDouble();
struct.set_assigned_memoffheap_isSet(true);
}
- if (incoming.get(8)) {
+ if (incoming.get(9)) {
struct.assigned_cpu = iprot.readDouble();
struct.set_assigned_cpu_isSet(true);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
index 6542c7e..47cbd2d 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
@@ -158,7 +158,7 @@ public class TopologyBuilder {
stormTopology.set_worker_hooks(_workerHooks);
- return stormTopology;
+ return Utils.addVersions(stormTopology);
}
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/LocalState.java b/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
index 2f0bb60..29310f5 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
@@ -84,7 +84,13 @@ public class LocalState {
private TBase deserialize(ThriftSerializedObject obj, TDeserializer td) {
try {
- Class<?> clazz = Class.forName(obj.get_name());
+ Class<?> clazz;
+ try {
+ clazz = Class.forName(obj.get_name());
+ } catch (ClassNotFoundException ex) {
+ //Try to maintain rolling upgrade compatible with 0.10 releases
+ clazz = Class.forName(obj.get_name().replaceAll("^backtype\\.storm\\.", "org.apache.storm."));
+ }
TBase instance = (TBase) clazz.newInstance();
td.deserialize(instance, obj.get_bits());
return instance;
http://git-wip-us.apache.org/repos/asf/storm/blob/22d1fe38/storm-client/src/jvm/org/apache/storm/utils/SimpleVersion.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/SimpleVersion.java b/storm-client/src/jvm/org/apache/storm/utils/SimpleVersion.java
new file mode 100644
index 0000000..471b049
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/utils/SimpleVersion.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.utils;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A coarse {@code major.minor} view of a version string, such as a Storm or JDK version.
+ *
+ * <p>Only the first two numeric components are kept; anything after the minor number
+ * (patch level, qualifiers such as {@code -SNAPSHOT}) is ignored. Ordering, equality and
+ * hashing are all defined on the (major, minor) pair.
+ */
+public class SimpleVersion implements Comparable<SimpleVersion> {
+    /**
+     * Leading digits, one or more of the separators {@code .}, {@code -} or {@code _}, more
+     * digits, then anything. The hyphen is escaped on purpose: an unescaped {@code [.-_]} is
+     * the character range '.'..'_' (which contains the digits), so the separator would swallow
+     * numeric components and e.g. "1.10.2" would be read as 1.2.
+     */
+    private static final Pattern VERSION_PATTERN = Pattern.compile("(\\d+)[.\\-_]+(\\d+).*");
+
+    private final int _major;
+    private final int _minor;
+
+    /**
+     * Parse the major and minor numbers out of a version string.
+     *
+     * @param version the version string, e.g. {@code "2.0.0-SNAPSHOT"} or {@code "1.8.0_131"};
+     *     the literal {@code "Unknown"} is accepted and yields major and minor of {@code -1}
+     * @throws IllegalArgumentException if the string is neither parseable nor {@code "Unknown"}
+     *     (this includes a {@link NumberFormatException} for components too large for an int)
+     * @throws NullPointerException if {@code version} is null
+     */
+    public SimpleVersion(String version) {
+        Matcher m = VERSION_PATTERN.matcher(version);
+        if (m.matches()) {
+            _major = Integer.parseInt(m.group(1));
+            _minor = Integer.parseInt(m.group(2));
+        } else if ("Unknown".equals(version)) {
+            //Unknown should only happen during compilation or some unit tests.
+            _major = -1;
+            _minor = -1;
+        } else {
+            throw new IllegalArgumentException("Cannot parse " + version);
+        }
+    }
+
+    /**
+     * @return the major version number, or {@code -1} for an "Unknown" version
+     */
+    public int getMajor() {
+        return _major;
+    }
+
+    /**
+     * @return the minor version number, or {@code -1} for an "Unknown" version
+     */
+    public int getMinor() {
+        return _minor;
+    }
+
+    @Override
+    public int hashCode() {
+        //Combine arithmetically; a bitwise AND would collapse most versions (any x.0) to 0.
+        return 31 * _major + _minor;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+
+        if (!(o instanceof SimpleVersion)) {
+            return false;
+        }
+
+        return compareTo((SimpleVersion) o) == 0;
+    }
+
+    /**
+     * Order by major number first, then by minor number.
+     */
+    @Override
+    public int compareTo(SimpleVersion o) {
+        int ret = Integer.compare(_major, o._major);
+        if (ret == 0) {
+            ret = Integer.compare(_minor, o._minor);
+        }
+        return ret;
+    }
+
+    @Override
+    public String toString() {
+        return _major + "." + _minor;
+    }
+}