You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text export.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/15 20:02:45 UTC

[03/13] storm git commit: STORM-762: uptime for worker heartbeats is lost when converted to thrift

STORM-762: uptime for worker heartbeats is lost when converted to thrift


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/24e3b988
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/24e3b988
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/24e3b988

Branch: refs/heads/0.10.x-branch
Commit: 24e3b98820c78be4ae03f52c66a9b36bb04ea0dc
Parents: 0085319
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Apr 8 08:46:32 2015 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 12:56:59 2015 -0400

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/converter.clj |   3 +-
 .../storm/generated/ClusterWorkerHeartbeat.java | 101 ++++++++++++++++++-
 storm-core/src/py/__init__.py                   |   2 +
 storm-core/src/py/storm/DistributedRPC.py       |   2 +
 .../src/py/storm/DistributedRPCInvocations.py   |   2 +
 storm-core/src/py/storm/Nimbus.py               |   2 +
 storm-core/src/py/storm/__init__.py             |   2 +
 storm-core/src/py/storm/constants.py            |   2 +
 storm-core/src/py/storm/ttypes.py               |  19 +++-
 storm-core/src/storm.thrift                     |   1 +
 10 files changed, 131 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/clj/backtype/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/converter.clj b/storm-core/src/clj/backtype/storm/converter.clj
index 6102ced..ae66fa5 100644
--- a/storm-core/src/clj/backtype/storm/converter.clj
+++ b/storm-core/src/clj/backtype/storm/converter.clj
@@ -162,7 +162,7 @@
   (if worker-hb
     {:storm-id (.get_storm_id worker-hb)
      :executor-stats (clojurify-stats (into {} (.get_executor_stats worker-hb)))
-     :uptime (time-delta (.get_time_secs worker-hb))
+     :uptime (.get_uptime_secs worker-hb)
      :time-secs (.get_time_secs worker-hb)
      }
     {}))
@@ -170,6 +170,7 @@
 (defn thriftify-zk-worker-hb [worker-hb]
   (if (not-empty (filter second (:executor-stats worker-hb)))
     (doto (ClusterWorkerHeartbeat.)
+      (.set_uptime_secs (:uptime worker-hb))
       (.set_storm_id (:storm-id worker-hb))
       (.set_executor_stats (thriftify-stats (filter second (:executor-stats worker-hb))))
       (.set_time_secs (:time-secs worker-hb)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/jvm/backtype/storm/generated/ClusterWorkerHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/ClusterWorkerHeartbeat.java b/storm-core/src/jvm/backtype/storm/generated/ClusterWorkerHeartbeat.java
index fb04e3a..30a99b3 100644
--- a/storm-core/src/jvm/backtype/storm/generated/ClusterWorkerHeartbeat.java
+++ b/storm-core/src/jvm/backtype/storm/generated/ClusterWorkerHeartbeat.java
@@ -51,13 +51,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-3-11")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-4-8")
 public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWorkerHeartbeat, ClusterWorkerHeartbeat._Fields>, java.io.Serializable, Cloneable, Comparable<ClusterWorkerHeartbeat> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ClusterWorkerHeartbeat");
 
   private static final org.apache.thrift.protocol.TField STORM_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("storm_id", org.apache.thrift.protocol.TType.STRING, (short)1);
   private static final org.apache.thrift.protocol.TField EXECUTOR_STATS_FIELD_DESC = new org.apache.thrift.protocol.TField("executor_stats", org.apache.thrift.protocol.TType.MAP, (short)2);
   private static final org.apache.thrift.protocol.TField TIME_SECS_FIELD_DESC = new org.apache.thrift.protocol.TField("time_secs", org.apache.thrift.protocol.TType.I32, (short)3);
+  private static final org.apache.thrift.protocol.TField UPTIME_SECS_FIELD_DESC = new org.apache.thrift.protocol.TField("uptime_secs", org.apache.thrift.protocol.TType.I32, (short)4);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -68,12 +69,14 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
   private String storm_id; // required
   private Map<ExecutorInfo,ExecutorStats> executor_stats; // required
   private int time_secs; // required
+  private int uptime_secs; // required
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
     STORM_ID((short)1, "storm_id"),
     EXECUTOR_STATS((short)2, "executor_stats"),
-    TIME_SECS((short)3, "time_secs");
+    TIME_SECS((short)3, "time_secs"),
+    UPTIME_SECS((short)4, "uptime_secs");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -94,6 +97,8 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
           return EXECUTOR_STATS;
         case 3: // TIME_SECS
           return TIME_SECS;
+        case 4: // UPTIME_SECS
+          return UPTIME_SECS;
         default:
           return null;
       }
@@ -135,6 +140,7 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
 
   // isset id assignments
   private static final int __TIME_SECS_ISSET_ID = 0;
+  private static final int __UPTIME_SECS_ISSET_ID = 1;
   private byte __isset_bitfield = 0;
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
@@ -147,6 +153,8 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
             new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, ExecutorStats.class))));
     tmpMap.put(_Fields.TIME_SECS, new org.apache.thrift.meta_data.FieldMetaData("time_secs", org.apache.thrift.TFieldRequirementType.REQUIRED, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+    tmpMap.put(_Fields.UPTIME_SECS, new org.apache.thrift.meta_data.FieldMetaData("uptime_secs", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ClusterWorkerHeartbeat.class, metaDataMap);
   }
@@ -157,13 +165,16 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
   public ClusterWorkerHeartbeat(
     String storm_id,
     Map<ExecutorInfo,ExecutorStats> executor_stats,
-    int time_secs)
+    int time_secs,
+    int uptime_secs)
   {
     this();
     this.storm_id = storm_id;
     this.executor_stats = executor_stats;
     this.time_secs = time_secs;
     set_time_secs_isSet(true);
+    this.uptime_secs = uptime_secs;
+    set_uptime_secs_isSet(true);
   }
 
   /**
@@ -190,6 +201,7 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
       this.executor_stats = __this__executor_stats;
     }
     this.time_secs = other.time_secs;
+    this.uptime_secs = other.uptime_secs;
   }
 
   public ClusterWorkerHeartbeat deepCopy() {
@@ -202,6 +214,8 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
     this.executor_stats = null;
     set_time_secs_isSet(false);
     this.time_secs = 0;
+    set_uptime_secs_isSet(false);
+    this.uptime_secs = 0;
   }
 
   public String get_storm_id() {
@@ -283,6 +297,28 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
     __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __TIME_SECS_ISSET_ID, value);
   }
 
+  public int get_uptime_secs() {
+    return this.uptime_secs;
+  }
+
+  public void set_uptime_secs(int uptime_secs) {
+    this.uptime_secs = uptime_secs;
+    set_uptime_secs_isSet(true);
+  }
+
+  public void unset_uptime_secs() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __UPTIME_SECS_ISSET_ID);
+  }
+
+  /** Returns true if field uptime_secs is set (has been assigned a value) and false otherwise */
+  public boolean is_set_uptime_secs() {
+    return EncodingUtils.testBit(__isset_bitfield, __UPTIME_SECS_ISSET_ID);
+  }
+
+  public void set_uptime_secs_isSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __UPTIME_SECS_ISSET_ID, value);
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case STORM_ID:
@@ -309,6 +345,14 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
       }
       break;
 
+    case UPTIME_SECS:
+      if (value == null) {
+        unset_uptime_secs();
+      } else {
+        set_uptime_secs((Integer)value);
+      }
+      break;
+
     }
   }
 
@@ -323,6 +367,9 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
     case TIME_SECS:
       return Integer.valueOf(get_time_secs());
 
+    case UPTIME_SECS:
+      return Integer.valueOf(get_uptime_secs());
+
     }
     throw new IllegalStateException();
   }
@@ -340,6 +387,8 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
       return is_set_executor_stats();
     case TIME_SECS:
       return is_set_time_secs();
+    case UPTIME_SECS:
+      return is_set_uptime_secs();
     }
     throw new IllegalStateException();
   }
@@ -384,6 +433,15 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
         return false;
     }
 
+    boolean this_present_uptime_secs = true;
+    boolean that_present_uptime_secs = true;
+    if (this_present_uptime_secs || that_present_uptime_secs) {
+      if (!(this_present_uptime_secs && that_present_uptime_secs))
+        return false;
+      if (this.uptime_secs != that.uptime_secs)
+        return false;
+    }
+
     return true;
   }
 
@@ -406,6 +464,11 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
     if (present_time_secs)
       list.add(time_secs);
 
+    boolean present_uptime_secs = true;
+    list.add(present_uptime_secs);
+    if (present_uptime_secs)
+      list.add(uptime_secs);
+
     return list.hashCode();
   }
 
@@ -447,6 +510,16 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(is_set_uptime_secs()).compareTo(other.is_set_uptime_secs());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_uptime_secs()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.uptime_secs, other.uptime_secs);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -486,6 +559,10 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
     sb.append("time_secs:");
     sb.append(this.time_secs);
     first = false;
+    if (!first) sb.append(", ");
+    sb.append("uptime_secs:");
+    sb.append(this.uptime_secs);
+    first = false;
     sb.append(")");
     return sb.toString();
   }
@@ -504,6 +581,10 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
       throw new org.apache.thrift.protocol.TProtocolException("Required field 'time_secs' is unset! Struct:" + toString());
     }
 
+    if (!is_set_uptime_secs()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'uptime_secs' is unset! Struct:" + toString());
+    }
+
     // check for sub-struct validity
   }
 
@@ -581,6 +662,14 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 4: // UPTIME_SECS
+            if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+              struct.uptime_secs = iprot.readI32();
+              struct.set_uptime_secs_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -615,6 +704,9 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
       oprot.writeFieldBegin(TIME_SECS_FIELD_DESC);
       oprot.writeI32(struct.time_secs);
       oprot.writeFieldEnd();
+      oprot.writeFieldBegin(UPTIME_SECS_FIELD_DESC);
+      oprot.writeI32(struct.uptime_secs);
+      oprot.writeFieldEnd();
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -642,6 +734,7 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
         }
       }
       oprot.writeI32(struct.time_secs);
+      oprot.writeI32(struct.uptime_secs);
     }
 
     @Override
@@ -666,6 +759,8 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
       struct.set_executor_stats_isSet(true);
       struct.time_secs = iprot.readI32();
       struct.set_time_secs_isSet(true);
+      struct.uptime_secs = iprot.readI32();
+      struct.set_uptime_secs_isSet(true);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/py/__init__.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/__init__.py b/storm-core/src/py/__init__.py
index 59dd060..0896fcd 100644
--- a/storm-core/src/py/__init__.py
+++ b/storm-core/src/py/__init__.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/py/storm/DistributedRPC.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPC.py b/storm-core/src/py/storm/DistributedRPC.py
index 1728434..330499c 100644
--- a/storm-core/src/py/storm/DistributedRPC.py
+++ b/storm-core/src/py/storm/DistributedRPC.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/py/storm/DistributedRPCInvocations.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPCInvocations.py b/storm-core/src/py/storm/DistributedRPCInvocations.py
index fddbbe5..493fcc7 100644
--- a/storm-core/src/py/storm/DistributedRPCInvocations.py
+++ b/storm-core/src/py/storm/DistributedRPCInvocations.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/py/storm/Nimbus.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus.py b/storm-core/src/py/storm/Nimbus.py
index 38e137d..b952e3c 100644
--- a/storm-core/src/py/storm/Nimbus.py
+++ b/storm-core/src/py/storm/Nimbus.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/py/storm/__init__.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/__init__.py b/storm-core/src/py/storm/__init__.py
index 3692381..9ecdc2b 100644
--- a/storm-core/src/py/storm/__init__.py
+++ b/storm-core/src/py/storm/__init__.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/py/storm/constants.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/constants.py b/storm-core/src/py/storm/constants.py
index 8f77f7c..3f0c64a 100644
--- a/storm-core/src/py/storm/constants.py
+++ b/storm-core/src/py/storm/constants.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py
index f683dfe..512e666 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -5255,6 +5257,7 @@ class ClusterWorkerHeartbeat:
    - storm_id
    - executor_stats
    - time_secs
+   - uptime_secs
   """
 
   thrift_spec = (
@@ -5262,12 +5265,14 @@ class ClusterWorkerHeartbeat:
     (1, TType.STRING, 'storm_id', None, None, ), # 1
     (2, TType.MAP, 'executor_stats', (TType.STRUCT,(ExecutorInfo, ExecutorInfo.thrift_spec),TType.STRUCT,(ExecutorStats, ExecutorStats.thrift_spec)), None, ), # 2
     (3, TType.I32, 'time_secs', None, None, ), # 3
+    (4, TType.I32, 'uptime_secs', None, None, ), # 4
   )
 
-  def __init__(self, storm_id=None, executor_stats=None, time_secs=None,):
+  def __init__(self, storm_id=None, executor_stats=None, time_secs=None, uptime_secs=None,):
     self.storm_id = storm_id
     self.executor_stats = executor_stats
     self.time_secs = time_secs
+    self.uptime_secs = uptime_secs
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -5301,6 +5306,11 @@ class ClusterWorkerHeartbeat:
           self.time_secs = iprot.readI32();
         else:
           iprot.skip(ftype)
+      elif fid == 4:
+        if ftype == TType.I32:
+          self.uptime_secs = iprot.readI32();
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -5327,6 +5337,10 @@ class ClusterWorkerHeartbeat:
       oprot.writeFieldBegin('time_secs', TType.I32, 3)
       oprot.writeI32(self.time_secs)
       oprot.writeFieldEnd()
+    if self.uptime_secs is not None:
+      oprot.writeFieldBegin('uptime_secs', TType.I32, 4)
+      oprot.writeI32(self.uptime_secs)
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -5337,6 +5351,8 @@ class ClusterWorkerHeartbeat:
       raise TProtocol.TProtocolException(message='Required field executor_stats is unset!')
     if self.time_secs is None:
       raise TProtocol.TProtocolException(message='Required field time_secs is unset!')
+    if self.uptime_secs is None:
+      raise TProtocol.TProtocolException(message='Required field uptime_secs is unset!')
     return
 
 
@@ -5345,6 +5361,7 @@ class ClusterWorkerHeartbeat:
     value = (value * 31) ^ hash(self.storm_id)
     value = (value * 31) ^ hash(self.executor_stats)
     value = (value * 31) ^ hash(self.time_secs)
+    value = (value * 31) ^ hash(self.uptime_secs)
     return value
 
   def __repr__(self):

http://git-wip-us.apache.org/repos/asf/storm/blob/24e3b988/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift
index db4a7b3..3971a43 100644
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@ -292,6 +292,7 @@ struct ClusterWorkerHeartbeat {
     1: required string storm_id;
     2: required map<ExecutorInfo,ExecutorStats> executor_stats;
     3: required i32 time_secs;
+    4: required i32 uptime_secs;
 }
 
 enum NumErrorsChoice {