You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/15 18:44:57 UTC

[24/30] storm git commit: 1. changed heartbeat structure to Java HashMap 2. used HashMaps in StatsUtil instead of Clojure maps 3. changed tests accordingly

1. changed heartbeat structure to Java HashMap
2. used HashMaps in StatsUtil instead of Clojure maps
3. changed tests accordingly


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4c246d1c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4c246d1c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4c246d1c

Branch: refs/heads/master
Commit: 4c246d1c5582396debfad2a3687a243303e9a0e5
Parents: 9002528
Author: 卫乐 <we...@taobao.com>
Authored: Tue Mar 8 20:28:14 2016 +0800
Committer: 卫乐 <we...@taobao.com>
Committed: Tue Mar 8 20:28:14 2016 +0800

----------------------------------------------------------------------
 .../clj/org/apache/storm/command/heartbeats.clj |    5 +-
 .../src/clj/org/apache/storm/converter.clj      |   25 -
 .../clj/org/apache/storm/daemon/executor.clj    |    2 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |   56 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |   18 +-
 storm-core/src/clj/org/apache/storm/testing.clj |    8 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj |   16 +-
 .../apache/storm/stats/BoltExecutorStats.java   |   47 +-
 .../apache/storm/stats/SpoutExecutorStats.java  |   37 +-
 .../jvm/org/apache/storm/stats/StatsUtil.java   | 1281 +++++++++++-------
 .../test/clj/org/apache/storm/nimbus_test.clj   |   17 +-
 11 files changed, 835 insertions(+), 677 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/command/heartbeats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/heartbeats.clj b/storm-core/src/clj/org/apache/storm/command/heartbeats.clj
index c4413f0..625cff7 100644
--- a/storm-core/src/clj/org/apache/storm/command/heartbeats.clj
+++ b/storm-core/src/clj/org/apache/storm/command/heartbeats.clj
@@ -22,7 +22,8 @@
             [clojure.string :as string])
   (:import [org.apache.storm.generated ClusterWorkerHeartbeat]
            [org.apache.storm.utils Utils ConfigUtils]
-           [org.apache.storm.cluster ZKStateStorage ClusterStateContext ClusterUtils])
+           [org.apache.storm.cluster ZKStateStorage ClusterStateContext ClusterUtils]
+           [org.apache.storm.stats StatsUtil])
   (:gen-class))
 
 (defn -main [command path & args]
@@ -37,7 +38,7 @@
       "get"
       (log-message 
        (if-let [hb (.get_worker_hb cluster path false)]
-         (clojurify-zk-worker-hb
+         (StatsUtil/convertZkWorkerHb
           (Utils/deserialize
            hb
            ClusterWorkerHeartbeat))

http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/converter.clj b/storm-core/src/clj/org/apache/storm/converter.clj
index 495fe7f..6bd7e72 100644
--- a/storm-core/src/clj/org/apache/storm/converter.clj
+++ b/storm-core/src/clj/org/apache/storm/converter.clj
@@ -215,31 +215,6 @@
       (convert-to-symbol-from-status (.get_prev_status storm-base))
       (map-val clojurify-debugoptions (.get_component_debug storm-base)))))
 
-(defn clojurify-zk-worker-hb [^ClusterWorkerHeartbeat worker-hb]
-  (if worker-hb
-    {:storm-id (.get_storm_id worker-hb)
-     :executor-stats (clojurify-structure (StatsUtil/clojurifyStats (into {} (.get_executor_stats worker-hb))))
-     :uptime (.get_uptime_secs worker-hb)
-     :time-secs (.get_time_secs worker-hb)
-     }
-    {}))
-
-(defn clojurify-zk-executor-hb [^ExecutorBeat executor-hb]
-  (if executor-hb
-    {:stats (StatsUtil/clojurifyExecutorStats (.getStats executor-hb))
-     :uptime (.getUptime executor-hb)
-     :time-secs (.getTimeSecs executor-hb)
-     }
-    {}))
-
-(defn thriftify-zk-worker-hb [worker-hb]
-  (if (not-empty (filter second (:executor-stats worker-hb)))
-    (doto (ClusterWorkerHeartbeat.)
-      (.set_uptime_secs (:uptime worker-hb))
-      (.set_storm_id (:storm-id worker-hb))
-      (.set_executor_stats (StatsUtil/thriftifyStats (filter second (:executor-stats worker-hb))))
-      (.set_time_secs (:time-secs worker-hb)))))
-
 (defn thriftify-error [error]
   (doto (ErrorInfo. (:error error) (:time-secs error))
     (.set_host (:host error))

http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 4bbce10..becd8f3 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -406,7 +406,7 @@
     (reify
       RunningExecutor
       (render-stats [this]
-        (clojurify-structure (.renderStats (:stats executor-data))))
+        (.renderStats (:stats executor-data)))
       (get-executor-id [this]
         executor-id)
       (credentials-changed [this creds]

http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 83f73d5..997f92c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -559,48 +559,17 @@
                       executor->component
                       (:launch-time-secs storm-base))))
 
-;; Does not assume that clocks are synchronized. Executor heartbeat is only used so that
-;; nimbus knows when it's received a new heartbeat. All timing is done by nimbus and
-;; tracked through heartbeat-cache
-(defn- update-executor-cache [curr hb timeout]
-  (let [reported-time (:time-secs hb)
-        {last-nimbus-time :nimbus-time
-         last-reported-time :executor-reported-time} curr
-        reported-time (cond reported-time reported-time
-                            last-reported-time last-reported-time
-                            :else 0)
-        nimbus-time (if (or (not last-nimbus-time)
-                        (not= last-reported-time reported-time))
-                      (Time/currentTimeSecs)
-                      last-nimbus-time
-                      )]
-      {:is-timed-out (and
-                       nimbus-time
-                       (>= (Time/deltaSecs nimbus-time) timeout))
-       :nimbus-time nimbus-time
-       :executor-reported-time reported-time
-       :heartbeat hb}))
-
-(defn update-heartbeat-cache [cache executor-beats all-executors timeout]
-  (let [cache (select-keys cache all-executors)]
-    (into {}
-      (for [executor all-executors :let [curr (cache executor)]]
-        [executor
-         (update-executor-cache curr (get executor-beats executor) timeout)]
-         ))))
 
 (defn update-heartbeats! [nimbus storm-id all-executors existing-assignment]
   (log-debug "Updating heartbeats for " storm-id " " (pr-str all-executors))
   (let [storm-cluster-state (:storm-cluster-state nimbus)
-        executor-beats (let [executor-stats-java-map (.executorBeats storm-cluster-state storm-id (.get_executor_node_port (thriftify-assignment existing-assignment)))
-                             executor-stats-clojurify (clojurify-structure executor-stats-java-map)]
-                         (->> (dofor [[^ExecutorInfo executor-info  ^ExecutorBeat executor-heartbeat] executor-stats-clojurify]
-                             {[(.get_task_start executor-info) (.get_task_end executor-info)] (clojurify-zk-executor-hb executor-heartbeat)})
-                           (apply merge)))
-        cache (update-heartbeat-cache (@(:heartbeats-cache nimbus) storm-id)
+        executor-beats (let [executor-stats-java-map (.executorBeats storm-cluster-state storm-id
+                                                       (.get_executor_node_port (thriftify-assignment existing-assignment)))]
+                         (StatsUtil/convertExecutorBeats executor-stats-java-map))
+        cache (StatsUtil/updateHeartbeatCache (@(:heartbeats-cache nimbus) storm-id)
                                       executor-beats
-                                      all-executors
-                                      ((:conf nimbus) NIMBUS-TASK-TIMEOUT-SECS))]
+                                      (StatsUtil/convertExecutors all-executors)
+                                      (int ((:conf nimbus) NIMBUS-TASK-TIMEOUT-SECS)))]
       (swap! (:heartbeats-cache nimbus) assoc storm-id cache)))
 
 (defn- update-all-heartbeats! [nimbus existing-assignments topology->executors]
@@ -625,7 +594,7 @@
     (->> all-executors
         (filter (fn [executor]
           (let [start-time (get executor-start-times executor)
-                is-timed-out (-> heartbeats-cache (get executor) :is-timed-out)]
+                is-timed-out (.get (.get heartbeats-cache (StatsUtil/convertExecutor executor)) "is-timed-out")]
             (if (and start-time
                    (or
                     (< (Time/deltaSecs start-time)
@@ -1415,8 +1384,7 @@
                                      (throw
                                        (NotAliveException. (str storm-id))))
                   assignment (clojurify-assignment (.assignmentInfo storm-cluster-state storm-id nil))
-                  beats (map-val :heartbeat (get @(:heartbeats-cache nimbus)
-                                                 storm-id))
+                  beats (get @(:heartbeats-cache nimbus) storm-id)
                   all-components (set (vals task->component))]
               {:storm-name storm-name
                :storm-cluster-state storm-cluster-state
@@ -1919,9 +1887,9 @@
                           (map (fn [c] [c (errors-fn storm-cluster-state storm-id c)]))
                           (into {}))
               executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)]
-                                        (let [host (-> assignment :node->host (get node))
-                                              heartbeat (get beats executor)
-                                              excutorstats (:stats heartbeat)
+                                     (let [host (-> assignment :node->host (get node))
+                                              heartbeat (.get beats (StatsUtil/convertExecutor executor))
+                                              excutorstats (.get (.get heartbeat "heartbeat") "stats")
                                               excutorstats (if excutorstats
                                                       (StatsUtil/thriftifyExecutorStats excutorstats))]
                                               
@@ -1930,7 +1898,7 @@
                                                                 (-> executor first task->component)
                                                                 host
                                                                 port
-                                                                (Utils/nullToZero (:uptime heartbeat)))
+                                                                (Utils/nullToZero (.get heartbeat "uptime")))
                                             (.set_stats excutorstats))
                                           ))
               topo-info  (TopologyInfo. storm-id

http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 92ba807..10a1e47 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -21,7 +21,8 @@
   (:require [org.apache.storm.daemon [executor :as executor]])
 
   (:require [clojure.set :as set])
-  (:import [java.io File])
+  (:import [java.io File]
+           [org.apache.storm.stats StatsUtil])
   (:import [java.util.concurrent Executors]
            [org.apache.storm.hooks IWorkerHook BaseWorkerHook]
            [uk.org.lidalia.sysoutslf4j.context SysOutOverSLF4J])
@@ -66,18 +67,15 @@
 (defnk do-executor-heartbeats [worker :executors nil]
   ;; stats is how we know what executors are assigned to this worker 
   (let [stats (if-not executors
-                  (into {} (map (fn [e] {e nil}) (:executors worker)))
-                  (->> executors
+                  (StatsUtil/mkEmptyExecutorZkHbs (:executors worker))
+                  (StatsUtil/convertExecutorZkHbs (->> executors
                     (map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)}))
-                    (apply merge)))
-        zk-hb {:storm-id (:storm-id worker)
-               :executor-stats stats
-               :uptime (. (:uptime worker) upTime)
-               :time-secs (Time/currentTimeSecs)
-               }]
+                    (apply merge))))
+        zk-hb (StatsUtil/mkZkWorkerHb (:storm-id worker) stats (. (:uptime worker) upTime))]
     ;; do the zookeeper heartbeat
     (try
-      (.workerHeartbeat (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (long (:port worker)) (thriftify-zk-worker-hb zk-hb))
+      (.workerHeartbeat (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (long (:port worker))
+        (StatsUtil/thriftifyZkWorkerHb zk-hb))
       (catch Exception exc
         (log-error exc "Worker failed to write heatbeats to ZK or Pacemaker...will retry")))))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 66fc051..419cf2b 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -452,7 +452,7 @@
         assignment (clojurify-assignment (.assignmentInfo state storm-id nil))
         taskbeats (.taskbeats state storm-id (:task->node+port assignment))
         heartbeats (dofor [id task-ids] (get taskbeats id))
-        stats (dofor [hb heartbeats] (if hb (stat-key (:stats hb)) 0))]
+        stats (dofor [hb heartbeats] (if hb (.get (.get hb "stats") stat-key) 0))]
     (reduce + stats)))
 
 (defn emitted-spout-tuples
@@ -460,16 +460,16 @@
   (aggregated-stat
     cluster-map
     storm-name
-    :emitted
+    "emitted"
     :component-ids (keys (.get_spouts topology))))
 
 (defn transferred-tuples
   [cluster-map storm-name]
-  (aggregated-stat cluster-map storm-name :transferred))
+  (aggregated-stat cluster-map storm-name "transferred"))
 
 (defn acked-tuples
   [cluster-map storm-name]
-  (aggregated-stat cluster-map storm-name :acked))
+  (aggregated-stat cluster-map storm-name "acked"))
 
 (defn simulate-wait
   [cluster-map]

http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index b9cf2d7..0730d96 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -124,11 +124,11 @@
 
 (defn spout-summary?
   [topology s]
-  (= :spout (executor-summary-type topology s)))
+  (= "spout" (executor-summary-type topology s)))
 
 (defn bolt-summary?
   [topology s]
-  (= :bolt (executor-summary-type topology s)))
+  (= "bolt" (executor-summary-type topology s)))
 
 (defn group-by-comp
   [summs]
@@ -230,8 +230,8 @@
   (let [components (for [[id spec] spout-bolt]
             [id
              (let [inputs (.get_inputs (.get_common spec))
-                   bolt-summs (get bolt-comp-summs id)
-                   spout-summs (get spout-comp-summs id)
+                   bolt-summs (.get bolt-comp-summs id)
+                   spout-summs (.get spout-comp-summs id)
                    bolt-cap (if bolt-summs
                               (StatsUtil/computeBoltCapacity bolt-summs)
                               0)]
@@ -240,17 +240,17 @@
                 :latency (if bolt-summs
                            (get-in
                              (clojurify-structure (StatsUtil/boltStreamsStats bolt-summs true))
-                             [:process-latencies window])
+                             ["process-latencies" window])
                            (get-in
                              (clojurify-structure (StatsUtil/spoutStreamsStats spout-summs true))
-                             [:complete-latencies window]))
+                             ["complete-latencies" window]))
                 :transferred (or
                                (get-in
                                  (clojurify-structure (StatsUtil/spoutStreamsStats spout-summs true))
-                                 [:transferred window])
+                                 ["transferred" window])
                                (get-in
                                  (clojurify-structure (StatsUtil/boltStreamsStats bolt-summs true))
-                                 [:transferred window]))
+                                 ["transferred" window]))
                 :stats (let [mapfn (fn [dat]
                                      (map (fn [^ExecutorSummary summ]
                                             {:host (.get_host summ)

http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
index d8c7f06..e26e56b 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
@@ -18,9 +18,10 @@
 package org.apache.storm.stats;
 
 import com.google.common.collect.Lists;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import org.apache.storm.generated.BoltStats;
+import org.apache.storm.generated.ExecutorSpecificStats;
+import org.apache.storm.generated.ExecutorStats;
 import org.apache.storm.metric.internal.MultiCountStatAndMetric;
 import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
 
@@ -33,8 +34,6 @@ public class BoltExecutorStats extends CommonStats {
     public static final String PROCESS_LATENCIES = "process-latencies";
     public static final String EXECUTE_LATENCIES = "execute-latencies";
 
-    public static final String[] BOLT_FIELDS = {ACKED, FAILED, EXECUTED, PROCESS_LATENCIES, EXECUTE_LATENCIES};
-
     public BoltExecutorStats(int rate) {
         super(rate);
 
@@ -83,32 +82,24 @@ public class BoltExecutorStats extends CommonStats {
 
     }
 
-    public Map renderStats() {
+    public ExecutorStats renderStats() {
         cleanupStats();
-        Map ret = new HashMap();
-        ret.putAll(valueStats(CommonStats.COMMON_FIELDS));
-        ret.putAll(valueStats(BoltExecutorStats.BOLT_FIELDS));
-        StatsUtil.putKV(ret, StatsUtil.TYPE, StatsUtil.KW_BOLT);
+
+        ExecutorStats ret = new ExecutorStats();
+        // common stats
+        ret.set_emitted(valueStat(EMITTED));
+        ret.set_transferred(valueStat(TRANSFERRED));
+        ret.set_rate(this.rate);
+
+        // bolt stats
+        BoltStats boltStats = new BoltStats(
+                StatsUtil.windowSetConverter(valueStat(ACKED), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
+                StatsUtil.windowSetConverter(valueStat(FAILED), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
+                StatsUtil.windowSetConverter(valueStat(PROCESS_LATENCIES), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
+                StatsUtil.windowSetConverter(valueStat(EXECUTED), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
+                StatsUtil.windowSetConverter(valueStat(EXECUTE_LATENCIES), StatsUtil.TO_GSID, StatsUtil.IDENTITY));
+        ret.set_specific(ExecutorSpecificStats.bolt(boltStats));
 
         return ret;
     }
-
-//    public ExecutorStats renderStats() {
-//        cleanupStats();
-//
-//        ExecutorStats ret = new ExecutorStats();
-//        ret.set_emitted(valueStat(EMITTED));
-//        ret.set_transferred(valueStat(TRANSFERRED));
-//        ret.set_rate(this.rate);
-//
-//        BoltStats boltStats = new BoltStats(
-//                StatsUtil.windowSetConverter(valueStat(ACKED), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
-//                StatsUtil.windowSetConverter(valueStat(FAILED), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
-//                StatsUtil.windowSetConverter(valueStat(PROCESS_LATENCIES), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
-//                StatsUtil.windowSetConverter(valueStat(EXECUTED), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
-//                StatsUtil.windowSetConverter(valueStat(EXECUTE_LATENCIES), StatsUtil.TO_GSID, StatsUtil.IDENTITY));
-//        ret.set_specific(ExecutorSpecificStats.bolt(boltStats));
-//
-//        return ret;
-//    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
index 27c626e..3c09a38 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
@@ -19,6 +19,9 @@ package org.apache.storm.stats;
 
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.storm.generated.ExecutorSpecificStats;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.generated.SpoutStats;
 import org.apache.storm.metric.internal.MultiCountStatAndMetric;
 import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
 
@@ -29,8 +32,6 @@ public class SpoutExecutorStats extends CommonStats {
     public static final String FAILED = "failed";
     public static final String COMPLETE_LATENCIES = "complete-latencies";
 
-    public static final String[] SPOUT_FIELDS = {ACKED, FAILED, COMPLETE_LATENCIES};
-
     public SpoutExecutorStats(int rate) {
         super(rate);
         this.put(ACKED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS));
@@ -59,28 +60,20 @@ public class SpoutExecutorStats extends CommonStats {
         this.getFailed().incBy(stream, this.rate);
     }
 
-    public Map renderStats() {
+    public ExecutorStats renderStats() {
         cleanupStats();
-        Map ret = new HashMap();
-        ret.putAll(valueStats(CommonStats.COMMON_FIELDS));
-        ret.putAll(valueStats(SpoutExecutorStats.SPOUT_FIELDS));
-        StatsUtil.putKV(ret, StatsUtil.TYPE, StatsUtil.KW_SPOUT);
+
+        ExecutorStats ret = new ExecutorStats();
+        // common fields
+        ret.set_emitted(valueStat(EMITTED));
+        ret.set_transferred(valueStat(TRANSFERRED));
+        ret.set_rate(this.rate);
+
+        // spout stats
+        SpoutStats spoutStats = new SpoutStats(
+                valueStat(ACKED), valueStat(FAILED), valueStat(COMPLETE_LATENCIES));
+        ret.set_specific(ExecutorSpecificStats.spout(spoutStats));
 
         return ret;
     }
-
-//    public ExecutorStats renderStats() {
-//        cleanupStats();
-//
-//        ExecutorStats ret = new ExecutorStats();
-//        ret.set_emitted(valueStat(EMITTED));
-//        ret.set_transferred(valueStat(TRANSFERRED));
-//        ret.set_rate(this.rate);
-//
-//        SpoutStats spoutStats = new SpoutStats(
-//                valueStat(ACKED), valueStat(FAILED), valueStat(COMPLETE_LATENCIES));
-//        ret.set_specific(ExecutorSpecificStats.spout(spoutStats));
-//
-//        return ret;
-//    }
 }