Posted to dev@storm.apache.org by d2r <gi...@git.apache.org> on 2015/05/16 21:12:14 UTC

[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

GitHub user d2r opened a pull request:

    https://github.com/apache/storm/pull/554

    [STORM-820] Aggregate topo stats on nimbus, not ui

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/d2r/storm storm-820-agg-stats-on-nimbus

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/554.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #554
    
----
commit 3d3c2b7701e351d63b8757155a3e37ec84f497b2
Author: Derek Dagit <de...@yahoo-inc.com>
Date:   2015-05-13T13:35:23Z

    Aggregate topo stats on nimbus, not ui

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
GitHub user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30919942
  
    --- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
    @@ -373,4 +360,1248 @@
         (ExecutorStats. (window-set-converter (:emitted stats) str)
           (window-set-converter (:transferred stats) str)
           specific-stats
    -      rate)))
    \ No newline at end of file
    +      rate)))
    +
    +(defn- agg-bolt-lat-and-count
    +  "Aggregates number executed and process & execute latencies across all
    --- End diff --
    
    Yeah, I'll reword it.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
GitHub user d2r closed the pull request at:

    https://github.com/apache/storm/pull/554



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
GitHub user d2r commented on the pull request:

    https://github.com/apache/storm/pull/554#issuecomment-106567146
  
    Debugging a corner case we found today...



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
GitHub user d2r commented on the pull request:

    https://github.com/apache/storm/pull/554#issuecomment-102667234
  
    This pull request needs extra scrutiny, both because of its size and to make sure it does not break existing functionality.
    
    Goals for this PR:
    
    * New Nimbus Thrift calls:
       * getTopologyPageInfo
       * getComponentPageInfo
    * The existing Nimbus Thrift call getTopologyInfo remains.
    * Aggregates metrics on nimbus so that serialized data is much smaller for large, highly-connected topologies.
    * Moves ui code to the stats namespace.
    * Iterates over heartbeats only once.
    * Thrift deserialization code avoids deserializing to symbols, preferring strings, since making a symbol out of a string like "600" results in a symbol 600 and not an integer 600. This can be very confusing to debug, and it breaks key comparisons in places.
    * Removes some dead code and does a little refactoring in places.
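
The symbol-versus-string pitfall mentioned in the goals can be reproduced at a REPL. The following is a minimal sketch, not code from the PR: the `window-stats` map and its contents are hypothetical, and only core Clojure functions are used.

```clojure
;; Hypothetical stats map keyed by window size as a string.
(def window-stats {"600" 42})

;; Building a symbol from the string gives something that prints as 600...
(def k (symbol "600"))
(println (pr-str k))

;; ...but it is neither a number nor equal to the string key.
(println (number? k))                 ; false
(println (= k 600))                   ; false
(println (get window-stats k))        ; nil, the lookup misses
(println (get window-stats (str k)))  ; 42, string keys compare as expected
```

Because the printed form of the symbol is indistinguishable from the integer, a failed lookup like the one above gives no visual hint about its cause, which is presumably why string keys are preferred.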



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by revans2 <gi...@git.apache.org>.
GitHub user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r40715660
  
    --- Diff: storm-core/src/jvm/backtype/storm/generated/BoltStats.java ---
    @@ -51,7 +51,7 @@
     import org.slf4j.LoggerFactory;
     
     @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
    -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-9-3")
    +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-9-28")
    --- End diff --
    
    Please fix: this is only a timestamp change.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
GitHub user d2r commented on the pull request:

    https://github.com/apache/storm/pull/554#issuecomment-144169425
  
    > And did you change time to errorTime in the JSON response?
    
    Good catch. Updated API doc with a note on the name change.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
GitHub user d2r reopened a pull request:

    https://github.com/apache/storm/pull/554

    [STORM-820] Aggregate topo stats on nimbus, not ui

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/d2r/storm storm-820-agg-stats-on-nimbus

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/554.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #554
    
----
commit a16b50c961a1272315dcc3340ae369c516e25235
Author: Derek Dagit <de...@yahoo-inc.com>
Date:   2015-05-21T16:24:13Z

    Aggregate topo stats on nimbus, not ui

commit b3abf05f02305260cdcbf081a8fe7d7db4b03ce7
Author: Derek Dagit <de...@yahoo-inc.com>
Date:   2015-05-22T19:49:00Z

    rename macro to be clearer that output is to logs

commit c1cc0bab5c9f0338b18cc431d36f8b5b465a195f
Author: Derek Dagit <de...@yahoo-inc.com>
Date:   2015-05-22T19:50:59Z

    clearer docstrings

commit 40ddae268d7c56156db7ca164788f0625d48c505
Author: Derek Dagit <de...@yahoo-inc.com>
Date:   2015-05-22T19:53:17Z

    do not use multimethods when it does not help

commit 1392b24fd9ccfe6d263839e5733b3f556039591e
Author: Derek Dagit <de...@yahoo-inc.com>
Date:   2015-05-22T19:54:08Z

    preserve REST API calls

commit a6b82f8b52533476b645cfbb5a80638f98703103
Author: Derek Dagit <de...@yahoo-inc.com>
Date:   2015-05-22T19:55:42Z

    Renumber thrift struct fields consistently from 1

commit 4b11f71b7e2fe45a62ad5a26d9d6bc32ebdec6df
Author: Derek Dagit <de...@yahoo-inc.com>
Date:   2015-05-22T21:51:10Z

    define a constant var for literal

commit 6b606cf68f60858bdc9064c1fe50c873e6dd1971
Author: Derek Dagit <de...@yahoo-inc.com>
Date:   2015-05-28T20:08:54Z

    handle cases when heartbeats lack stats metrics

----



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by revans2 <gi...@git.apache.org>.
GitHub user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/554#issuecomment-144164575
  
    For the most part this looks good.  My only comment would be to update the REST API docs to describe what the new APIs are like.  And did you change time to errorTime in the JSON response?



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
GitHub user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30921890
  
    --- Diff: storm-core/src/clj/backtype/storm/ui/core.clj ---
    @@ -783,128 +504,180 @@
             "errorLapsedSecs" (get-error-time e)
             "error" (.get_error e)})}))
     
    -(defn spout-stats
    -  [window ^TopologyInfo topology-info component executors include-sys?]
    -  (let [window-hint (str " (" (window-hint window) ")")
    -        stats (get-filled-stats executors)
    -        stream-summary (-> stats (aggregate-spout-stats include-sys?))
    -        summary (-> stream-summary aggregate-spout-streams)]
    -    {"spoutSummary" (spout-summary-json
    -                      (.get_id topology-info) component summary window)
    -     "outputStats" (spout-output-stats stream-summary window)
    -     "executorStats" (spout-executor-stats (.get_id topology-info)
    -                                           executors window include-sys?)}))
    -
    -(defn bolt-summary
    -  [topology-id id stats window]
    -  (let [times (stats-times (:emitted stats))
    -        display-map (into {} (for [t times] [t pretty-uptime-sec]))
    -        display-map (assoc display-map ":all-time" (fn [_] "All time"))]
    -    (for [k (concat times [":all-time"])
    -          :let [disp ((display-map k) k)]]
    -      {"window" k
    -       "windowPretty" disp
    -       "emitted" (get-in stats [:emitted k])
    -       "transferred" (get-in stats [:transferred k])
    -       "executeLatency" (float-str (get-in stats [:execute-latencies k]))
    -       "executed" (get-in stats [:executed k])
    -       "processLatency" (float-str (get-in stats [:process-latencies k]))
    -       "acked" (get-in stats [:acked k])
    -       "failed" (get-in stats [:failed k])})))
    -
    -(defn bolt-output-stats
    -  [stream-summary window]
    -  (let [stream-summary (-> stream-summary
    -                           swap-map-order
    -                           (get window)
    -                           (select-keys [:emitted :transferred])
    -                           swap-map-order)]
    -    (for [[s stats] stream-summary]
    -      {"stream" s
    -        "emitted" (nil-to-zero (:emitted stats))
    -        "transferred" (nil-to-zero (:transferred stats))})))
    -
    -(defn bolt-input-stats
    -  [stream-summary window]
    -  (let [stream-summary
    -        (-> stream-summary
    -            swap-map-order
    -            (get window)
    -            (select-keys [:acked :failed :process-latencies
    -                          :executed :execute-latencies])
    -            swap-map-order)]
    -    (for [[^GlobalStreamId s stats] stream-summary]
    -      {"component" (.get_componentId s)
    -       "encodedComponent" (url-encode (.get_componentId s))
    -       "stream" (.get_streamId s)
    -       "executeLatency" (float-str (:execute-latencies stats))
    -       "processLatency" (float-str (:process-latencies stats))
    -       "executed" (nil-to-zero (:executed stats))
    -       "acked" (nil-to-zero (:acked stats))
    -       "failed" (nil-to-zero (:failed stats))})))
    -
    -(defn bolt-executor-stats
    -  [topology-id executors window include-sys?]
    -  (for [^ExecutorSummary e executors
    -        :let [stats (.get_stats e)
    -              stats (if stats
    -                      (-> stats
    -                          (aggregate-bolt-stats include-sys?)
    -                          (aggregate-bolt-streams)
    -                          swap-map-order
    -                          (get window)))]]
    -    {"id" (pretty-executor-info (.get_executor_info e))
    -     "encodedId" (url-encode (pretty-executor-info (.get_executor_info e)))
    -     "uptime" (pretty-uptime-sec (.get_uptime_secs e))
    -     "host" (.get_host e)
    -     "port" (.get_port e)
    -     "emitted" (nil-to-zero (:emitted stats))
    -     "transferred" (nil-to-zero (:transferred stats))
    -     "capacity" (float-str (nil-to-zero (compute-executor-capacity e)))
    -     "executeLatency" (float-str (:execute-latencies stats))
    -     "executed" (nil-to-zero (:executed stats))
    -     "processLatency" (float-str (:process-latencies stats))
    -     "acked" (nil-to-zero (:acked stats))
    -     "failed" (nil-to-zero (:failed stats))
    -     "workerLogLink" (worker-log-link (.get_host e) (.get_port e) topology-id)}))
    -
    -(defn bolt-stats
    -  [window ^TopologyInfo topology-info component executors include-sys?]
    -  (let [window-hint (str " (" (window-hint window) ")")
    -        stats (get-filled-stats executors)
    -        stream-summary (-> stats (aggregate-bolt-stats include-sys?))
    -        summary (-> stream-summary aggregate-bolt-streams)]
    -    {"boltStats" (bolt-summary (.get_id topology-info) component summary window)
    -     "inputStats" (bolt-input-stats stream-summary window)
    -     "outputStats" (bolt-output-stats stream-summary window)
    -     "executorStats" (bolt-executor-stats
    -                       (.get_id topology-info) executors window include-sys?)}))
    +(defmulti unpack-comp-agg-stat
    +  (fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
    +
    +(defmethod unpack-comp-agg-stat ComponentType/BOLT
    +  [[window ^ComponentAggregateStats s]]
    +  (let [^CommonAggregateStats comm-s (.get_common_stats s)
    +        ^SpecificAggregateStats spec-s (.get_specific_stats s)
    +        ^BoltAggregateStats bolt-s (.get_bolt spec-s)]
    +    {"window" window
    +     "windowPretty" (window-hint window)
    +     "emitted" (.get_emitted comm-s)
    +     "transferred" (.get_transferred comm-s)
    +     "acked" (.get_acked comm-s)
    +     "failed" (.get_failed comm-s)
    +     "executeLatency" (float-str (.get_execute_latency_ms bolt-s))
    +     "processLatency"  (float-str (.get_process_latency_ms bolt-s))
    +     "executed" (.get_executed bolt-s)
    +     "capacity" (float-str (.get_capacity bolt-s))}))
    +
    +(defmethod unpack-comp-agg-stat ComponentType/SPOUT
    +  [[window ^ComponentAggregateStats s]]
    +  (let [^CommonAggregateStats comm-s (.get_common_stats s)
    +        ^SpecificAggregateStats spec-s (.get_specific_stats s)
    +        ^SpoutAggregateStats spout-s (.get_spout spec-s)]
    +    {"window" window
    +     "windowPretty" (window-hint window)
    +     "emitted" (.get_emitted comm-s)
    +     "transferred" (.get_transferred comm-s)
    +     "acked" (.get_acked comm-s)
    +     "failed" (.get_failed comm-s)
    +     "completeLatency" (float-str (.get_complete_latency_ms spout-s))}))
    +
    +(defn- unpack-bolt-input-stat
    +  [[^GlobalStreamId s ^ComponentAggregateStats stats]]
    +  (let [^SpecificAggregateStats sas (.get_specific_stats stats)
    +        ^BoltAggregateStats bas (.get_bolt sas)
    +        ^CommonAggregateStats cas (.get_common_stats stats)
    +        comp-id (.get_componentId s)]
    +    {"component" comp-id
    +     "encodedComponentId" (url-encode comp-id)
    +     "stream" (.get_streamId s)
    +     "executeLatency" (float-str (.get_execute_latency_ms bas))
    +     "processLatency" (float-str (.get_process_latency_ms bas))
    +     "executed" (nil-to-zero (.get_executed bas))
    +     "acked" (nil-to-zero (.get_acked cas))
    +     "failed" (nil-to-zero (.get_failed cas))}))
    +
    +(defmulti unpack-comp-output-stat
    +  (fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
    +
    +(defmethod unpack-comp-output-stat ComponentType/BOLT
    +  [[stream-id ^ComponentAggregateStats stats]]
    +  (let [^CommonAggregateStats cas (.get_common_stats stats)]
    +    {"stream" stream-id
    +     "emitted" (nil-to-zero (.get_emitted cas))
    +     "transferred" (nil-to-zero (.get_transferred cas))}))
    +
    +(defmethod unpack-comp-output-stat ComponentType/SPOUT
    +  [[stream-id ^ComponentAggregateStats stats]]
    +  (let [^CommonAggregateStats cas (.get_common_stats stats)
    +        ^SpecificAggregateStats spec-s (.get_specific_stats stats)
    +        ^SpoutAggregateStats spout-s (.get_spout spec-s)]
    +    {"stream" stream-id
    +     "emitted" (nil-to-zero (.get_emitted cas))
    +     "transferred" (nil-to-zero (.get_transferred cas))
    +     "completeLatency" (float-str (.get_complete_latency_ms spout-s))
    +     "acked" (nil-to-zero (.get_acked cas))
    +     "failed" (nil-to-zero (.get_failed cas))}))
    +
    +(defmulti unpack-comp-exec-stat
    +  (fn [_ ^ComponentAggregateStats cas] (.get_type (.get_stats ^ExecutorAggregateStats cas))))
    +
    +(defmethod unpack-comp-exec-stat ComponentType/BOLT
    +  [topology-id ^ExecutorAggregateStats eas]
    +  (let [^ExecutorSummary summ (.get_exec_summary eas)
    +        ^ExecutorInfo info (.get_executor_info summ)
    +        ^ComponentAggregateStats stats (.get_stats eas)
    +        ^SpecificAggregateStats ss (.get_specific_stats stats)
    +        ^BoltAggregateStats bas (.get_bolt ss)
    +        ^CommonAggregateStats cas (.get_common_stats stats)
    +        host (.get_host summ)
    +        port (.get_port summ)
    +        exec-id (pretty-executor-info info)]
    +    {"id" exec-id
    +     "encodedId" (url-encode exec-id)
    +     "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
    +     "host" host
    +     "port" port
    +     "emitted" (nil-to-zero (.get_emitted cas))
    +     "transferred" (nil-to-zero (.get_transferred cas))
    +     "capacity" (float-str (nil-to-zero (.get_capacity bas)))
    +     "executeLatency" (float-str (.get_execute_latency_ms bas))
    +     "executed" (nil-to-zero (.get_executed bas))
    +     "processLatency" (float-str (.get_process_latency_ms bas))
    +     "acked" (nil-to-zero (.get_acked cas))
    +     "failed" (nil-to-zero (.get_failed cas))
    +     "workerLogLink" (worker-log-link host port topology-id)}))
    +
    +(defmethod unpack-comp-exec-stat ComponentType/SPOUT
    +  [topology-id ^ExecutorAggregateStats eas]
    +  (let [^ExecutorSummary summ (.get_exec_summary eas)
    +        ^ExecutorInfo info (.get_executor_info summ)
    +        ^ComponentAggregateStats stats (.get_stats eas)
    +        ^SpecificAggregateStats ss (.get_specific_stats stats)
    +        ^SpoutAggregateStats sas (.get_spout ss)
    +        ^CommonAggregateStats cas (.get_common_stats stats)
    +        host (.get_host summ)
    +        port (.get_port summ)
    +        exec-id (pretty-executor-info info)]
    +    {"id" exec-id
    +     "encodedId" (url-encode exec-id)
    +     "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
    +     "host" host
    +     "port" port
    +     "emitted" (nil-to-zero (.get_emitted cas))
    +     "transferred" (nil-to-zero (.get_transferred cas))
    +     "completeLatency" (float-str (.get_complete_latency_ms sas))
    +     "acked" (nil-to-zero (.get_acked cas))
    +     "failed" (nil-to-zero (.get_failed cas))
    +     "workerLogLink" (worker-log-link host port topology-id)}))
    +
    +(defmulti unpack-component-page-info
    +  "Unpacks component-specific info to clojure data structures"
    +  (fn [^ComponentPageInfo info & _]
    +    (.get_component_type info)))
    +
    +(defmethod unpack-component-page-info ComponentType/BOLT
    +  [^ComponentPageInfo info topology-id window include-sys?]
    +  (merge
    +    {"boltStats" (map unpack-comp-agg-stat (.get_window_to_stats info))
    +     "inputStats" (map unpack-bolt-input-stat (.get_gsid_to_input_stats info))
    +     "outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats info))
    +     "executorStats" (map (partial unpack-comp-exec-stat topology-id)
    +                          (.get_exec_stats info))}
    +    (-> info .get_errors (component-errors topology-id))))
    +
    +(defmethod unpack-component-page-info ComponentType/SPOUT
    +  [^ComponentPageInfo info topology-id window include-sys?]
    +  (merge
    +    {"spoutStats" (map unpack-comp-agg-stat (.get_window_to_stats info))
    --- End diff --
    
    I had changed it because it was named inconsistently, and I should also have updated STORM-UI-REST-API.md.
    
    However, since the rename is not essential, I'll revert to using spoutSummary so that the REST API does not change.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
GitHub user d2r reopened a pull request:

    https://github.com/apache/storm/pull/554

    [STORM-820] Aggregate topo stats on nimbus, not ui

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/d2r/storm storm-820-agg-stats-on-nimbus

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/554.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #554
    
----
commit a16b50c961a1272315dcc3340ae369c516e25235
Author: Derek Dagit <de...@yahoo-inc.com>
Date:   2015-05-21T16:24:13Z

    Aggregate topo stats on nimbus, not ui

commit b3abf05f02305260cdcbf081a8fe7d7db4b03ce7
Author: Derek Dagit <de...@yahoo-inc.com>
Date:   2015-05-22T19:49:00Z

    rename macro to be clearer that output is to logs

commit c1cc0bab5c9f0338b18cc431d36f8b5b465a195f
Author: Derek Dagit <de...@yahoo-inc.com>
Date:   2015-05-22T19:50:59Z

    clearer docstrings

commit 40ddae268d7c56156db7ca164788f0625d48c505
Author: Derek Dagit <de...@yahoo-inc.com>
Date:   2015-05-22T19:53:17Z

    do not use multimethods when it does not help

commit 1392b24fd9ccfe6d263839e5733b3f556039591e
Author: Derek Dagit <de...@yahoo-inc.com>
Date:   2015-05-22T19:54:08Z

    preserve REST API calls

commit a6b82f8b52533476b645cfbb5a80638f98703103
Author: Derek Dagit <de...@yahoo-inc.com>
Date:   2015-05-22T19:55:42Z

    Renumber thrift struct fields consistently from 1

commit 4b11f71b7e2fe45a62ad5a26d9d6bc32ebdec6df
Author: Derek Dagit <de...@yahoo-inc.com>
Date:   2015-05-22T21:51:10Z

    define a constant var for literal

commit 6b606cf68f60858bdc9064c1fe50c873e6dd1971
Author: Derek Dagit <de...@yahoo-inc.com>
Date:   2015-05-28T20:08:54Z

    handle cases when heartbeats lack stats metrics

commit 7a0c3eeeb5fbcbc5c32d47cca692cec103a68244
Author: Derek Dagit <de...@yahoo-inc.com>
Date:   2015-09-04T17:23:06Z

    correct key name

commit 5266a167169cbc7e36a31d55772fdc7ac33ccf94
Author: Derek Dagit <de...@yahoo-inc.com>
Date:   2015-09-22T21:39:11Z

    component page renders with partial heartbeats

----



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
GitHub user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30921420
  
    --- Diff: storm-core/src/ui/public/templates/topology-page-template.html ---
    @@ -131,33 +131,37 @@
         </tbody>
       </table>
     </script>
    +
    +<script id="topology-visualization-container-template" type="text/html">
    --- End diff --
    
    For the visualization to initialize, it needs to make the old nimbus thrift API call getTopologyInfo, which has not changed. I wanted the visualization to initialize only when the user shows it, rather than pulling down all the stream data for every executor each time the page loads. If we did that, we would lose most of the gains from aggregating stats data on nimbus.
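
The deferral described in this comment happens in the browser, but the idea can be sketched in Clojure with `delay`. This is only an illustration under stated assumptions: `fetch-topology-info`, `fetch-count`, `make-page`, and `show-visualization` are hypothetical names that do not appear in the PR.

```clojure
;; Counts how many times the expensive call is actually made.
(def fetch-count (atom 0))

(defn fetch-topology-info
  "Hypothetical stand-in for the expensive getTopologyInfo round trip."
  []
  (swap! fetch-count inc)
  {:executors []})

(defn make-page
  "Builds page data. The visualization data is wrapped in a delay, so nothing
  is fetched until it is dereferenced."
  []
  {:summary "aggregated stats"
   :visualization (delay (fetch-topology-info))})

(defn show-visualization
  "Forces the delay. The fetch runs on the first call only; later calls reuse
  the cached value."
  [page]
  @(:visualization page))

(def page (make-page))
(println "fetches after page load:" @fetch-count)
(show-visualization page)
(show-visualization page)
(println "fetches after showing twice:" @fetch-count)
```

A page that is loaded but never asked to show the visualization pays nothing for the per-executor stream data, which is the saving the comment is protecting.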



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
GitHub user d2r commented on the pull request:

    https://github.com/apache/storm/pull/554#issuecomment-104784316
  
    Addressed review comments.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by asfgit <gi...@git.apache.org>.
GitHub user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/554



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by revans2 <gi...@git.apache.org>.
GitHub user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r40716500
  
    --- Diff: storm-core/src/clj/backtype/storm/log.clj ---
    @@ -44,3 +46,11 @@
     (defn log-stream
       [& args]
       (apply log/log-stream args))
    +
    +(defmacro log-pprint
    +  [& args]
    +  `(let [^StringWriter writer# (StringWriter.)]
    +     (doall
    +       (for [object# [~@args]]
    +         (pprint object# writer#)))
    +     (log-message "\n" writer#)))
    --- End diff --
    
    Why is there a "\n" before logging the message?



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by revans2 <gi...@git.apache.org>.
GitHub user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30919981
  
    --- Diff: storm-core/src/ui/public/templates/topology-page-template.html ---
    @@ -131,33 +131,37 @@
         </tbody>
       </table>
     </script>
    +
    +<script id="topology-visualization-container-template" type="text/html">
    --- End diff --
    
    What is this change for? It looks like the markup just moved from topology-visualization-template to a new topology-visualization-container-template that is now loaded through AJAX.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
GitHub user d2r commented on the pull request:

    https://github.com/apache/storm/pull/554#issuecomment-145632654
  
    OK, working on it.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
GitHub user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r40718750
  
    --- Diff: storm-core/src/clj/backtype/storm/log.clj ---
    @@ -44,3 +46,11 @@
     (defn log-stream
       [& args]
       (apply log/log-stream args))
    +
    +(defmacro log-pprint
    +  [& args]
    +  `(let [^StringWriter writer# (StringWriter.)]
    +     (doall
    +       (for [object# [~@args]]
    +         (pprint object# writer#)))
    +     (log-message "\n" writer#)))
    --- End diff --
    
    I think it was to clean up the output. Otherwise the data structure would not begin on its own line but would instead begin right after the timestamp, etc., which was not "pretty."
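
The effect of the leading "\n" can be seen without a logger. The following is a minimal sketch, assuming a hypothetical helper `pprint-str` that mirrors the macro body but returns the text instead of logging it. The log prefix in the last line is made up for illustration.

```clojure
(require '[clojure.pprint :refer [pprint]])
(import 'java.io.StringWriter)

(defn pprint-str
  "Hypothetical helper: pretty-prints each object into one StringWriter and
  returns the text with a leading newline, like the arguments that log-pprint
  hands to log-message."
  [& objects]
  (let [writer (StringWriter.)]
    (doseq [o objects]
      (pprint o writer))
    (str "\n" writer)))

;; The prefix stands in for whatever the logging layout puts before a message.
;; With the leading newline, the data structure starts on its own line.
(println (str "<timestamp> [INFO] " (pprint-str {:emitted 10 :acked 9})))
```

This sketch uses `doseq` for the side-effecting loop where the macro uses `doall` over `for`. Both force every `pprint` call to run, but `doseq` does not build a throwaway sequence of results.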



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
GitHub user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30921545
  
    --- Diff: storm-core/src/storm.thrift ---
    @@ -222,6 +222,86 @@ struct TopologyInfo {
     514: optional string owner;
     }
     
    +struct CommonAggregateStats {
    +513: optional i32 num_executors;
    --- End diff --
    
    Agreed, I'll change this.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r31266476
  
    --- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
    @@ -373,4 +362,1222 @@
         (ExecutorStats. (window-set-converter (:emitted stats) str)
           (window-set-converter (:transferred stats) str)
           specific-stats
    -      rate)))
    \ No newline at end of file
    +      rate)))
    +
    +(defn- agg-bolt-lat-and-count
    +  "Aggregates number executed, process latency, and execute latency across all
    +  streams."
    +  [idk->exec-avg idk->proc-avg idk->num-executed]
    +  {:pre [(apply = (map #(set (keys %))
    +                       [idk->exec-avg
    +                        idk->proc-avg
    +                        idk->num-executed]))]}
    +  (letfn [(weight-avg [[id avg]] (let [num-e (get idk->num-executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    {:executeLatencyTotal (sum (map weight-avg idk->exec-avg))
    +     :processLatencyTotal (sum (map weight-avg idk->proc-avg))
    +     :executed (sum (vals idk->num-executed))}))
    +
    +(defn- agg-spout-lat-and-count
    +  "Aggregates number acked and complete latencies across all streams."
    +  [sid->comp-avg sid->num-acked]
    +  {:pre [(apply = (map #(set (keys %))
    +                       [sid->comp-avg
    +                        sid->num-acked]))]}
    +  (letfn [(weight-avg [[id avg]] (* avg (get sid->num-acked id)))]
    +    {:completeLatencyTotal (sum (map weight-avg sid->comp-avg))
    +     :acked (sum (vals sid->num-acked))}))
    +
    +(defn add-pairs
    +  ([] [0 0])
    +  ([[a1 a2] [b1 b2]]
    +   [(+ a1 b1) (+ a2 b2)]))
    +
    +(defn mk-include-sys-fn
    +  [include-sys?]
    +  (if include-sys?
    +    (fn [_] true)
    +    (fn [stream] (and (string? stream) (not (system-id? stream))))))
    +
    +(defn mk-include-sys-filter
    +  "Returns a function that includes or excludes map entries whose keys are
    +  system ids."
    +  [include-sys?]
    +  (if include-sys?
    +    identity
    +    (partial filter-key (mk-include-sys-fn false))))
    +
    +(defn- agg-bolt-streams-lat-and-count
    +  "Aggregates number executed and process & execute latencies."
    +  [idk->exec-avg idk->proc-avg idk->executed]
    +  {:pre [(apply = (map #(set (keys %))
    +                       [idk->exec-avg
    +                        idk->proc-avg
    +                        idk->executed]))]}
    +  (letfn [(weight-avg [id avg] (let [num-e (idk->executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->exec-avg)]
    +        [k {:executeLatencyTotal (weight-avg k (idk->exec-avg k))
    +            :processLatencyTotal (weight-avg k (idk->proc-avg k))
    +            :executed (idk->executed k)}]))))
    +
    +(defn- agg-spout-streams-lat-and-count
    +  "Aggregates number acked and complete latencies."
    +  [idk->comp-avg idk->acked]
    +  {:pre [(apply = (map #(set (keys %))
    +                       [idk->comp-avg
    +                        idk->acked]))]}
    +  (letfn [(weight-avg [id avg] (let [num-e (get idk->acked id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->comp-avg)]
    +        [k {:completeLatencyTotal (weight-avg k (get idk->comp-avg k))
    +            :acked (get idk->acked k)}]))))
    +
    +(defn swap-map-order
    +  "For a nested map, rearrange data such that the top-level keys become the
    +  nested map's keys and vice versa.
    +  Example:
    +  {:a {:X :banana, :Y :pear}, :b {:X :apple, :Y :orange}}
    +  -> {:Y {:a :pear, :b :orange}, :X {:a :banana, :b :apple}}"
    +  [m]
    +  (apply merge-with
    +         merge
    +         (map (fn [[k v]]
    +                (into {}
    +                      (for [[k2 v2] v]
    +                        [k2 {k v2}])))
    +              m)))
    +
    +(defn- compute-agg-capacity
    +  "Computes the capacity metric for one executor given its heartbeat data and
    +  uptime."
    +  [m uptime]
    +  (when uptime
    +    (->>
    +      ;; For each stream, create weighted averages and counts.
    +      (merge-with (fn weighted-avg+count-fn
    +                    [avg cnt]
    +                    [(* avg cnt) cnt])
    +                  (get (:execute-latencies m) (str TEN-MIN-IN-SECONDS))
    +                  (get (:executed m) (str TEN-MIN-IN-SECONDS)))
    +      vals ;; Ignore the stream ids.
    +      (reduce add-pairs
    +              [0. 0]) ;; Combine weighted averages and counts.
    +      ((fn [[weighted-avg cnt]]
    +        (div weighted-avg (* 1000 (min uptime TEN-MIN-IN-SECONDS))))))))
    +
    +(defn agg-pre-merge-comp-page-bolt
    +  [{exec-id :exec-id
    +    host :host
    +    port :port
    +    uptime :uptime
    +    comp-id :comp-id
    +    num-tasks :num-tasks
    +    statk->w->sid->num :stats}
    +   window
    +   include-sys?]
    +  (let [str-key (partial map-key str)
    +        handle-sys-components-fn (mk-include-sys-filter include-sys?)]
    +    {:executor-id exec-id,
    +     :host host,
    +     :port port,
    +     :uptime uptime,
    +     :num-executors 1,
    +     :num-tasks num-tasks,
    +     :capacity (compute-agg-capacity statk->w->sid->num uptime)
    +     :cid+sid->input-stats
    +     (merge-with
    +       merge
    +       (swap-map-order
    +         {:acked (-> statk->w->sid->num
    +                     :acked
    +                     str-key
    +                     (get window))
    +          :failed (-> statk->w->sid->num
    +                      :failed
    +                      str-key
    +                      (get window))})
    +       (agg-bolt-streams-lat-and-count (-> statk->w->sid->num
    +                                           :execute-latencies
    +                                           str-key
    +                                           (get window))
    +                                       (-> statk->w->sid->num
    +                                           :process-latencies
    +                                           str-key
    +                                           (get window))
    +                                       (-> statk->w->sid->num
    +                                           :executed
    +                                           str-key
    +                                           (get window)))),
    +     :sid->output-stats
    +     (swap-map-order
    +       {:emitted (-> statk->w->sid->num
    +                     :emitted
    +                     str-key
    +                     (get window)
    +                     handle-sys-components-fn)
    +        :transferred (-> statk->w->sid->num
    +                         :transferred
    +                         str-key
    +                         (get window)
    +                         handle-sys-components-fn)})}))
    +
    +(defn agg-pre-merge-comp-page-spout
    +  [{exec-id :exec-id
    +    host :host
    +    port :port
    +    uptime :uptime
    +    comp-id :comp-id
    +    num-tasks :num-tasks
    +    statk->w->sid->num :stats}
    +   window
    +   include-sys?]
    +  (let [str-key (partial map-key str)
    +        handle-sys-components-fn (mk-include-sys-filter include-sys?)]
    +    {:executor-id exec-id,
    +     :host host,
    +     :port port,
    +     :uptime uptime,
    +     :num-executors 1,
    +     :num-tasks num-tasks,
    +     :sid->output-stats
    +     (merge-with
    +       merge
    +       (agg-spout-streams-lat-and-count (-> statk->w->sid->num
    +                                            :complete-latencies
    +                                            str-key
    +                                            (get window))
    +                                        (-> statk->w->sid->num
    +                                            :acked
    +                                            str-key
    +                                            (get window)))
    +       (swap-map-order
    +         {:acked (-> statk->w->sid->num
    +                     :acked
    +                     str-key
    +                     (get window))
    +          :failed (-> statk->w->sid->num
    +                      :failed
    +                      str-key
    +                      (get window))
    +          :emitted (-> statk->w->sid->num
    +                       :emitted
    +                       str-key
    +                       (get window)
    +                       handle-sys-components-fn)
    +          :transferred (-> statk->w->sid->num
    +                           :transferred
    +                           str-key
    +                           (get window)
    +                           handle-sys-components-fn)}))}))
    +
    +(defn agg-pre-merge-topo-page-bolt
    +  [{comp-id :comp-id
    +    num-tasks :num-tasks
    +    statk->w->sid->num :stats
    +    uptime :uptime}
    +   window
    +   include-sys?]
    +  (let [str-key (partial map-key str)
    +        handle-sys-components-fn (mk-include-sys-filter include-sys?)]
    +    {comp-id
    +     (merge
    +       (agg-bolt-lat-and-count (-> statk->w->sid->num
    +                                   :execute-latencies
    +                                   str-key
    +                                   (get window))
    +                               (-> statk->w->sid->num
    +                                   :process-latencies
    +                                   str-key
    +                                   (get window))
    +                               (-> statk->w->sid->num
    +                                   :executed
    +                                   str-key
    +                                   (get window)))
    +       {:num-executors 1
    +        :num-tasks num-tasks
    +        :emitted (-> statk->w->sid->num
    +                     :emitted
    +                     str-key
    +                     (get window)
    +                     handle-sys-components-fn
    +                     vals
    +                     sum)
    +        :transferred (-> statk->w->sid->num
    +                         :transferred
    +                         str-key
    +                         (get window)
    +                         handle-sys-components-fn
    +                         vals
    +                         sum)
    +        :capacity (compute-agg-capacity statk->w->sid->num uptime)
    +        :acked (-> statk->w->sid->num
    +                   :acked
    +                   str-key
    +                   (get window)
    +                   vals
    +                   sum)
    +        :failed (-> statk->w->sid->num
    +                    :failed
    +                    str-key
    +                    (get window)
    +                    vals
    +                    sum)})}))
    +
    +(defn agg-pre-merge-topo-page-spout
    +  [{comp-id :comp-id
    +    num-tasks :num-tasks
    +    statk->w->sid->num :stats}
    +   window
    +   include-sys?]
    +  (let [str-key (partial map-key str)
    +        handle-sys-components-fn (mk-include-sys-filter include-sys?)]
    +    {comp-id
    +     (merge
    +       (agg-spout-lat-and-count (-> statk->w->sid->num
    +                                    :complete-latencies
    +                                    str-key
    +                                    (get window))
    +                                (-> statk->w->sid->num
    +                                    :acked
    +                                    str-key
    +                                    (get window)))
    +       {:num-executors 1
    +        :num-tasks num-tasks
    +        :emitted (-> statk->w->sid->num
    +                     :emitted
    +                     str-key
    +                     (get window)
    +                     handle-sys-components-fn
    +                     vals
    +                     sum)
    +        :transferred (-> statk->w->sid->num
    +                         :transferred
    +                         str-key
    +                         (get window)
    +                         handle-sys-components-fn
    +                         vals
    +                         sum)
    +        :failed (-> statk->w->sid->num
    +                    :failed
    +                    str-key
    +                    (get window)
    +                    vals
    +                    sum)})}))
    +
    +(defn apply-default
    +  [f defaulting-fn & args]
    +  (apply f (map defaulting-fn args)))
    +
    +(defn apply-or-0
    +  [f & args]
    +  (apply apply-default f #(or % 0) args))
    +
    +(defn sum-or-0
    +  [& args]
    +  (apply apply-or-0 + args))
    +
    +(defn max-or-0
    +  [& args]
    +  (apply apply-or-0 max args))
    +
    +(defn merge-agg-comp-stats-comp-page-bolt
    +  [{acc-in :cid+sid->input-stats
    +    acc-out :sid->output-stats
    +    :as acc-bolt-stats}
    +   {bolt-in :cid+sid->input-stats
    +    bolt-out :sid->output-stats
    +    :as bolt-stats}]
    +  {:num-executors (inc (or (:num-executors acc-bolt-stats) 0)),
    +   :num-tasks (sum-or-0 (:num-tasks acc-bolt-stats) (:num-tasks bolt-stats)),
    +   :sid->output-stats (merge-with (partial merge-with sum-or-0)
    +                                  acc-out
    +                                  bolt-out),
    +   :cid+sid->input-stats (merge-with (partial merge-with sum-or-0)
    +                                     acc-in
    +                                     bolt-in),
    +   :executor-stats
    +   (let [sum-streams (fn [m k] (->> m vals (map k) (apply sum-or-0)))
    +         executed (sum-streams bolt-in :executed)]
    +     (conj (:executor-stats acc-bolt-stats)
    +           (merge
    +             (select-keys bolt-stats
    +                          [:executor-id :uptime :host :port :capacity])
    +             {:emitted (sum-streams bolt-out :emitted)
    +              :transferred (sum-streams bolt-out :transferred)
    +              :acked (sum-streams bolt-in :acked)
    +              :failed (sum-streams bolt-in :failed)
    +              :executed executed}
    +             (->>
    +               (if (and executed (pos? executed))
    +                 [(div (sum-streams bolt-in :executeLatencyTotal) executed)
    +                  (div (sum-streams bolt-in :processLatencyTotal) executed)]
    +                 [nil nil])
    +               (mapcat vector [:execute-latency :process-latency])
    +               (apply assoc {})))))})
    +
    +(defn merge-agg-comp-stats-comp-page-spout
    +  [{acc-out :sid->output-stats
    +    :as acc-spout-stats}
    +   {spout-out :sid->output-stats
    +    :as spout-stats}]
    +  {:num-executors (inc (or (:num-executors acc-spout-stats) 0)),
    +   :num-tasks (sum-or-0 (:num-tasks acc-spout-stats) (:num-tasks spout-stats)),
    +   :sid->output-stats (merge-with (partial merge-with sum-or-0)
    +                                  acc-out
    +                                  spout-out),
    +   :executor-stats
    +   (let [sum-streams (fn [m k] (->> m vals (map k) (apply sum-or-0)))
    +         acked (sum-streams spout-out :acked)]
    +     (conj (:executor-stats acc-spout-stats)
    +           (merge
    +             (select-keys spout-stats [:executor-id :uptime :host :port])
    +             {:emitted (sum-streams spout-out :emitted)
    +              :transferred (sum-streams spout-out :transferred)
    +              :acked acked
    +              :failed (sum-streams spout-out :failed)}
    +             {:complete-latency (if (and acked (pos? acked))
    +                                  (div (sum-streams spout-out
    +                                                    :completeLatencyTotal)
    +                                       acked)
    +                                  nil)})))})
    +
    +(defn merge-agg-comp-stats-topo-page-bolt
    +  [acc-bolt-stats bolt-stats]
    +  {:num-executors (inc (or (:num-executors acc-bolt-stats) 0))
    +   :num-tasks (sum-or-0 (:num-tasks acc-bolt-stats) (:num-tasks bolt-stats))
    +   :emitted (sum-or-0 (:emitted acc-bolt-stats) (:emitted bolt-stats))
    +   :transferred (sum-or-0 (:transferred acc-bolt-stats)
    +                          (:transferred bolt-stats))
    +   :capacity (max-or-0 (:capacity acc-bolt-stats) (:capacity bolt-stats))
    +   ;; We sum average latency totals here to avoid dividing at each step.
    +   ;; Compute the average latencies by dividing the total by the count.
    +   :executeLatencyTotal (sum-or-0 (:executeLatencyTotal acc-bolt-stats)
    +                                  (:executeLatencyTotal bolt-stats))
    +   :processLatencyTotal (sum-or-0 (:processLatencyTotal acc-bolt-stats)
    +                                  (:processLatencyTotal bolt-stats))
    +   :executed (sum-or-0 (:executed acc-bolt-stats) (:executed bolt-stats))
    +   :acked (sum-or-0 (:acked acc-bolt-stats) (:acked bolt-stats))
    +   :failed (sum-or-0 (:failed acc-bolt-stats) (:failed bolt-stats))})
    +
    +(defn merge-agg-comp-stats-topo-page-spout
    +  [acc-spout-stats spout-stats]
    +  {:num-executors (inc (or (:num-executors acc-spout-stats) 0))
    +   :num-tasks (sum-or-0 (:num-tasks acc-spout-stats) (:num-tasks spout-stats))
    +   :emitted (sum-or-0 (:emitted acc-spout-stats) (:emitted spout-stats))
    +   :transferred (sum-or-0 (:transferred acc-spout-stats) (:transferred spout-stats))
    +   ;; We sum average latency totals here to avoid dividing at each step.
    +   ;; Compute the average latencies by dividing the total by the count.
    +   :completeLatencyTotal (sum-or-0 (:completeLatencyTotal acc-spout-stats)
    +                            (:completeLatencyTotal spout-stats))
    +   :acked (sum-or-0 (:acked acc-spout-stats) (:acked spout-stats))
    +   :failed (sum-or-0 (:failed acc-spout-stats) (:failed spout-stats))})
    +
    +(defn aggregate-count-streams
    +  [stats]
    +  (->> stats
    +       (map-val #(reduce + (vals %)))))
    +
    +(defn- agg-topo-exec-stats*
    +  "A helper function that does the common work to aggregate stats of one
    +  executor with the given map for the topology page."
    +  [window
    +   include-sys?
    +   {:keys [workers-set
    +           bolt-id->stats
    +           spout-id->stats
    +           window->emitted
    +           window->transferred
    +           window->comp-lat-wgt-avg
    +           window->acked
    +           window->failed] :as acc-stats}
    +   {:keys [stats] :as new-data}
    +   pre-merge-fn
    +   merge-fn
    +   comp-key]
    +  (let [cid->statk->num (pre-merge-fn new-data window include-sys?)
    +        {w->compLatWgtAvg :completeLatencyTotal
    +         w->acked :acked}
    +          (if (:complete-latencies stats)
    +            (swap-map-order
    +              (into {}
    +                    (for [w (keys (:acked stats))]
    +                         [w (agg-spout-lat-and-count
    +                              (get (:complete-latencies stats) w)
    +                              (get (:acked stats) w))])))
    +            {:completeLatencyTotal nil
    +             :acked (aggregate-count-streams (:acked stats))})
    +        handle-sys-components-fn (mk-include-sys-filter include-sys?)]
    +    (assoc {:workers-set (conj workers-set
    +                               [(:host new-data) (:port new-data)])
    +            :bolt-id->stats bolt-id->stats
    +            :spout-id->stats spout-id->stats
    +            :window->emitted (->> (:emitted stats)
    +                                  (map-val handle-sys-components-fn)
    +                                  aggregate-count-streams
    +                                  (merge-with + window->emitted))
    +            :window->transferred (->> (:transferred stats)
    +                                      (map-val handle-sys-components-fn)
    +                                      aggregate-count-streams
    +                                      (merge-with + window->transferred))
    +            :window->comp-lat-wgt-avg (merge-with +
    +                                                  window->comp-lat-wgt-avg
    +                                                  w->compLatWgtAvg)
    +            :window->acked (if (= :spout (:type stats))
    +                             (merge-with + window->acked w->acked)
    +                             window->acked)
    +            :window->failed (if (= :spout (:type stats))
    +                              (->> (:failed stats)
    +                                   aggregate-count-streams
    +                                   (merge-with + window->failed))
    +                              window->failed)}
    +           comp-key (merge-with merge-fn
    +                                (acc-stats comp-key)
    +                                cid->statk->num)
    +           :type (:type stats))))
    +
    +(defmulti agg-topo-exec-stats
    +  "Combines the aggregate stats of one executor with the given map, selecting
    +  the appropriate window and including system components as specified."
    +  (fn dispatch-fn [& args] (:type (last args))))
    +
    +(defmethod agg-topo-exec-stats :bolt
    +  [window include-sys? acc-stats new-data]
    +  (agg-topo-exec-stats* window
    +                        include-sys?
    +                        acc-stats
    +                        new-data
    +                        agg-pre-merge-topo-page-bolt
    +                        merge-agg-comp-stats-topo-page-bolt
    +                        :bolt-id->stats))
    +
    +(defmethod agg-topo-exec-stats :spout
    +  [window include-sys? acc-stats new-data]
    +  (agg-topo-exec-stats* window
    +                        include-sys?
    +                        acc-stats
    +                        new-data
    +                        agg-pre-merge-topo-page-spout
    +                        merge-agg-comp-stats-topo-page-spout
    +                        :spout-id->stats))
    +
    +(defmethod agg-topo-exec-stats :default [_ _ acc-stats _] acc-stats)
    +
    +(defn get-last-error
    +  [storm-cluster-state storm-id component-id]
    +  (if-let [e (.last-error storm-cluster-state storm-id component-id)]
    +    (ErrorInfo. (:error e) (:time-secs e))))
    +
    +(defn component-type
    +  "Returns the component type (either :bolt or :spout) for a given
    +  topology and component id. Returns nil if not found."
    +  [^StormTopology topology id]
    +  (let [bolts (.get_bolts topology)
    +        spouts (.get_spouts topology)]
    +    (cond
    +      (.containsKey bolts id) :bolt
    +      (.containsKey spouts id) :spout)))
    +
    +(defn extract-data-from-hb
    +  ([exec->host+port task->component beats include-sys? topology comp-id]
    +   (for [[[start end :as executor] [host port]] exec->host+port
    +         :let [beat (beats executor)
    +               id (task->component start)]
    +         :when (and (or (nil? comp-id) (= comp-id id))
    +                    (or include-sys? (not (system-id? id))))]
    +     {:exec-id executor
    +      :comp-id id
    +      :num-tasks (count (range start (inc end)))
    +      :host host
    +      :port port
    +      :uptime (:uptime beat)
    +      :stats (:stats beat)
    +      :type (or (:type (:stats beat))
    +                (component-type topology id))}))
    +  ([exec->host+port task->component beats include-sys? topology]
    +    (extract-data-from-hb exec->host+port
    +                          task->component
    +                          beats
    +                          include-sys?
    +                          topology
    +                          nil)))
    +
    +(defn aggregate-topo-stats
    +  [window include-sys? data]
    +  (let [init-val {:workers-set #{}
    +                  :bolt-id->stats {}
    +                  :spout-id->stats {}
    +                  :window->emitted {}
    +                  :window->transferred {}
    +                  :window->comp-lat-wgt-avg {}
    +                  :window->acked {}
    +                  :window->failed {}}
    +        reducer-fn (partial agg-topo-exec-stats
    +                            window
    +                            include-sys?)]
    +    (reduce reducer-fn init-val data)))
    +
    +(defn- compute-weighted-averages-per-window
    +  [acc-data wgt-avg-key divisor-key]
    +  (into {} (for [[window wgt-avg] (wgt-avg-key acc-data)
    +                 :let [divisor ((divisor-key acc-data) window)]
    +                 :when (and divisor (pos? divisor))]
    +             [(str window) (div wgt-avg divisor)])))
    +
    +(defn- post-aggregate-topo-stats
    +  [task->component exec->node+port last-err-fn acc-data]
    +  {:num-tasks (count task->component)
    +   :num-workers (count (:workers-set acc-data))
    +   :num-executors (count exec->node+port)
    +   :bolt-id->stats
    +     (into {} (for [[id m] (:bolt-id->stats acc-data)
    +                    :let [executed (:executed m)]]
    +                     [id (-> m
    +                             (assoc :execute-latency
    +                                    (if (and executed (pos? executed))
    +                                      (div (or (:executeLatencyTotal m) 0)
    +                                           executed)
    +                                      0)
    +                                    :process-latency
    +                                    (if (and executed (pos? executed))
    +                                      (div (or (:processLatencyTotal m) 0)
    +                                           executed)
    +                                      0))
    +                             (dissoc :executeLatencyTotal
    +                                     :processLatencyTotal)
    +                             (assoc :lastError (last-err-fn id)))]))
    +   :spout-id->stats
    +     (into {} (for [[id m] (:spout-id->stats acc-data)
    +                    :let [acked (:acked m)]]
    +                    [id (-> m
    +                            (assoc :completeLatency
    +                                   (if (and acked (pos? acked))
    +                                     (div (:completeLatencyTotal m)
    +                                          (:acked m))
    +                                     0))
    +                            (dissoc :completeLatencyTotal)
    +                            (assoc :lastError (last-err-fn id)))]))
    +   :window->emitted (map-key str (:window->emitted acc-data))
    +   :window->transferred (map-key str (:window->transferred acc-data))
    +   :window->complete-latency
    +     (compute-weighted-averages-per-window acc-data
    +                                           :window->comp-lat-wgt-avg
    +                                           :window->acked)
    +   :window->acked (map-key str (:window->acked acc-data))
    +   :window->failed (map-key str (:window->failed acc-data))})
    +
    +(defn- thriftify-common-agg-stats
    +  [^ComponentAggregateStats s
    +   {:keys [num-tasks
    +           emitted
    +           transferred
    +           acked
    +           failed
    +           num-executors] :as statk->num}]
    +  (let [cas (CommonAggregateStats.)]
    +    (and num-executors (.set_num_executors cas num-executors))
    +    (and num-tasks (.set_num_tasks cas num-tasks))
    +    (and emitted (.set_emitted cas emitted))
    +    (and transferred (.set_transferred cas transferred))
    +    (and acked (.set_acked cas acked))
    +    (and failed (.set_failed cas failed))
    +    (.set_common_stats s cas)))
    +
    +(defn thriftify-bolt-agg-stats
    +  [statk->num]
    +  (let [{:keys [lastError
    +                execute-latency
    +                process-latency
    +                executed
    +                capacity]} statk->num
    +        s (ComponentAggregateStats.)]
    +    (.set_type s ComponentType/BOLT)
    +    (and lastError (.set_last_error s lastError))
    +    (thriftify-common-agg-stats s statk->num)
    +    (.set_specific_stats s
    +      (SpecificAggregateStats/bolt
    +        (let [bas (BoltAggregateStats.)]
    +          (and execute-latency (.set_execute_latency_ms bas execute-latency))
    +          (and process-latency (.set_process_latency_ms bas process-latency))
    +          (and executed (.set_executed bas executed))
    +          (and capacity (.set_capacity bas capacity))
    +          bas)))
    +    s))
    +
    +(defn thriftify-spout-agg-stats
    +  [statk->num]
    +  (let [{:keys [lastError
    +                complete-latency]} statk->num
    +        s (ComponentAggregateStats.)]
    +    (.set_type s ComponentType/SPOUT)
    +    (and lastError (.set_last_error s lastError))
    +    (thriftify-common-agg-stats s statk->num)
    +    (.set_specific_stats s
    +      (SpecificAggregateStats/spout
    +        (let [sas (SpoutAggregateStats.)]
    +          (and complete-latency (.set_complete_latency_ms sas complete-latency))
    +          sas)))
    +    s))
    +
    +(defn thriftify-topo-page-data
    +  [topology-id data]
    +  (let [{:keys [num-tasks
    +                num-workers
    +                num-executors
    +                spout-id->stats
    +                bolt-id->stats
    +                window->emitted
    +                window->transferred
    +                window->complete-latency
    +                window->acked
    +                window->failed]} data
    +        spout-agg-stats (into {}
    +                              (for [[id m] spout-id->stats
    +                                    :let [m (assoc m :type :spout)]]
    +                                [id
    +                                 (thriftify-spout-agg-stats m)]))
    +        bolt-agg-stats (into {}
    +                             (for [[id m] bolt-id->stats
    +                                   :let [m (assoc m :type :bolt)]]
    +                              [id
    +                               (thriftify-bolt-agg-stats m)]))
    +        topology-stats (doto (TopologyStats.)
    +                         (.set_window_to_emitted window->emitted)
    +                         (.set_window_to_transferred window->transferred)
    +                         (.set_window_to_complete_latencies_ms
    +                           window->complete-latency)
    +                         (.set_window_to_acked window->acked)
    +                         (.set_window_to_failed window->failed))
    +        topo-page-info (doto (TopologyPageInfo. topology-id)
    +                         (.set_num_tasks num-tasks)
    +                         (.set_num_workers num-workers)
    +                         (.set_num_executors num-executors)
    +                         (.set_id_to_spout_agg_stats spout-agg-stats)
    +                         (.set_id_to_bolt_agg_stats bolt-agg-stats)
    +                         (.set_topology_stats topology-stats))]
    +    topo-page-info))
    +
    +(defn agg-topo-execs-stats
    +  "Aggregate various executor statistics for a topology from the given
    +  heartbeats."
    +  [topology-id
    +   exec->node+port
    +   task->component
    +   beats
    +   topology
    +   window
    +   include-sys?
    +   last-err-fn]
    +  (->> ;; This iterates over each executor one time, because of lazy evaluation.
    +    (extract-data-from-hb exec->node+port
    +                          task->component
    +                          beats
    +                          include-sys?
    +                          topology)
    +    (aggregate-topo-stats window include-sys?)
    +    (post-aggregate-topo-stats task->component exec->node+port last-err-fn)
    +    (thriftify-topo-page-data topology-id)))
    +
    +(defn- agg-bolt-exec-win-stats
    +  "A helper function that aggregates windowed stats from one bolt executor."
    +  [acc-stats new-stats include-sys?]
    +  (let [{w->execLatWgtAvg :executeLatencyTotal
    +         w->procLatWgtAvg :processLatencyTotal
    +         w->executed :executed}
    +          (swap-map-order
    +            (into {} (for [w (keys (:executed new-stats))]
    +                       [w (agg-bolt-lat-and-count
    +                            (get (:execute-latencies new-stats) w)
    +                            (get (:process-latencies new-stats) w)
    +                            (get (:executed new-stats) w))])))
    +        handle-sys-components-fn (mk-include-sys-filter include-sys?)]
    +    {:window->emitted (->> (:emitted new-stats)
    +                           (map-val handle-sys-components-fn)
    +                           aggregate-count-streams
    +                           (merge-with + (:window->emitted acc-stats)))
    +     :window->transferred (->> (:transferred new-stats)
    +                               (map-val handle-sys-components-fn)
    +                               aggregate-count-streams
    +                               (merge-with + (:window->transferred acc-stats)))
    +     :window->exec-lat-wgt-avg (merge-with +
    +                                           (:window->exec-lat-wgt-avg acc-stats)
    +                                           w->execLatWgtAvg)
    +     :window->proc-lat-wgt-avg (merge-with +
    +                                           (:window->proc-lat-wgt-avg acc-stats)
    +                                           w->procLatWgtAvg)
    +     :window->executed (merge-with + (:window->executed acc-stats) w->executed)
    +     :window->acked (->> (:acked new-stats)
    +                         aggregate-count-streams
    +                         (merge-with + (:window->acked acc-stats)))
    +     :window->failed (->> (:failed new-stats)
    +                          aggregate-count-streams
    +                          (merge-with + (:window->failed acc-stats)))}))
    +
    +(defn- agg-spout-exec-win-stats
    +  "A helper function that aggregates windowed stats from one spout executor."
    +  [acc-stats new-stats include-sys?]
    +  (let [{w->compLatWgtAvg :completeLatencyTotal
    +         w->acked :acked}
    +          (swap-map-order
    +            (into {} (for [w (keys (:acked new-stats))]
    +                       [w (agg-spout-lat-and-count
    +                            (get (:complete-latencies new-stats) w)
    +                            (get (:acked new-stats) w))])))
    +        handle-sys-components-fn (mk-include-sys-filter include-sys?)]
    +    {:window->emitted (->> (:emitted new-stats)
    +                           (map-val handle-sys-components-fn)
    +                           aggregate-count-streams
    +                           (merge-with + (:window->emitted acc-stats)))
    +     :window->transferred (->> (:transferred new-stats)
    +                               (map-val handle-sys-components-fn)
    +                               aggregate-count-streams
    +                               (merge-with + (:window->transferred acc-stats)))
    +     :window->comp-lat-wgt-avg (merge-with +
    +                                           (:window->comp-lat-wgt-avg acc-stats)
    +                                           w->compLatWgtAvg)
    +     :window->acked (->> (:acked new-stats)
    +                         aggregate-count-streams
    +                         (merge-with + (:window->acked acc-stats)))
    +     :window->failed (->> (:failed new-stats)
    +                          aggregate-count-streams
    +                          (merge-with + (:window->failed acc-stats)))}))
    +
    +(defmulti agg-comp-exec-stats
    +  "Combines the aggregate stats of one executor with the given map, selecting
    +  the appropriate window and including system components as specified."
    +  (fn dispatch-fn [_ _ init-val _] (:type init-val)))
    +
    +(defmethod agg-comp-exec-stats :bolt
    +  [window include-sys? acc-stats new-data]
    +  (assoc (agg-bolt-exec-win-stats acc-stats (:stats new-data) include-sys?)
    +         :stats (merge-agg-comp-stats-comp-page-bolt
    +                  (:stats acc-stats)
    +                  (agg-pre-merge-comp-page-bolt new-data window include-sys?))
    +         :type :bolt))
    +
    +(defmethod agg-comp-exec-stats :spout
    +  [window include-sys? acc-stats new-data]
    +  (assoc (agg-spout-exec-win-stats acc-stats (:stats new-data) include-sys?)
    +         :stats (merge-agg-comp-stats-comp-page-spout
    +                  (:stats acc-stats)
    +                  (agg-pre-merge-comp-page-spout new-data window include-sys?))
    +         :type :spout))
    +
    +(defn- aggregate-comp-stats*
    +  [window include-sys? data init-val]
    +  (-> (partial agg-comp-exec-stats
    +               window
    +               include-sys?)
    +      (reduce init-val data)))
    +
    +(defmulti aggregate-comp-stats
    +  (fn dispatch-fn [& args] (-> args last first :stats :type)))
    --- End diff --
    
    This could result in a null dispatch value if the component's heartbeat does not contain metrics.  We should be using the `:type` directly instead of accessing it via `:stats`. 
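    
    A minimal sketch of the failure mode, not the PR's code: the entry shape and the names `by-stats-type`, `by-type`, and `no-metrics` are hypothetical, chosen only to show how a multimethod behaves when its dispatch function returns nil and no `:default` method exists.
    
    ```clojure
    ;; Dispatch through :stats, as in the diff above (hypothetical data shape).
    (defmulti by-stats-type (fn [entry] (-> entry :stats :type)))
    (defmethod by-stats-type :bolt [_] :bolt-agg)
    
    ;; Dispatch on :type directly, as suggested.
    (defmulti by-type (fn [entry] (:type entry)))
    (defmethod by-type :bolt [_] :bolt-agg)
    
    ;; A component whose heartbeat carried no metrics: :stats is nil.
    (def no-metrics {:type :bolt :stats nil})
    
    (defn try-dispatch
      "Calls f on entry, returning :no-method if no multimethod matches."
      [f entry]
      (try
        (f entry)
        (catch IllegalArgumentException _ :no-method)))
    
    (try-dispatch by-type no-metrics)       ;; dispatch value is :bolt
    (try-dispatch by-stats-type no-metrics) ;; dispatch value is nil
    ```
    
    With the nil dispatch value the call throws an IllegalArgumentException, because no method is registered for nil.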



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30920707
  
    --- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
    @@ -373,4 +360,1248 @@
         (ExecutorStats. (window-set-converter (:emitted stats) str)
           (window-set-converter (:transferred stats) str)
           specific-stats
    -      rate)))
    \ No newline at end of file
    +      rate)))
    +
    +(defn- agg-bolt-lat-and-count
    +  "Aggregates number executed and process & execute latencies across all
    +  streams."
    +  [idk->exec-avg idk->proc-avg idk->num-executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->num-executed]))}
    +  (letfn [(weight-avg [[id avg]] (let [num-e (get idk->num-executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    {:executeLatencyTotal (sum (map weight-avg idk->exec-avg))
    +     :processLatencyTotal (sum (map weight-avg idk->proc-avg))
    +     :executed (sum (vals idk->num-executed))}))
    +
    +(defn- agg-spout-lat-and-count
    +  "Aggregates number acked and complete latencies across all streams."
    +  [sid->comp-avg sid->num-acked]
    +  {:pre (apply = (map #(set (keys %))
    +                      [sid->comp-avg
    +                       sid->num-acked]))}
    +  (letfn [(weight-avg [[id avg]] (* avg (get sid->num-acked id)))]
    +    {:completeLatencyTotal (sum (map weight-avg sid->comp-avg))
    +     :acked (sum (vals sid->num-acked))}))
    +
    +(defn add-pairs
    +  ([] [0 0])
    +  ([[a1 a2] [b1 b2]]
    +   [(+ a1 b1) (+ a2 b2)]))
    +
    +(defn mk-include-sys-fn
    +  [include-sys?]
    +  (if include-sys?
    +    (fn [_] true)
    +    (fn [stream] (and (string? stream) (not (system-id? stream))))))
    +
    +(defn mk-include-sys-filter
    +  "Returns a function that includes or excludes map entries whose keys are
    +  system ids."
    +  [include-sys?]
    +  (if include-sys?
    +    identity
    +    (partial filter-key (mk-include-sys-fn false))))
    +
    +(defn- agg-bolt-streams-lat-and-count
    +  "Aggregates number executed and process & execute latencies."
    +  [idk->exec-avg idk->proc-avg idk->executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->executed]))}
    +  (letfn [(weight-avg [id avg] (let [num-e (idk->executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->exec-avg)]
    +        [k {:executeLatencyTotal (weight-avg k (idk->exec-avg k))
    +            :processLatencyTotal (weight-avg k (idk->proc-avg k))
    +            :executed (idk->executed k)}]))))
    +
    +(defn- agg-spout-streams-lat-and-count
    +  "Aggregates number acked and complete latencies."
    +  [idk->comp-avg idk->acked]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->comp-avg
    +                       idk->acked]))}
    +  (letfn [(weight-avg [id avg] (let [num-e (get idk->acked id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->comp-avg)]
    +        [k {:completeLatencyTotal (weight-avg k (get idk->comp-avg k))
    +            :acked (get idk->acked k)}]))))
    +
    +(defn swap-map-order
    +  "{:a {:A 3, :B 5}, :b {:A 1, :B 2}}
    --- End diff --
    
    I'll rewrite.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/554#issuecomment-145666276
  
    Still +1



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30925867
  
    --- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
    @@ -373,4 +360,1248 @@
         (ExecutorStats. (window-set-converter (:emitted stats) str)
           (window-set-converter (:transferred stats) str)
           specific-stats
    -      rate)))
    \ No newline at end of file
    +      rate)))
    +
    +(defn- agg-bolt-lat-and-count
    +  "Aggregates number executed and process & execute latencies across all
    +  streams."
    +  [idk->exec-avg idk->proc-avg idk->num-executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->num-executed]))}
    +  (letfn [(weight-avg [[id avg]] (let [num-e (get idk->num-executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    {:executeLatencyTotal (sum (map weight-avg idk->exec-avg))
    +     :processLatencyTotal (sum (map weight-avg idk->proc-avg))
    +     :executed (sum (vals idk->num-executed))}))
    +
    +(defn- agg-spout-lat-and-count
    +  "Aggregates number acked and complete latencies across all streams."
    +  [sid->comp-avg sid->num-acked]
    +  {:pre (apply = (map #(set (keys %))
    +                      [sid->comp-avg
    +                       sid->num-acked]))}
    +  (letfn [(weight-avg [[id avg]] (* avg (get sid->num-acked id)))]
    +    {:completeLatencyTotal (sum (map weight-avg sid->comp-avg))
    +     :acked (sum (vals sid->num-acked))}))
    +
    +(defn add-pairs
    +  ([] [0 0])
    +  ([[a1 a2] [b1 b2]]
    +   [(+ a1 b1) (+ a2 b2)]))
    +
    +(defn mk-include-sys-fn
    +  [include-sys?]
    +  (if include-sys?
    +    (fn [_] true)
    +    (fn [stream] (and (string? stream) (not (system-id? stream))))))
    +
    +(defn mk-include-sys-filter
    +  "Returns a function that includes or excludes map entries whose keys are
    +  system ids."
    +  [include-sys?]
    +  (if include-sys?
    +    identity
    +    (partial filter-key (mk-include-sys-fn false))))
    +
    +(defn- agg-bolt-streams-lat-and-count
    +  "Aggregates number executed and process & execute latencies."
    +  [idk->exec-avg idk->proc-avg idk->executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->executed]))}
    +  (letfn [(weight-avg [id avg] (let [num-e (idk->executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->exec-avg)]
    +        [k {:executeLatencyTotal (weight-avg k (idk->exec-avg k))
    +            :processLatencyTotal (weight-avg k (idk->proc-avg k))
    +            :executed (idk->executed k)}]))))
    +
    +(defn- agg-spout-streams-lat-and-count
    +  "Aggregates number acked and complete latencies."
    +  [idk->comp-avg idk->acked]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->comp-avg
    +                       idk->acked]))}
    +  (letfn [(weight-avg [id avg] (let [num-e (get idk->acked id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->comp-avg)]
    +        [k {:completeLatencyTotal (weight-avg k (get idk->comp-avg k))
    +            :acked (get idk->acked k)}]))))
    +
    +(defn swap-map-order
    +  "{:a {:A 3, :B 5}, :b {:A 1, :B 2}}
    +    -> {:A {:b 1, :a 3}, :B {:b 2, :a 5}}"
    +  [m]
    +  (apply merge-with
    +         merge
    +         (map (fn [[k v]]
    +                (into {}
    +                      (for [[k2 v2] v]
    +                        [k2 {k v2}])))
    +              m)))
    +
    +(defn- compute-agg-capacity
    +  "Computes the capacity metric for one executor given its heartbeat data and
    +  uptime."
    +  [m uptime]
    +  (when uptime
    +    (->>
    +      ;; For each stream, create weighted averages and counts.
    +      (merge-with (fn weighted-avg+count-fn
    +                    [avg cnt]
    +                    [(* avg cnt) cnt])
    +                  (get (:execute-latencies m) "600")
    +                  (get (:executed m) "600"))
    +      vals ;; Ignore the stream ids.
    +      (reduce add-pairs
    +              [0. 0]) ;; Combine weighted averages and counts.
    +      ((fn [[weighted-avg cnt]]
    +        (div weighted-avg (* 1000 (min uptime 600))))))))
    --- End diff --
    
    Same here.
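    
    For readers following the arithmetic, a rough sketch of what the capacity computation above works out to. The name `capacity-sketch` and its argument names are hypothetical, and plain `/` is used here in place of the `div` helper from the diff. It assumes latencies are in milliseconds and that the window is capped at 600 seconds, as the diff suggests.
    
    ```clojure
    (defn capacity-sketch
      "Fraction of the window (at most 600 s) an executor spent executing.
      Both maps are keyed by stream id."
      [stream->exec-lat-ms stream->executed uptime-secs]
      (when uptime-secs
        (let [window-secs (min uptime-secs 600)
              ;; Sum of (average latency * count) over all streams.
              busy-ms (reduce + 0.0
                              (for [[stream avg-ms] stream->exec-lat-ms]
                                (* avg-ms (get stream->executed stream 0))))]
          (/ busy-ms (* 1000 window-secs)))))
    
    ;; 2.0 ms * 100000 + 4.0 ms * 25000 = 300000 ms busy in a 600000 ms window.
    (capacity-sketch {"default" 2.0 "other" 4.0}
                     {"default" 100000 "other" 25000}
                     1200)
    ;; => 0.5
    ```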



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30919837
  
    --- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
    @@ -322,16 +307,16 @@
       [(.get_componentId global-stream-id) (.get_streamId global-stream-id)])
     
     (defmethod clojurify-specific-stats BoltStats [^BoltStats stats]
    -  [(window-set-converter (.get_acked stats) from-global-stream-id symbol)
    -   (window-set-converter (.get_failed stats) from-global-stream-id symbol)
    -   (window-set-converter (.get_process_ms_avg stats) from-global-stream-id symbol)
    -   (window-set-converter (.get_executed stats) from-global-stream-id symbol)
    -   (window-set-converter (.get_execute_ms_avg stats) from-global-stream-id symbol)])
    +  [(window-set-converter (.get_acked stats) from-global-stream-id identity)
    --- End diff --
    
    It is not necessary to convert the string keys that come out of Thrift deserialization, since we use them as strings anyway.
    
    The conversion also makes debugging confusing, because the symbol `600` and the number `600` do not compare equal, yet they look exactly the same when printed.
    
    I changed this because in the course of writing the code, I printed the contents of a heartbeat and saw `600`, and when I tried to retrieve the value for the key `600` as a number, I got nothing back.
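    
    A small illustration of that confusion, using a made-up map rather than real heartbeat data (`window->count` is a hypothetical name):
    
    ```clojure
    ;; A map keyed by the symbol 600, as the old `symbol` conversion produced.
    (def window->count {(symbol "600") 42})
    
    (pr-str window->count)                ;; => "{600 42}"
    (get window->count 600)               ;; => nil, the number is a different key
    (get window->count "600")             ;; => nil, so is the string
    (get window->count (symbol "600"))    ;; => 42
    
    ;; Keeping the keys as strings makes the lookup match what Thrift returns.
    (get {"600" 42} "600")                ;; => 42
    ```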




[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on the pull request:

    https://github.com/apache/storm/pull/554#issuecomment-145663357
  
    OK, I upmerged, and then had to upmerge again right after that.  I double-checked that uptimeSeconds appear in the places they should, and that I was able to change the log level for a worker running ExclamationTopology.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on the pull request:

    https://github.com/apache/storm/pull/554#issuecomment-104784739
  
    +1




[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30925625
  
    --- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
    @@ -373,4 +360,1248 @@
         (ExecutorStats. (window-set-converter (:emitted stats) str)
           (window-set-converter (:transferred stats) str)
           specific-stats
    -      rate)))
    \ No newline at end of file
    +      rate)))
    +
    +(defn- agg-bolt-lat-and-count
    +  "Aggregates number executed and process & execute latencies across all
    +  streams."
    +  [idk->exec-avg idk->proc-avg idk->num-executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->num-executed]))}
    +  (letfn [(weight-avg [[id avg]] (let [num-e (get idk->num-executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    {:executeLatencyTotal (sum (map weight-avg idk->exec-avg))
    +     :processLatencyTotal (sum (map weight-avg idk->proc-avg))
    +     :executed (sum (vals idk->num-executed))}))
    +
    +(defn- agg-spout-lat-and-count
    +  "Aggregates number acked and complete latencies across all streams."
    +  [sid->comp-avg sid->num-acked]
    +  {:pre (apply = (map #(set (keys %))
    +                      [sid->comp-avg
    +                       sid->num-acked]))}
    +  (letfn [(weight-avg [[id avg]] (* avg (get sid->num-acked id)))]
    +    {:completeLatencyTotal (sum (map weight-avg sid->comp-avg))
    +     :acked (sum (vals sid->num-acked))}))
    +
    +(defn add-pairs
    +  ([] [0 0])
    +  ([[a1 a2] [b1 b2]]
    +   [(+ a1 b1) (+ a2 b2)]))
    +
    +(defn mk-include-sys-fn
    +  [include-sys?]
    +  (if include-sys?
    +    (fn [_] true)
    +    (fn [stream] (and (string? stream) (not (system-id? stream))))))
    +
    +(defn mk-include-sys-filter
    +  "Returns a function that includes or excludes map entries whose keys are
    +  system ids."
    +  [include-sys?]
    +  (if include-sys?
    +    identity
    +    (partial filter-key (mk-include-sys-fn false))))
    +
    +(defn- agg-bolt-streams-lat-and-count
    +  "Aggregates number executed and process & execute latencies."
    +  [idk->exec-avg idk->proc-avg idk->executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->executed]))}
    +  (letfn [(weight-avg [id avg] (let [num-e (idk->executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->exec-avg)]
    +        [k {:executeLatencyTotal (weight-avg k (idk->exec-avg k))
    +            :processLatencyTotal (weight-avg k (idk->proc-avg k))
    +            :executed (idk->executed k)}]))))
    --- End diff --
    
    Why not pull the value out in the for's binding vector?
    ```
    (for [[k exec-avg] idk->exec-avg]
      [k {:executeLatencyTotal (weight-avg k exec-avg)
          :processLatencyTotal (weight-avg k (idk->proc-avg k))
          :executed (idk->executed k)}])
    ```



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30918356
  
    --- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
    @@ -373,4 +360,1248 @@
         (ExecutorStats. (window-set-converter (:emitted stats) str)
           (window-set-converter (:transferred stats) str)
           specific-stats
    -      rate)))
    \ No newline at end of file
    +      rate)))
    +
    +(defn- agg-bolt-lat-and-count
    +  "Aggregates number executed and process & execute latencies across all
    --- End diff --
    
    OK, I understand; it is an order-of-operations kind of thing.  I'm not sure of a good way to explain it without being ambiguous.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30918410
  
    --- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
    @@ -373,4 +360,1248 @@
         (ExecutorStats. (window-set-converter (:emitted stats) str)
           (window-set-converter (:transferred stats) str)
           specific-stats
    -      rate)))
    \ No newline at end of file
    +      rate)))
    +
    +(defn- agg-bolt-lat-and-count
    +  "Aggregates number executed and process & execute latencies across all
    +  streams."
    +  [idk->exec-avg idk->proc-avg idk->num-executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->num-executed]))}
    +  (letfn [(weight-avg [[id avg]] (let [num-e (get idk->num-executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    {:executeLatencyTotal (sum (map weight-avg idk->exec-avg))
    +     :processLatencyTotal (sum (map weight-avg idk->proc-avg))
    +     :executed (sum (vals idk->num-executed))}))
    +
    +(defn- agg-spout-lat-and-count
    +  "Aggregates number acked and complete latencies across all streams."
    +  [sid->comp-avg sid->num-acked]
    +  {:pre (apply = (map #(set (keys %))
    +                      [sid->comp-avg
    +                       sid->num-acked]))}
    +  (letfn [(weight-avg [[id avg]] (* avg (get sid->num-acked id)))]
    +    {:completeLatencyTotal (sum (map weight-avg sid->comp-avg))
    +     :acked (sum (vals sid->num-acked))}))
    +
    +(defn add-pairs
    +  ([] [0 0])
    +  ([[a1 a2] [b1 b2]]
    +   [(+ a1 b1) (+ a2 b2)]))
    +
    +(defn mk-include-sys-fn
    +  [include-sys?]
    +  (if include-sys?
    +    (fn [_] true)
    +    (fn [stream] (and (string? stream) (not (system-id? stream))))))
    +
    +(defn mk-include-sys-filter
    +  "Returns a function that includes or excludes map entries whose keys are
    +  system ids."
    +  [include-sys?]
    +  (if include-sys?
    +    identity
    +    (partial filter-key (mk-include-sys-fn false))))
    +
    +(defn- agg-bolt-streams-lat-and-count
    +  "Aggregates number executed and process & execute latencies."
    +  [idk->exec-avg idk->proc-avg idk->executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->executed]))}
    +  (letfn [(weight-avg [id avg] (let [num-e (idk->executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->exec-avg)]
    +        [k {:executeLatencyTotal (weight-avg k (idk->exec-avg k))
    +            :processLatencyTotal (weight-avg k (idk->proc-avg k))
    +            :executed (idk->executed k)}]))))
    +
    +(defn- agg-spout-streams-lat-and-count
    +  "Aggregates number acked and complete latencies."
    +  [idk->comp-avg idk->acked]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->comp-avg
    +                       idk->acked]))}
    +  (letfn [(weight-avg [id avg] (let [num-e (get idk->acked id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->comp-avg)]
    +        [k {:completeLatencyTotal (weight-avg k (get idk->comp-avg k))
    +            :acked (get idk->acked k)}]))))
    +
    +(defn swap-map-order
    +  "{:a {:A 3, :B 5}, :b {:A 1, :B 2}}
    --- End diff --
    
    This doc string is a bit confusing.  I understand what the method does, but I had to read the code to really get it.
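    
    For reference, a sketch of the behavior on the docstring's own example. The function body is the one from the diff; the prose docstring is only one possible wording, not something proposed in the PR.
    
    ```clojure
    (defn swap-map-order
      "Swaps the outer and inner keys of a two-level map, so that
      (get-in m [outer inner]) equals (get-in result [inner outer])."
      [m]
      (apply merge-with
             merge
             (map (fn [[k v]]
                    (into {}
                          (for [[k2 v2] v]
                            [k2 {k v2}])))
                  m)))
    
    (swap-map-order {:a {:A 3, :B 5}, :b {:A 1, :B 2}})
    ;; => {:A {:a 3, :b 1}, :B {:a 5, :b 2}}
    ```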



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30918092
  
    --- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
    @@ -373,4 +360,1248 @@
         (ExecutorStats. (window-set-converter (:emitted stats) str)
           (window-set-converter (:transferred stats) str)
           specific-stats
    -      rate)))
    \ No newline at end of file
    +      rate)))
    +
    +(defn- agg-bolt-lat-and-count
    +  "Aggregates number executed and process & execute latencies across all
    --- End diff --
    
    This is really minor, but it looks a little funny mixing "and" and "&" this way.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30925118
  
    --- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
    @@ -373,4 +360,1248 @@
         (ExecutorStats. (window-set-converter (:emitted stats) str)
           (window-set-converter (:transferred stats) str)
           specific-stats
    -      rate)))
    \ No newline at end of file
    +      rate)))
    +
    +(defn- agg-bolt-lat-and-count
    +  "Aggregates number executed and process & execute latencies across all
    +  streams."
    +  [idk->exec-avg idk->proc-avg idk->num-executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->num-executed]))}
    +  (letfn [(weight-avg [[id avg]] (let [num-e (get idk->num-executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    {:executeLatencyTotal (sum (map weight-avg idk->exec-avg))
    +     :processLatencyTotal (sum (map weight-avg idk->proc-avg))
    +     :executed (sum (vals idk->num-executed))}))
    +
    +(defn- agg-spout-lat-and-count
    +  "Aggregates number acked and complete latencies across all streams."
    +  [sid->comp-avg sid->num-acked]
    +  {:pre (apply = (map #(set (keys %))
    +                      [sid->comp-avg
    +                       sid->num-acked]))}
    +  (letfn [(weight-avg [[id avg]] (* avg (get sid->num-acked id)))]
    +    {:completeLatencyTotal (sum (map weight-avg sid->comp-avg))
    +     :acked (sum (vals sid->num-acked))}))
    +
    +(defn add-pairs
    +  ([] [0 0])
    +  ([[a1 a2] [b1 b2]]
    +   [(+ a1 b1) (+ a2 b2)]))
    --- End diff --
    
    This is essentially a zipWith:
    ```
    (let [a [1 2]
          b [3 4]]
      (map + a b))
    => (4 6)
    ```
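
A minimal sketch of that suggestion, assuming the zero-arity case still has to return `[0 0]` so the function can seed a `reduce`. The name `add-pairs-zip` is hypothetical and not part of the patch; `mapv` is used instead of `map` so the result stays a vector.

```clojure
(defn add-pairs-zip
  "Element-wise sum of two pairs; with no arguments, returns the identity pair."
  ([] [0 0])
  ([a b] (mapv + a b)))

(add-pairs-zip [1 2] [3 4])                       ;; => [4 6]
(reduce add-pairs-zip [0 0] [[1 2] [3 4] [5 6]])  ;; => [9 12]
```

Unlike the destructuring version in the diff, this form would also sum vectors longer than two elements, which may or may not be wanted.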



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/554#issuecomment-144170034
  
    Thanks for the update; I am +1 on the change now. The aggregation code is a bit complex, but I cannot think of any way to make it less complex right now.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on the pull request:

    https://github.com/apache/storm/pull/554#issuecomment-104760930
  
    Review comments should be addressed.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r40716909
  
    --- Diff: storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java ---
    @@ -51,7 +51,7 @@
     import org.slf4j.LoggerFactory;
     
     @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
    -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-3-2")
    +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-9-28")
    --- End diff --
    
    Here is another one that is just a date change.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30920821
  
    --- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
    @@ -373,4 +360,1248 @@
         (ExecutorStats. (window-set-converter (:emitted stats) str)
           (window-set-converter (:transferred stats) str)
           specific-stats
    -      rate)))
    \ No newline at end of file
    +      rate)))
    +
    +(defn- agg-bolt-lat-and-count
    +  "Aggregates number executed and process & execute latencies across all
    +  streams."
    +  [idk->exec-avg idk->proc-avg idk->num-executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->num-executed]))}
    +  (letfn [(weight-avg [[id avg]] (let [num-e (get idk->num-executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    {:executeLatencyTotal (sum (map weight-avg idk->exec-avg))
    +     :processLatencyTotal (sum (map weight-avg idk->proc-avg))
    +     :executed (sum (vals idk->num-executed))}))
    +
    +(defn- agg-spout-lat-and-count
    +  "Aggregates number acked and complete latencies across all streams."
    +  [sid->comp-avg sid->num-acked]
    +  {:pre (apply = (map #(set (keys %))
    +                      [sid->comp-avg
    +                       sid->num-acked]))}
    +  (letfn [(weight-avg [[id avg]] (* avg (get sid->num-acked id)))]
    +    {:completeLatencyTotal (sum (map weight-avg sid->comp-avg))
    +     :acked (sum (vals sid->num-acked))}))
    +
    +(defn add-pairs
    +  ([] [0 0])
    +  ([[a1 a2] [b1 b2]]
    +   [(+ a1 b1) (+ a2 b2)]))
    +
    +(defn mk-include-sys-fn
    +  [include-sys?]
    +  (if include-sys?
    +    (fn [_] true)
    +    (fn [stream] (and (string? stream) (not (system-id? stream))))))
    +
    +(defn mk-include-sys-filter
    +  "Returns a function that includes or excludes map entries whose keys are
    +  system ids."
    +  [include-sys?]
    +  (if include-sys?
    +    identity
    +    (partial filter-key (mk-include-sys-fn false))))
    +
    +(defn- agg-bolt-streams-lat-and-count
    +  "Aggregates number executed and process & execute latencies."
    +  [idk->exec-avg idk->proc-avg idk->executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->executed]))}
    +  (letfn [(weight-avg [id avg] (let [num-e (idk->executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->exec-avg)]
    +        [k {:executeLatencyTotal (weight-avg k (idk->exec-avg k))
    +            :processLatencyTotal (weight-avg k (idk->proc-avg k))
    +            :executed (idk->executed k)}]))))
    +
    +(defn- agg-spout-streams-lat-and-count
    +  "Aggregates number acked and complete latencies."
    +  [idk->comp-avg idk->acked]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->comp-avg
    +                       idk->acked]))}
    +  (letfn [(weight-avg [id avg] (let [num-e (get idk->acked id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->comp-avg)]
    +        [k {:completeLatencyTotal (weight-avg k (get idk->comp-avg k))
    +            :acked (get idk->acked k)}]))))
    +
    +(defn swap-map-order
    +  "{:a {:A 3, :B 5}, :b {:A 1, :B 2}}
    +    -> {:A {:b 1, :a 3}, :B {:b 2, :a 5}}"
    +  [m]
    +  (apply merge-with
    +         merge
    +         (map (fn [[k v]]
    +                (into {}
    +                      (for [[k2 v2] v]
    +                        [k2 {k v2}])))
    +              m)))
    +
    +(defn- compute-agg-capacity
    +  "Computes the capacity metric for one executor given its heartbeat data and
    +  uptime."
    +  [m uptime]
    +  (when uptime
    +    (->>
    +      ;; For each stream, create weighted averages and counts.
    +      (merge-with (fn weighted-avg+count-fn
    +                    [avg cnt]
    +                    [(* avg cnt) cnt])
    +                  (get (:execute-latencies m) "600")
    +                  (get (:executed m) "600"))
    +      vals ;; Ignore the stream ids.
    +      (reduce add-pairs
    +              [0. 0]) ;; Combine weighted averages and counts.
    +      ((fn [[weighted-avg cnt]]
    +        (div weighted-avg (* 1000 (min uptime 600))))))))
    +
    +(defn- page+comp-dispatch
    +  [page-type comp-type & _]
    +  [page-type comp-type])
    +
    +(defmulti agg-pre-merge
    --- End diff --
    
    Agreed. I'll make them unique functions.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30933587
  
    --- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
    @@ -373,4 +360,1248 @@
         (ExecutorStats. (window-set-converter (:emitted stats) str)
           (window-set-converter (:transferred stats) str)
           specific-stats
    -      rate)))
    \ No newline at end of file
    +      rate)))
    +
    +(defn- agg-bolt-lat-and-count
    +  "Aggregates number executed and process & execute latencies across all
    +  streams."
    +  [idk->exec-avg idk->proc-avg idk->num-executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->num-executed]))}
    +  (letfn [(weight-avg [[id avg]] (let [num-e (get idk->num-executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    {:executeLatencyTotal (sum (map weight-avg idk->exec-avg))
    +     :processLatencyTotal (sum (map weight-avg idk->proc-avg))
    +     :executed (sum (vals idk->num-executed))}))
    +
    +(defn- agg-spout-lat-and-count
    +  "Aggregates number acked and complete latencies across all streams."
    +  [sid->comp-avg sid->num-acked]
    +  {:pre (apply = (map #(set (keys %))
    +                      [sid->comp-avg
    +                       sid->num-acked]))}
    +  (letfn [(weight-avg [[id avg]] (* avg (get sid->num-acked id)))]
    +    {:completeLatencyTotal (sum (map weight-avg sid->comp-avg))
    +     :acked (sum (vals sid->num-acked))}))
    +
    +(defn add-pairs
    +  ([] [0 0])
    +  ([[a1 a2] [b1 b2]]
    +   [(+ a1 b1) (+ a2 b2)]))
    +
    +(defn mk-include-sys-fn
    +  [include-sys?]
    +  (if include-sys?
    +    (fn [_] true)
    +    (fn [stream] (and (string? stream) (not (system-id? stream))))))
    +
    +(defn mk-include-sys-filter
    +  "Returns a function that includes or excludes map entries whose keys are
    +  system ids."
    +  [include-sys?]
    +  (if include-sys?
    +    identity
    +    (partial filter-key (mk-include-sys-fn false))))
    +
    +(defn- agg-bolt-streams-lat-and-count
    +  "Aggregates number executed and process & execute latencies."
    +  [idk->exec-avg idk->proc-avg idk->executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->executed]))}
    +  (letfn [(weight-avg [id avg] (let [num-e (idk->executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->exec-avg)]
    +        [k {:executeLatencyTotal (weight-avg k (idk->exec-avg k))
    +            :processLatencyTotal (weight-avg k (idk->proc-avg k))
    +            :executed (idk->executed k)}]))))
    --- End diff --
    
    Whoops. Never mind. Totally missed that the maps were different.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30917869
  
    --- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
    @@ -322,16 +307,16 @@
       [(.get_componentId global-stream-id) (.get_streamId global-stream-id)])
     
     (defmethod clojurify-specific-stats BoltStats [^BoltStats stats]
    -  [(window-set-converter (.get_acked stats) from-global-stream-id symbol)
    -   (window-set-converter (.get_failed stats) from-global-stream-id symbol)
    -   (window-set-converter (.get_process_ms_avg stats) from-global-stream-id symbol)
    -   (window-set-converter (.get_executed stats) from-global-stream-id symbol)
    -   (window-set-converter (.get_execute_ms_avg stats) from-global-stream-id symbol)])
    +  [(window-set-converter (.get_acked stats) from-global-stream-id identity)
    --- End diff --
    
    Why exactly are we no longer converting this to a symbol?



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on the pull request:

    https://github.com/apache/storm/pull/554#issuecomment-137796743
  
    Leaving a comment here for now: the key [here](https://github.com/d2r/storm/blob/6b606cf68f60858bdc9064c1fe50c873e6dd1971/storm-core/src/clj/backtype/storm/stats.clj#L961) should be `:complete-latency` or else each spout's complete latency will show 0.000 on the topology page.  (Component page is not affected.)
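
One plausible mechanism for the 0.000 value, sketched under assumptions: a keyword lookup with a key that is not in the map returns `nil`, and a nil-to-zero default then turns it into 0. The misspelled key `:completeLatency`, the sample map, and this definition of `nil-to-zero` are all hypothetical; the actual key at the linked line is not quoted in this thread.

```clojure
;; Assumed helper: replaces a missing value with 0.
(defn nil-to-zero [v] (or v 0))

;; Hypothetical aggregated stats for one spout.
(def spout-stats {:complete-latency 12.5})

(nil-to-zero (:complete-latency spout-stats))  ;; => 12.5
(nil-to-zero (:completeLatency spout-stats))   ;; => 0, because the lookup misses
```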



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/554#issuecomment-145251891
  
    @d2r please upmerge. The code still looks great, but it looks like upSecs and a debug were added into the results. I really want to get this in, as it will speed up page loads a lot.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30917744
  
    --- Diff: storm-core/src/clj/backtype/storm/log.clj ---
    @@ -44,3 +46,11 @@
     (defn log-stream
       [& args]
       (apply log/log-stream args))
    +
    +(defmacro pprint-message
    --- End diff --
    
    I would prefer to have "log" somewhere in the macro name, perhaps log-pprint.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on the pull request:

    https://github.com/apache/storm/pull/554#issuecomment-142993764
  
    This needs to be upmerged with the nimbus-ha changes.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on the pull request:

    https://github.com/apache/storm/pull/554#issuecomment-106583119
  
    Handled some corner cases when the worker process crashes such that executor heartbeats have no metrics data.
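
The comment does not say which corner cases were handled. As an illustration only, the sketch below applies the same nil guard that the quoted `weight-avg` helpers use, so that an average or count missing from a heartbeat contributes 0 instead of throwing. The function name `weighted-total` and the sample data are hypothetical.

```clojure
(defn weighted-total
  "Sums avg * count over the ids in id->avg, counting missing data as 0."
  [id->avg id->count]
  (reduce + 0
          (for [[id avg] id->avg
                :let [n (get id->count id)]]
            ;; Guard: multiply only when both the average and the count exist.
            (if (and avg n) (* avg n) 0))))

(weighted-total {"s1" 2.0, "s2" 4.0} {"s1" 10, "s2" 5})  ;; => 40.0
(weighted-total {"s1" nil} {"s1" 10})                    ;; => 0
(weighted-total nil nil)                                 ;; => 0
```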



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30925662
  
    --- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
    @@ -373,4 +360,1248 @@
         (ExecutorStats. (window-set-converter (:emitted stats) str)
           (window-set-converter (:transferred stats) str)
           specific-stats
    -      rate)))
    \ No newline at end of file
    +      rate)))
    +
    +(defn- agg-bolt-lat-and-count
    +  "Aggregates number executed and process & execute latencies across all
    +  streams."
    +  [idk->exec-avg idk->proc-avg idk->num-executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->num-executed]))}
    +  (letfn [(weight-avg [[id avg]] (let [num-e (get idk->num-executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    {:executeLatencyTotal (sum (map weight-avg idk->exec-avg))
    +     :processLatencyTotal (sum (map weight-avg idk->proc-avg))
    +     :executed (sum (vals idk->num-executed))}))
    +
    +(defn- agg-spout-lat-and-count
    +  "Aggregates number acked and complete latencies across all streams."
    +  [sid->comp-avg sid->num-acked]
    +  {:pre (apply = (map #(set (keys %))
    +                      [sid->comp-avg
    +                       sid->num-acked]))}
    +  (letfn [(weight-avg [[id avg]] (* avg (get sid->num-acked id)))]
    +    {:completeLatencyTotal (sum (map weight-avg sid->comp-avg))
    +     :acked (sum (vals sid->num-acked))}))
    +
    +(defn add-pairs
    +  ([] [0 0])
    +  ([[a1 a2] [b1 b2]]
    +   [(+ a1 b1) (+ a2 b2)]))
    +
    +(defn mk-include-sys-fn
    +  [include-sys?]
    +  (if include-sys?
    +    (fn [_] true)
    +    (fn [stream] (and (string? stream) (not (system-id? stream))))))
    +
    +(defn mk-include-sys-filter
    +  "Returns a function that includes or excludes map entries whose keys are
    +  system ids."
    +  [include-sys?]
    +  (if include-sys?
    +    identity
    +    (partial filter-key (mk-include-sys-fn false))))
    +
    +(defn- agg-bolt-streams-lat-and-count
    +  "Aggregates number executed and process & execute latencies."
    +  [idk->exec-avg idk->proc-avg idk->executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->executed]))}
    +  (letfn [(weight-avg [id avg] (let [num-e (idk->executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->exec-avg)]
    +        [k {:executeLatencyTotal (weight-avg k (idk->exec-avg k))
    +            :processLatencyTotal (weight-avg k (idk->proc-avg k))
    +            :executed (idk->executed k)}]))))
    +
    +(defn- agg-spout-streams-lat-and-count
    +  "Aggregates number acked and complete latencies."
    +  [idk->comp-avg idk->acked]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->comp-avg
    +                       idk->acked]))}
    +  (letfn [(weight-avg [id avg] (let [num-e (get idk->acked id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->comp-avg)]
    +        [k {:completeLatencyTotal (weight-avg k (get idk->comp-avg k))
    +            :acked (get idk->acked k)}]))))
    --- End diff --
    
    Same here.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30924759
  
    --- Diff: storm-core/src/clj/backtype/storm/log.clj ---
    @@ -44,3 +46,11 @@
     (defn log-stream
       [& args]
       (apply log/log-stream args))
    +
    +(defmacro pprint-message
    --- End diff --
    
    I'll rename it.



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by d2r <gi...@git.apache.org>.
Github user d2r closed the pull request at:

    https://github.com/apache/storm/pull/554



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30925843
  
    --- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
    @@ -373,4 +360,1248 @@
         (ExecutorStats. (window-set-converter (:emitted stats) str)
           (window-set-converter (:transferred stats) str)
           specific-stats
    -      rate)))
    \ No newline at end of file
    +      rate)))
    +
    +(defn- agg-bolt-lat-and-count
    +  "Aggregates number executed and process & execute latencies across all
    +  streams."
    +  [idk->exec-avg idk->proc-avg idk->num-executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->num-executed]))}
    +  (letfn [(weight-avg [[id avg]] (let [num-e (get idk->num-executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    {:executeLatencyTotal (sum (map weight-avg idk->exec-avg))
    +     :processLatencyTotal (sum (map weight-avg idk->proc-avg))
    +     :executed (sum (vals idk->num-executed))}))
    +
    +(defn- agg-spout-lat-and-count
    +  "Aggregates number acked and complete latencies across all streams."
    +  [sid->comp-avg sid->num-acked]
    +  {:pre (apply = (map #(set (keys %))
    +                      [sid->comp-avg
    +                       sid->num-acked]))}
    +  (letfn [(weight-avg [[id avg]] (* avg (get sid->num-acked id)))]
    +    {:completeLatencyTotal (sum (map weight-avg sid->comp-avg))
    +     :acked (sum (vals sid->num-acked))}))
    +
    +(defn add-pairs
    +  ([] [0 0])
    +  ([[a1 a2] [b1 b2]]
    +   [(+ a1 b1) (+ a2 b2)]))
    +
    +(defn mk-include-sys-fn
    +  [include-sys?]
    +  (if include-sys?
    +    (fn [_] true)
    +    (fn [stream] (and (string? stream) (not (system-id? stream))))))
    +
    +(defn mk-include-sys-filter
    +  "Returns a function that includes or excludes map entries whose keys are
    +  system ids."
    +  [include-sys?]
    +  (if include-sys?
    +    identity
    +    (partial filter-key (mk-include-sys-fn false))))
    +
    +(defn- agg-bolt-streams-lat-and-count
    +  "Aggregates number executed and process & execute latencies."
    +  [idk->exec-avg idk->proc-avg idk->executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->executed]))}
    +  (letfn [(weight-avg [id avg] (let [num-e (idk->executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->exec-avg)]
    +        [k {:executeLatencyTotal (weight-avg k (idk->exec-avg k))
    +            :processLatencyTotal (weight-avg k (idk->proc-avg k))
    +            :executed (idk->executed k)}]))))
    +
    +(defn- agg-spout-streams-lat-and-count
    +  "Aggregates number acked and complete latencies."
    +  [idk->comp-avg idk->acked]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->comp-avg
    +                       idk->acked]))}
    +  (letfn [(weight-avg [id avg] (let [num-e (get idk->acked id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->comp-avg)]
    +        [k {:completeLatencyTotal (weight-avg k (get idk->comp-avg k))
    +            :acked (get idk->acked k)}]))))
    +
    +(defn swap-map-order
    +  "{:a {:A 3, :B 5}, :b {:A 1, :B 2}}
    +    -> {:A {:b 1, :a 3}, :B {:b 2, :a 5}}"
    +  [m]
    +  (apply merge-with
    +         merge
    +         (map (fn [[k v]]
    +                (into {}
    +                      (for [[k2 v2] v]
    +                        [k2 {k v2}])))
    +              m)))
    +
    +(defn- compute-agg-capacity
    +  "Computes the capacity metric for one executor given its heartbeat data and
    +  uptime."
    +  [m uptime]
    +  (when uptime
    +    (->>
    +      ;; For each stream, create weighted averages and counts.
    +      (merge-with (fn weighted-avg+count-fn
    +                    [avg cnt]
    +                    [(* avg cnt) cnt])
    +                  (get (:execute-latencies m) "600")
    +                  (get (:executed m) "600"))
    --- End diff --
    
    I'd rather see "600" def'd somewhere.
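
A rough sketch of what that could look like, assuming the window is ten minutes expressed in seconds. The constant names are hypothetical, the body is restructured with `let` for readability, and plain `/` stands in for the `div` helper used in the diff.

```clojure
;; Hypothetical names for the 10-minute window; the patch may name them differently.
(def ten-min-in-seconds 600)
(def ten-min-in-seconds-str (str ten-min-in-seconds))

(defn add-pairs
  ([] [0 0])
  ([[a1 a2] [b1 b2]] [(+ a1 b1) (+ a2 b2)]))

(defn compute-agg-capacity
  "Fraction of the window (at most 10 minutes) spent executing; latencies are in ms."
  [m uptime]
  (when uptime
    (let [[weighted-total _cnt]
          (->> (merge-with (fn [avg cnt] [(* avg cnt) cnt])
                           (get (:execute-latencies m) ten-min-in-seconds-str)
                           (get (:executed m) ten-min-in-seconds-str))
               vals                          ;; ignore the stream ids
               (reduce add-pairs [0.0 0]))]  ;; combine weighted totals and counts
      (/ weighted-total (* 1000 (min uptime ten-min-in-seconds))))))

;; 50000 tuples at 6 ms each is 300 s of work in a 600 s window.
(compute-agg-capacity {:execute-latencies {"600" {"s1" 6.0}}
                       :executed          {"600" {"s1" 50000}}}
                      1000)  ;; => 0.5
```

With the window defined once, the map key and the `min` cap cannot drift apart if the window length changes.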



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30919755
  
    --- Diff: storm-core/src/clj/backtype/storm/ui/core.clj ---
    @@ -783,128 +504,180 @@
             "errorLapsedSecs" (get-error-time e)
             "error" (.get_error e)})}))
     
    -(defn spout-stats
    -  [window ^TopologyInfo topology-info component executors include-sys?]
    -  (let [window-hint (str " (" (window-hint window) ")")
    -        stats (get-filled-stats executors)
    -        stream-summary (-> stats (aggregate-spout-stats include-sys?))
    -        summary (-> stream-summary aggregate-spout-streams)]
    -    {"spoutSummary" (spout-summary-json
    -                      (.get_id topology-info) component summary window)
    -     "outputStats" (spout-output-stats stream-summary window)
    -     "executorStats" (spout-executor-stats (.get_id topology-info)
    -                                           executors window include-sys?)}))
    -
    -(defn bolt-summary
    -  [topology-id id stats window]
    -  (let [times (stats-times (:emitted stats))
    -        display-map (into {} (for [t times] [t pretty-uptime-sec]))
    -        display-map (assoc display-map ":all-time" (fn [_] "All time"))]
    -    (for [k (concat times [":all-time"])
    -          :let [disp ((display-map k) k)]]
    -      {"window" k
    -       "windowPretty" disp
    -       "emitted" (get-in stats [:emitted k])
    -       "transferred" (get-in stats [:transferred k])
    -       "executeLatency" (float-str (get-in stats [:execute-latencies k]))
    -       "executed" (get-in stats [:executed k])
    -       "processLatency" (float-str (get-in stats [:process-latencies k]))
    -       "acked" (get-in stats [:acked k])
    -       "failed" (get-in stats [:failed k])})))
    -
    -(defn bolt-output-stats
    -  [stream-summary window]
    -  (let [stream-summary (-> stream-summary
    -                           swap-map-order
    -                           (get window)
    -                           (select-keys [:emitted :transferred])
    -                           swap-map-order)]
    -    (for [[s stats] stream-summary]
    -      {"stream" s
    -        "emitted" (nil-to-zero (:emitted stats))
    -        "transferred" (nil-to-zero (:transferred stats))})))
    -
    -(defn bolt-input-stats
    -  [stream-summary window]
    -  (let [stream-summary
    -        (-> stream-summary
    -            swap-map-order
    -            (get window)
    -            (select-keys [:acked :failed :process-latencies
    -                          :executed :execute-latencies])
    -            swap-map-order)]
    -    (for [[^GlobalStreamId s stats] stream-summary]
    -      {"component" (.get_componentId s)
    -       "encodedComponent" (url-encode (.get_componentId s))
    -       "stream" (.get_streamId s)
    -       "executeLatency" (float-str (:execute-latencies stats))
    -       "processLatency" (float-str (:process-latencies stats))
    -       "executed" (nil-to-zero (:executed stats))
    -       "acked" (nil-to-zero (:acked stats))
    -       "failed" (nil-to-zero (:failed stats))})))
    -
    -(defn bolt-executor-stats
    -  [topology-id executors window include-sys?]
    -  (for [^ExecutorSummary e executors
    -        :let [stats (.get_stats e)
    -              stats (if stats
    -                      (-> stats
    -                          (aggregate-bolt-stats include-sys?)
    -                          (aggregate-bolt-streams)
    -                          swap-map-order
    -                          (get window)))]]
    -    {"id" (pretty-executor-info (.get_executor_info e))
    -     "encodedId" (url-encode (pretty-executor-info (.get_executor_info e)))
    -     "uptime" (pretty-uptime-sec (.get_uptime_secs e))
    -     "host" (.get_host e)
    -     "port" (.get_port e)
    -     "emitted" (nil-to-zero (:emitted stats))
    -     "transferred" (nil-to-zero (:transferred stats))
    -     "capacity" (float-str (nil-to-zero (compute-executor-capacity e)))
    -     "executeLatency" (float-str (:execute-latencies stats))
    -     "executed" (nil-to-zero (:executed stats))
    -     "processLatency" (float-str (:process-latencies stats))
    -     "acked" (nil-to-zero (:acked stats))
    -     "failed" (nil-to-zero (:failed stats))
    -     "workerLogLink" (worker-log-link (.get_host e) (.get_port e) topology-id)}))
    -
    -(defn bolt-stats
    -  [window ^TopologyInfo topology-info component executors include-sys?]
    -  (let [window-hint (str " (" (window-hint window) ")")
    -        stats (get-filled-stats executors)
    -        stream-summary (-> stats (aggregate-bolt-stats include-sys?))
    -        summary (-> stream-summary aggregate-bolt-streams)]
    -    {"boltStats" (bolt-summary (.get_id topology-info) component summary window)
    -     "inputStats" (bolt-input-stats stream-summary window)
    -     "outputStats" (bolt-output-stats stream-summary window)
    -     "executorStats" (bolt-executor-stats
    -                       (.get_id topology-info) executors window include-sys?)}))
    +(defmulti unpack-comp-agg-stat
    +  (fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
    +
    +(defmethod unpack-comp-agg-stat ComponentType/BOLT
    +  [[window ^ComponentAggregateStats s]]
    +  (let [^CommonAggregateStats comm-s (.get_common_stats s)
    +        ^SpecificAggregateStats spec-s (.get_specific_stats s)
    +        ^BoltAggregateStats bolt-s (.get_bolt spec-s)]
    +    {"window" window
    +     "windowPretty" (window-hint window)
    +     "emitted" (.get_emitted comm-s)
    +     "transferred" (.get_transferred comm-s)
    +     "acked" (.get_acked comm-s)
    +     "failed" (.get_failed comm-s)
    +     "executeLatency" (float-str (.get_execute_latency_ms bolt-s))
    +     "processLatency"  (float-str (.get_process_latency_ms bolt-s))
    +     "executed" (.get_executed bolt-s)
    +     "capacity" (float-str (.get_capacity bolt-s))}))
    +
    +(defmethod unpack-comp-agg-stat ComponentType/SPOUT
    +  [[window ^ComponentAggregateStats s]]
    +  (let [^CommonAggregateStats comm-s (.get_common_stats s)
    +        ^SpecificAggregateStats spec-s (.get_specific_stats s)
    +        ^SpoutAggregateStats spout-s (.get_spout spec-s)]
    +    {"window" window
    +     "windowPretty" (window-hint window)
    +     "emitted" (.get_emitted comm-s)
    +     "transferred" (.get_transferred comm-s)
    +     "acked" (.get_acked comm-s)
    +     "failed" (.get_failed comm-s)
    +     "completeLatency" (float-str (.get_complete_latency_ms spout-s))}))
    +
    +(defn- unpack-bolt-input-stat
    +  [[^GlobalStreamId s ^ComponentAggregateStats stats]]
    +  (let [^SpecificAggregateStats sas (.get_specific_stats stats)
    +        ^BoltAggregateStats bas (.get_bolt sas)
    +        ^CommonAggregateStats cas (.get_common_stats stats)
    +        comp-id (.get_componentId s)]
    +    {"component" comp-id
    +     "encodedComponentId" (url-encode comp-id)
    +     "stream" (.get_streamId s)
    +     "executeLatency" (float-str (.get_execute_latency_ms bas))
    +     "processLatency" (float-str (.get_process_latency_ms bas))
    +     "executed" (nil-to-zero (.get_executed bas))
    +     "acked" (nil-to-zero (.get_acked cas))
    +     "failed" (nil-to-zero (.get_failed cas))}))
    +
    +(defmulti unpack-comp-output-stat
    +  (fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
    +
    +(defmethod unpack-comp-output-stat ComponentType/BOLT
    +  [[stream-id ^ComponentAggregateStats stats]]
    +  (let [^CommonAggregateStats cas (.get_common_stats stats)]
    +    {"stream" stream-id
    +     "emitted" (nil-to-zero (.get_emitted cas))
    +     "transferred" (nil-to-zero (.get_transferred cas))}))
    +
    +(defmethod unpack-comp-output-stat ComponentType/SPOUT
    +  [[stream-id ^ComponentAggregateStats stats]]
    +  (let [^CommonAggregateStats cas (.get_common_stats stats)
    +        ^SpecificAggregateStats spec-s (.get_specific_stats stats)
    +        ^SpoutAggregateStats spout-s (.get_spout spec-s)]
    +    {"stream" stream-id
    +     "emitted" (nil-to-zero (.get_emitted cas))
    +     "transferred" (nil-to-zero (.get_transferred cas))
    +     "completeLatency" (float-str (.get_complete_latency_ms spout-s))
    +     "acked" (nil-to-zero (.get_acked cas))
    +     "failed" (nil-to-zero (.get_failed cas))}))
    +
    +(defmulti unpack-comp-exec-stat
    +  (fn [_ ^ComponentAggregateStats cas] (.get_type (.get_stats ^ExecutorAggregateStats cas))))
    +
    +(defmethod unpack-comp-exec-stat ComponentType/BOLT
    +  [topology-id ^ExecutorAggregateStats eas]
    +  (let [^ExecutorSummary summ (.get_exec_summary eas)
    +        ^ExecutorInfo info (.get_executor_info summ)
    +        ^ComponentAggregateStats stats (.get_stats eas)
    +        ^SpecificAggregateStats ss (.get_specific_stats stats)
    +        ^BoltAggregateStats bas (.get_bolt ss)
    +        ^CommonAggregateStats cas (.get_common_stats stats)
    +        host (.get_host summ)
    +        port (.get_port summ)
    +        exec-id (pretty-executor-info info)]
    +    {"id" exec-id
    +     "encodedId" (url-encode exec-id)
    +     "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
    +     "host" host
    +     "port" port
    +     "emitted" (nil-to-zero (.get_emitted cas))
    +     "transferred" (nil-to-zero (.get_transferred cas))
    +     "capacity" (float-str (nil-to-zero (.get_capacity bas)))
    +     "executeLatency" (float-str (.get_execute_latency_ms bas))
    +     "executed" (nil-to-zero (.get_executed bas))
    +     "processLatency" (float-str (.get_process_latency_ms bas))
    +     "acked" (nil-to-zero (.get_acked cas))
    +     "failed" (nil-to-zero (.get_failed cas))
    +     "workerLogLink" (worker-log-link host port topology-id)}))
    +
    +(defmethod unpack-comp-exec-stat ComponentType/SPOUT
    +  [topology-id ^ExecutorAggregateStats eas]
    +  (let [^ExecutorSummary summ (.get_exec_summary eas)
    +        ^ExecutorInfo info (.get_executor_info summ)
    +        ^ComponentAggregateStats stats (.get_stats eas)
    +        ^SpecificAggregateStats ss (.get_specific_stats stats)
    +        ^SpoutAggregateStats sas (.get_spout ss)
    +        ^CommonAggregateStats cas (.get_common_stats stats)
    +        host (.get_host summ)
    +        port (.get_port summ)
    +        exec-id (pretty-executor-info info)]
    +    {"id" exec-id
    +     "encodedId" (url-encode exec-id)
    +     "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
    +     "host" host
    +     "port" port
    +     "emitted" (nil-to-zero (.get_emitted cas))
    +     "transferred" (nil-to-zero (.get_transferred cas))
    +     "completeLatency" (float-str (.get_complete_latency_ms sas))
    +     "acked" (nil-to-zero (.get_acked cas))
    +     "failed" (nil-to-zero (.get_failed cas))
    +     "workerLogLink" (worker-log-link host port topology-id)}))
    +
    +(defmulti unpack-component-page-info
    +  "Unpacks component-specific info to clojure data structures"
    +  (fn [^ComponentPageInfo info & _]
    +    (.get_component_type info)))
    +
    +(defmethod unpack-component-page-info ComponentType/BOLT
    +  [^ComponentPageInfo info topology-id window include-sys?]
    +  (merge
    +    {"boltStats" (map unpack-comp-agg-stat (.get_window_to_stats info))
    +     "inputStats" (map unpack-bolt-input-stat (.get_gsid_to_input_stats info))
    +     "outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats info))
    +     "executorStats" (map (partial unpack-comp-exec-stat topology-id)
    +                          (.get_exec_stats info))}
    +    (-> info .get_errors (component-errors topology-id))))
    +
    +(defmethod unpack-component-page-info ComponentType/SPOUT
    +  [^ComponentPageInfo info topology-id window include-sys?]
    +  (merge
    +    {"spoutStats" (map unpack-comp-agg-stat (.get_window_to_stats info))
    --- End diff --
    
    Is this a backwards-incompatible change to the RESTful web service?  I see that the mustache tags changed.  I'm not sure we want to do that if someone is relying on the old names, even if they were not the best names.
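    
    One way to keep existing clients working would be to emit the old key alongside the new one.  A minimal sketch, assuming the rename in question is "encodedComponent" -> "encodedComponentId" in the bolt input stats above; `legacy-key-aliases` and `with-legacy-keys` are hypothetical names, not part of this patch:

```clojure
;; Hypothetical map of new REST key -> old REST key (an assumption for
;; illustration; only this one rename is visible in the diff above).
(def legacy-key-aliases
  {"encodedComponentId" "encodedComponent"})

(defn with-legacy-keys
  "Returns m with each renamed key also present under its old name, so
  clients that still read the old key keep working."
  [m]
  (reduce-kv (fn [acc new-k old-k]
               (if (contains? m new-k)
                 (assoc acc old-k (get m new-k))
                 acc))
             m
             legacy-key-aliases))

;; Example row shaped like one entry of "inputStats".
(def example-row
  (with-legacy-keys {"component" "split"
                     "encodedComponentId" "split"}))
```

    The cost is a slightly larger payload; the old keys could then be dropped in a later release once they have been documented as deprecated.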



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30918588
  
    --- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
    @@ -373,4 +360,1248 @@
         (ExecutorStats. (window-set-converter (:emitted stats) str)
           (window-set-converter (:transferred stats) str)
           specific-stats
    -      rate)))
    \ No newline at end of file
    +      rate)))
    +
    +(defn- agg-bolt-lat-and-count
    +  "Aggregates number executed and process & execute latencies across all
    +  streams."
    +  [idk->exec-avg idk->proc-avg idk->num-executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->num-executed]))}
    +  (letfn [(weight-avg [[id avg]] (let [num-e (get idk->num-executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    {:executeLatencyTotal (sum (map weight-avg idk->exec-avg))
    +     :processLatencyTotal (sum (map weight-avg idk->proc-avg))
    +     :executed (sum (vals idk->num-executed))}))
    +
    +(defn- agg-spout-lat-and-count
    +  "Aggregates number acked and complete latencies across all streams."
    +  [sid->comp-avg sid->num-acked]
    +  {:pre (apply = (map #(set (keys %))
    +                      [sid->comp-avg
    +                       sid->num-acked]))}
    +  (letfn [(weight-avg [[id avg]] (* avg (get sid->num-acked id)))]
    +    {:completeLatencyTotal (sum (map weight-avg sid->comp-avg))
    +     :acked (sum (vals sid->num-acked))}))
    +
    +(defn add-pairs
    +  ([] [0 0])
    +  ([[a1 a2] [b1 b2]]
    +   [(+ a1 b1) (+ a2 b2)]))
    +
    +(defn mk-include-sys-fn
    +  [include-sys?]
    +  (if include-sys?
    +    (fn [_] true)
    +    (fn [stream] (and (string? stream) (not (system-id? stream))))))
    +
    +(defn mk-include-sys-filter
    +  "Returns a function that includes or excludes map entries whose keys are
    +  system ids."
    +  [include-sys?]
    +  (if include-sys?
    +    identity
    +    (partial filter-key (mk-include-sys-fn false))))
    +
    +(defn- agg-bolt-streams-lat-and-count
    +  "Aggregates number executed and process & execute latencies."
    +  [idk->exec-avg idk->proc-avg idk->executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->executed]))}
    +  (letfn [(weight-avg [id avg] (let [num-e (idk->executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->exec-avg)]
    +        [k {:executeLatencyTotal (weight-avg k (idk->exec-avg k))
    +            :processLatencyTotal (weight-avg k (idk->proc-avg k))
    +            :executed (idk->executed k)}]))))
    +
    +(defn- agg-spout-streams-lat-and-count
    +  "Aggregates number acked and complete latencies."
    +  [idk->comp-avg idk->acked]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->comp-avg
    +                       idk->acked]))}
    +  (letfn [(weight-avg [id avg] (let [num-e (get idk->acked id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->comp-avg)]
    +        [k {:completeLatencyTotal (weight-avg k (get idk->comp-avg k))
    +            :acked (get idk->acked k)}]))))
    +
    +(defn swap-map-order
    +  "{:a {:A 3, :B 5}, :b {:A 1, :B 2}}
    +    -> {:A {:b 1, :a 3}, :B {:b 2, :a 5}}"
    +  [m]
    +  (apply merge-with
    +         merge
    +         (map (fn [[k v]]
    +                (into {}
    +                      (for [[k2 v2] v]
    +                        [k2 {k v2}])))
    +              m)))
    +
    +(defn- compute-agg-capacity
    +  "Computes the capacity metric for one executor given its heartbeat data and
    +  uptime."
    +  [m uptime]
    +  (when uptime
    +    (->>
    +      ;; For each stream, create weighted averages and counts.
    +      (merge-with (fn weighted-avg+count-fn
    +                    [avg cnt]
    +                    [(* avg cnt) cnt])
    +                  (get (:execute-latencies m) "600")
    +                  (get (:executed m) "600"))
    +      vals ;; Ignore the stream ids.
    +      (reduce add-pairs
    +              [0. 0]) ;; Combine weighted averages and counts.
    +      ((fn [[weighted-avg cnt]]
    +        (div weighted-avg (* 1000 (min uptime 600))))))))
    +
    +(defn- page+comp-dispatch
    +  [page-type comp-type & _]
    +  [page-type comp-type])
    +
    +(defmulti agg-pre-merge
    --- End diff --
    
    I'm not sure a defmulti is the cleanest approach here.  Every call site hard-codes the first two parameters, which are the ones used for routing.
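    
    To illustrate the point: when every caller spells out the routing arguments as literals, the multimethod only looks up a function that could have been called by name.  A minimal sketch; the dispatch values, the method bodies, and the `-multi` / `-comp-page-*` names are hypothetical stand-ins, not the code in this patch:

```clojure
(defn- page+comp-dispatch
  [page-type comp-type & _]
  [page-type comp-type])

;; Multimethod form: callers pass literal routing arguments.
(defmulti agg-pre-merge-multi page+comp-dispatch)

(defmethod agg-pre-merge-multi [:comp-page :bolt]
  [_ _ stats]
  (assoc stats :type :bolt))

(defmethod agg-pre-merge-multi [:comp-page :spout]
  [_ _ stats]
  (assoc stats :type :spout))

;; Plain-function form: the routing lives in the function name instead.
(defn agg-pre-merge-comp-page-bolt
  [stats]
  (assoc stats :type :bolt))

(defn agg-pre-merge-comp-page-spout
  [stats]
  (assoc stats :type :spout))

;; Both call styles produce the same result.
(def via-multi (agg-pre-merge-multi :comp-page :bolt {:executed 10}))
(def via-fn (agg-pre-merge-comp-page-bolt {:executed 10}))
```

    The multimethod would still earn its keep if some caller computed the page or component type at run time.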



[GitHub] storm pull request: [STORM-820] Aggregate topo stats on nimbus, no...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30920107
  
    --- Diff: storm-core/src/storm.thrift ---
    @@ -222,6 +222,86 @@ struct TopologyInfo {
     514: optional string owner;
     }
     
    +struct CommonAggregateStats {
    +513: optional i32 num_executors;
    --- End diff --
    
    Can we perhaps make all of these start at 1?  In other places I started at 512 for the security changes, to avoid collisions with others who might have been making changes at the same time while that work was on a different branch, but I don't think we have that problem here.

