You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/15 20:02:45 UTC
[03/13] storm git commit: STORM-762: uptime for worker heartbeats is lost when converted to thrift
STORM-762: uptime for worker heartbeats is lost when converted to thrift
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/24e3b988
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/24e3b988
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/24e3b988
Branch: refs/heads/0.10.x-branch
Commit: 24e3b98820c78be4ae03f52c66a9b36bb04ea0dc
Parents: 0085319
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Apr 8 08:46:32 2015 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 12:56:59 2015 -0400
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/converter.clj | 3 +-
.../storm/generated/ClusterWorkerHeartbeat.java | 101 ++++++++++++++++++-
storm-core/src/py/__init__.py | 2 +
storm-core/src/py/storm/DistributedRPC.py | 2 +
.../src/py/storm/DistributedRPCInvocations.py | 2 +
storm-core/src/py/storm/Nimbus.py | 2 +
storm-core/src/py/storm/__init__.py | 2 +
storm-core/src/py/storm/constants.py | 2 +
storm-core/src/py/storm/ttypes.py | 19 +++-
storm-core/src/storm.thrift | 1 +
10 files changed, 131 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/clj/backtype/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/converter.clj b/storm-core/src/clj/backtype/storm/converter.clj
index 6102ced..ae66fa5 100644
--- a/storm-core/src/clj/backtype/storm/converter.clj
+++ b/storm-core/src/clj/backtype/storm/converter.clj
@@ -162,7 +162,7 @@
(if worker-hb
{:storm-id (.get_storm_id worker-hb)
:executor-stats (clojurify-stats (into {} (.get_executor_stats worker-hb)))
- :uptime (time-delta (.get_time_secs worker-hb))
+ :uptime (.get_uptime_secs worker-hb)
:time-secs (.get_time_secs worker-hb)
}
{}))
@@ -170,6 +170,7 @@
(defn thriftify-zk-worker-hb [worker-hb]
(if (not-empty (filter second (:executor-stats worker-hb)))
(doto (ClusterWorkerHeartbeat.)
+ (.set_uptime_secs (:uptime worker-hb))
(.set_storm_id (:storm-id worker-hb))
(.set_executor_stats (thriftify-stats (filter second (:executor-stats worker-hb))))
(.set_time_secs (:time-secs worker-hb)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/jvm/backtype/storm/generated/ClusterWorkerHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/ClusterWorkerHeartbeat.java b/storm-core/src/jvm/backtype/storm/generated/ClusterWorkerHeartbeat.java
index fb04e3a..30a99b3 100644
--- a/storm-core/src/jvm/backtype/storm/generated/ClusterWorkerHeartbeat.java
+++ b/storm-core/src/jvm/backtype/storm/generated/ClusterWorkerHeartbeat.java
@@ -51,13 +51,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-3-11")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-4-8")
public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWorkerHeartbeat, ClusterWorkerHeartbeat._Fields>, java.io.Serializable, Cloneable, Comparable<ClusterWorkerHeartbeat> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ClusterWorkerHeartbeat");
private static final org.apache.thrift.protocol.TField STORM_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("storm_id", org.apache.thrift.protocol.TType.STRING, (short)1);
private static final org.apache.thrift.protocol.TField EXECUTOR_STATS_FIELD_DESC = new org.apache.thrift.protocol.TField("executor_stats", org.apache.thrift.protocol.TType.MAP, (short)2);
private static final org.apache.thrift.protocol.TField TIME_SECS_FIELD_DESC = new org.apache.thrift.protocol.TField("time_secs", org.apache.thrift.protocol.TType.I32, (short)3);
+ private static final org.apache.thrift.protocol.TField UPTIME_SECS_FIELD_DESC = new org.apache.thrift.protocol.TField("uptime_secs", org.apache.thrift.protocol.TType.I32, (short)4);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -68,12 +69,14 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
private String storm_id; // required
private Map<ExecutorInfo,ExecutorStats> executor_stats; // required
private int time_secs; // required
+ private int uptime_secs; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
STORM_ID((short)1, "storm_id"),
EXECUTOR_STATS((short)2, "executor_stats"),
- TIME_SECS((short)3, "time_secs");
+ TIME_SECS((short)3, "time_secs"),
+ UPTIME_SECS((short)4, "uptime_secs");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -94,6 +97,8 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
return EXECUTOR_STATS;
case 3: // TIME_SECS
return TIME_SECS;
+ case 4: // UPTIME_SECS
+ return UPTIME_SECS;
default:
return null;
}
@@ -135,6 +140,7 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
// isset id assignments
private static final int __TIME_SECS_ISSET_ID = 0;
+ private static final int __UPTIME_SECS_ISSET_ID = 1;
private byte __isset_bitfield = 0;
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
@@ -147,6 +153,8 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, ExecutorStats.class))));
tmpMap.put(_Fields.TIME_SECS, new org.apache.thrift.meta_data.FieldMetaData("time_secs", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+ tmpMap.put(_Fields.UPTIME_SECS, new org.apache.thrift.meta_data.FieldMetaData("uptime_secs", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ClusterWorkerHeartbeat.class, metaDataMap);
}
@@ -157,13 +165,16 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
public ClusterWorkerHeartbeat(
String storm_id,
Map<ExecutorInfo,ExecutorStats> executor_stats,
- int time_secs)
+ int time_secs,
+ int uptime_secs)
{
this();
this.storm_id = storm_id;
this.executor_stats = executor_stats;
this.time_secs = time_secs;
set_time_secs_isSet(true);
+ this.uptime_secs = uptime_secs;
+ set_uptime_secs_isSet(true);
}
/**
@@ -190,6 +201,7 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
this.executor_stats = __this__executor_stats;
}
this.time_secs = other.time_secs;
+ this.uptime_secs = other.uptime_secs;
}
public ClusterWorkerHeartbeat deepCopy() {
@@ -202,6 +214,8 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
this.executor_stats = null;
set_time_secs_isSet(false);
this.time_secs = 0;
+ set_uptime_secs_isSet(false);
+ this.uptime_secs = 0;
}
public String get_storm_id() {
@@ -283,6 +297,28 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
__isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __TIME_SECS_ISSET_ID, value);
}
+ public int get_uptime_secs() {
+ return this.uptime_secs;
+ }
+
+ public void set_uptime_secs(int uptime_secs) {
+ this.uptime_secs = uptime_secs;
+ set_uptime_secs_isSet(true);
+ }
+
+ public void unset_uptime_secs() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __UPTIME_SECS_ISSET_ID);
+ }
+
+ /** Returns true if field uptime_secs is set (has been assigned a value) and false otherwise */
+ public boolean is_set_uptime_secs() {
+ return EncodingUtils.testBit(__isset_bitfield, __UPTIME_SECS_ISSET_ID);
+ }
+
+ public void set_uptime_secs_isSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __UPTIME_SECS_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case STORM_ID:
@@ -309,6 +345,14 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
}
break;
+ case UPTIME_SECS:
+ if (value == null) {
+ unset_uptime_secs();
+ } else {
+ set_uptime_secs((Integer)value);
+ }
+ break;
+
}
}
@@ -323,6 +367,9 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
case TIME_SECS:
return Integer.valueOf(get_time_secs());
+ case UPTIME_SECS:
+ return Integer.valueOf(get_uptime_secs());
+
}
throw new IllegalStateException();
}
@@ -340,6 +387,8 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
return is_set_executor_stats();
case TIME_SECS:
return is_set_time_secs();
+ case UPTIME_SECS:
+ return is_set_uptime_secs();
}
throw new IllegalStateException();
}
@@ -384,6 +433,15 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
return false;
}
+ boolean this_present_uptime_secs = true;
+ boolean that_present_uptime_secs = true;
+ if (this_present_uptime_secs || that_present_uptime_secs) {
+ if (!(this_present_uptime_secs && that_present_uptime_secs))
+ return false;
+ if (this.uptime_secs != that.uptime_secs)
+ return false;
+ }
+
return true;
}
@@ -406,6 +464,11 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
if (present_time_secs)
list.add(time_secs);
+ boolean present_uptime_secs = true;
+ list.add(present_uptime_secs);
+ if (present_uptime_secs)
+ list.add(uptime_secs);
+
return list.hashCode();
}
@@ -447,6 +510,16 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(is_set_uptime_secs()).compareTo(other.is_set_uptime_secs());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_uptime_secs()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.uptime_secs, other.uptime_secs);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -486,6 +559,10 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
sb.append("time_secs:");
sb.append(this.time_secs);
first = false;
+ if (!first) sb.append(", ");
+ sb.append("uptime_secs:");
+ sb.append(this.uptime_secs);
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -504,6 +581,10 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
throw new org.apache.thrift.protocol.TProtocolException("Required field 'time_secs' is unset! Struct:" + toString());
}
+ if (!is_set_uptime_secs()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'uptime_secs' is unset! Struct:" + toString());
+ }
+
// check for sub-struct validity
}
@@ -581,6 +662,14 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 4: // UPTIME_SECS
+ if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+ struct.uptime_secs = iprot.readI32();
+ struct.set_uptime_secs_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -615,6 +704,9 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
oprot.writeFieldBegin(TIME_SECS_FIELD_DESC);
oprot.writeI32(struct.time_secs);
oprot.writeFieldEnd();
+ oprot.writeFieldBegin(UPTIME_SECS_FIELD_DESC);
+ oprot.writeI32(struct.uptime_secs);
+ oprot.writeFieldEnd();
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -642,6 +734,7 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
}
}
oprot.writeI32(struct.time_secs);
+ oprot.writeI32(struct.uptime_secs);
}
@Override
@@ -666,6 +759,8 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
struct.set_executor_stats_isSet(true);
struct.time_secs = iprot.readI32();
struct.set_time_secs_isSet(true);
+ struct.uptime_secs = iprot.readI32();
+ struct.set_uptime_secs_isSet(true);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/py/__init__.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/__init__.py b/storm-core/src/py/__init__.py
index 59dd060..0896fcd 100644
--- a/storm-core/src/py/__init__.py
+++ b/storm-core/src/py/__init__.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/py/storm/DistributedRPC.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPC.py b/storm-core/src/py/storm/DistributedRPC.py
index 1728434..330499c 100644
--- a/storm-core/src/py/storm/DistributedRPC.py
+++ b/storm-core/src/py/storm/DistributedRPC.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/py/storm/DistributedRPCInvocations.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPCInvocations.py b/storm-core/src/py/storm/DistributedRPCInvocations.py
index fddbbe5..493fcc7 100644
--- a/storm-core/src/py/storm/DistributedRPCInvocations.py
+++ b/storm-core/src/py/storm/DistributedRPCInvocations.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/py/storm/Nimbus.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus.py b/storm-core/src/py/storm/Nimbus.py
index 38e137d..b952e3c 100644
--- a/storm-core/src/py/storm/Nimbus.py
+++ b/storm-core/src/py/storm/Nimbus.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/py/storm/__init__.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/__init__.py b/storm-core/src/py/storm/__init__.py
index 3692381..9ecdc2b 100644
--- a/storm-core/src/py/storm/__init__.py
+++ b/storm-core/src/py/storm/__init__.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/py/storm/constants.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/constants.py b/storm-core/src/py/storm/constants.py
index 8f77f7c..3f0c64a 100644
--- a/storm-core/src/py/storm/constants.py
+++ b/storm-core/src/py/storm/constants.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py
index f683dfe..512e666 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -5255,6 +5257,7 @@ class ClusterWorkerHeartbeat:
- storm_id
- executor_stats
- time_secs
+ - uptime_secs
"""
thrift_spec = (
@@ -5262,12 +5265,14 @@ class ClusterWorkerHeartbeat:
(1, TType.STRING, 'storm_id', None, None, ), # 1
(2, TType.MAP, 'executor_stats', (TType.STRUCT,(ExecutorInfo, ExecutorInfo.thrift_spec),TType.STRUCT,(ExecutorStats, ExecutorStats.thrift_spec)), None, ), # 2
(3, TType.I32, 'time_secs', None, None, ), # 3
+ (4, TType.I32, 'uptime_secs', None, None, ), # 4
)
- def __init__(self, storm_id=None, executor_stats=None, time_secs=None,):
+ def __init__(self, storm_id=None, executor_stats=None, time_secs=None, uptime_secs=None,):
self.storm_id = storm_id
self.executor_stats = executor_stats
self.time_secs = time_secs
+ self.uptime_secs = uptime_secs
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -5301,6 +5306,11 @@ class ClusterWorkerHeartbeat:
self.time_secs = iprot.readI32();
else:
iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.I32:
+ self.uptime_secs = iprot.readI32();
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -5327,6 +5337,10 @@ class ClusterWorkerHeartbeat:
oprot.writeFieldBegin('time_secs', TType.I32, 3)
oprot.writeI32(self.time_secs)
oprot.writeFieldEnd()
+ if self.uptime_secs is not None:
+ oprot.writeFieldBegin('uptime_secs', TType.I32, 4)
+ oprot.writeI32(self.uptime_secs)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -5337,6 +5351,8 @@ class ClusterWorkerHeartbeat:
raise TProtocol.TProtocolException(message='Required field executor_stats is unset!')
if self.time_secs is None:
raise TProtocol.TProtocolException(message='Required field time_secs is unset!')
+ if self.uptime_secs is None:
+ raise TProtocol.TProtocolException(message='Required field uptime_secs is unset!')
return
@@ -5345,6 +5361,7 @@ class ClusterWorkerHeartbeat:
value = (value * 31) ^ hash(self.storm_id)
value = (value * 31) ^ hash(self.executor_stats)
value = (value * 31) ^ hash(self.time_secs)
+ value = (value * 31) ^ hash(self.uptime_secs)
return value
def __repr__(self):
http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift
index db4a7b3..3971a43 100644
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@ -292,6 +292,7 @@ struct ClusterWorkerHeartbeat {
1: required string storm_id;
2: required map<ExecutorInfo,ExecutorStats> executor_stats;
3: required i32 time_secs;
+ 4: required i32 uptime_secs;
}
enum NumErrorsChoice {