You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/09/17 00:20:54 UTC

[1/3] storm git commit: STORM-1027: Use overflow buffer for emitting metrics

Repository: storm
Updated Branches:
  refs/heads/master e78fcd7ff -> 5f98cd2c1


STORM-1027: Use overflow buffer for emitting metrics


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3144a533
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3144a533
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3144a533

Branch: refs/heads/master
Commit: 3144a533b7ae069cd23483544cc58fa962d88fda
Parents: 154e9ec
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Fri Sep 4 17:15:02 2015 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Fri Sep 4 17:15:02 2015 +0530

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/executor.clj  | 71 +++++++++++---------
 1 file changed, 38 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3144a533/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index e9c7a2e..326def1 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -294,27 +294,32 @@
           receive-queue
           [[nil (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID)]]))))))
 
-(defn metrics-tick [executor-data task-data ^TupleImpl tuple]
-  (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
-        interval (.getInteger tuple 0)
-        task-id (:task-id task-data)
-        name->imetric (-> interval->task->metric-registry (get interval) (get task-id))
-        task-info (IMetricsConsumer$TaskInfo.
-                    (hostname (:storm-conf executor-data))
-                    (.getThisWorkerPort worker-context)
-                    (:component-id executor-data)
-                    task-id
-                    (long (/ (System/currentTimeMillis) 1000))
-                    interval)
-        data-points (->> name->imetric
-                      (map (fn [[name imetric]]
-                             (let [value (.getValueAndReset ^IMetric imetric)]
-                               (if value
-                                 (IMetricsConsumer$DataPoint. name value)))))
-                      (filter identity)
-                      (into []))]
-      (if (seq data-points)
-        (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))
+(defn metrics-tick
+  ([executor-data task-data ^TupleImpl tuple overflow-buffer]
+   (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
+         interval (.getInteger tuple 0)
+         task-id (:task-id task-data)
+         name->imetric (-> interval->task->metric-registry (get interval) (get task-id))
+         task-info (IMetricsConsumer$TaskInfo.
+                     (hostname (:storm-conf executor-data))
+                     (.getThisWorkerPort worker-context)
+                     (:component-id executor-data)
+                     task-id
+                     (long (/ (System/currentTimeMillis) 1000))
+                     interval)
+         data-points (->> name->imetric
+                          (map (fn [[name imetric]]
+                                 (let [value (.getValueAndReset ^IMetric imetric)]
+                                   (if value
+                                     (IMetricsConsumer$DataPoint. name value)))))
+                          (filter identity)
+                          (into []))]
+     (if (seq data-points)
+       (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points] overflow-buffer))))
+  ([executor-data task-data ^TupleImpl tuple]
+    (metrics-tick executor-data task-data tuple nil)
+    ))
+
 
 (defn setup-ticks! [worker executor-data]
   (let [storm-conf (:storm-conf executor-data)
@@ -453,7 +458,16 @@
         last-active (atom false)        
         spouts (ArrayList. (map :object (vals task-datas)))
         rand (Random. (Utils/secureRandomLong))
-        
+
+        ;; the overflow buffer is used to ensure that spouts never block when emitting
+        ;; this ensures that the spout can always clear the incoming buffer (acks and fails), which
+        ;; prevents deadlock from occurring across the topology (e.g. Spout -> Bolt -> Acker -> Spout, and all
+        ;; buffers filled up)
+        ;; when the overflow buffer is full, spouts stop calling nextTuple until it's able to clear the overflow buffer
+        ;; this limits the size of the overflow buffer to however many tuples a spout emits in one call of nextTuple, 
+        ;; preventing memory issues
+        overflow-buffer (ConcurrentLinkedQueue.)
+
         pending (RotatingMap.
                  2 ;; microoptimize for performance of .size method
                  (reify RotatingMap$ExpiredCallback
@@ -465,7 +479,7 @@
                           (let [stream-id (.getSourceStreamId tuple)]
                             (condp = stream-id
                               Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
-                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
+                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple overflow-buffer)
                               Constants/CREDENTIALS_CHANGED_STREAM_ID 
                                 (let [task-data (get task-datas task-id)
                                       spout-obj (:object task-data)]
@@ -489,16 +503,7 @@
         event-handler (mk-task-receiver executor-data tuple-action-fn)
         has-ackers? (has-ackers? storm-conf)
         emitted-count (MutableLong. 0)
-        empty-emit-streak (MutableLong. 0)
-        
-        ;; the overflow buffer is used to ensure that spouts never block when emitting
-        ;; this ensures that the spout can always clear the incoming buffer (acks and fails), which
-        ;; prevents deadlock from occuring across the topology (e.g. Spout -> Bolt -> Acker -> Spout, and all
-        ;; buffers filled up)
-        ;; when the overflow buffer is full, spouts stop calling nextTuple until it's able to clear the overflow buffer
-        ;; this limits the size of the overflow buffer to however many tuples a spout emits in one call of nextTuple, 
-        ;; preventing memory issues
-        overflow-buffer (ConcurrentLinkedQueue.)]
+        empty-emit-streak (MutableLong. 0)]
    
     [(async-loop
       (fn []


[3/3] storm git commit: Merge branch 'work' of https://github.com/abhishekagarwal87/storm into STORM-1027

Posted by ka...@apache.org.
Merge branch 'work' of https://github.com/abhishekagarwal87/storm into STORM-1027


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5f98cd2c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5f98cd2c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5f98cd2c

Branch: refs/heads/master
Commit: 5f98cd2c1754b3b1b1844d83fd4eb1fb18588fe4
Parents: e78fcd7 62d30a0
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Sep 17 07:19:02 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Sep 17 07:19:02 2015 +0900

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/executor.clj  | 93 +++++++++++---------
 1 file changed, 49 insertions(+), 44 deletions(-)
----------------------------------------------------------------------



[2/3] storm git commit: STORM-1027: Apply overflow buffer to metric emit in bolt

Posted by ka...@apache.org.
STORM-1027: Apply overflow buffer to metric emit in bolt


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/62d30a04
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/62d30a04
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/62d30a04

Branch: refs/heads/master
Commit: 62d30a04371f52ba5cd29279d8dd7e9f19954d8c
Parents: 3144a53
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Wed Sep 9 18:53:04 2015 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Wed Sep 9 18:53:04 2015 +0530

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/executor.clj  | 22 ++++++++++----------
 1 file changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/62d30a04/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 326def1..a9a36a2 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -646,6 +646,15 @@
         {:keys [storm-conf component-id worker-context transfer-fn report-error sampler
                 open-or-prepare-was-called?]} executor-data
         rand (Random. (Utils/secureRandomLong))
+
+        ;; the overflow buffer is used to ensure that bolts do not block when emitting
+        ;; this ensures that the bolt can always clear the incoming messages, which
+        ;; prevents deadlock from occurring across the topology
+        ;; (e.g. Spout -> BoltA -> Splitter -> BoltB -> BoltA, and all
+        ;; buffers filled up)
+        ;; the overflow buffer might gradually fill, degrading performance and
+        ;; eventually running out of memory, but it at least prevents live-locks/deadlocks.
+        overflow-buffer (if (storm-conf TOPOLOGY-BOLTS-OUTGOING-OVERFLOW-BUFFER-ENABLE) (ConcurrentLinkedQueue.) nil)
         tuple-action-fn (fn [task-id ^TupleImpl tuple]
                           ;; synchronization needs to be done with a key provided by this bolt, otherwise:
                           ;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
@@ -670,7 +679,7 @@
                                       bolt-obj (:object task-data)]
                                   (when (instance? ICredentialsListener bolt-obj)
                                     (.setCredentials bolt-obj (.getValue tuple 0))))
-                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
+                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple overflow-buffer)
                               (let [task-data (get task-datas task-id)
                                     ^IBolt bolt-obj (:object task-data)
                                     user-context (:user-context task-data)
@@ -696,16 +705,7 @@
                                     (stats/bolt-execute-tuple! executor-stats
                                                                (.getSourceComponent tuple)
                                                                (.getSourceStreamId tuple)
-                                                               delta)))))))
-
-        ;; the overflow buffer is used to ensure that bolts do not block when emitting
-        ;; this ensures that the bolt can always clear the incoming messages, which
-        ;; prevents deadlock from occurs across the topology
-        ;; (e.g. Spout -> BoltA -> Splitter -> BoltB -> BoltA, and all
-        ;; buffers filled up)
-        ;; the overflow buffer is might gradually fill degrading the performance gradually
-        ;; eventually running out of memory, but at least prevent live-locks/deadlocks.
-        overflow-buffer (if (storm-conf TOPOLOGY-BOLTS-OUTGOING-OVERFLOW-BUFFER-ENABLE) (ConcurrentLinkedQueue.) nil)]
+                                                               delta)))))))]
     
     ;; TODO: can get any SubscribedState objects out of the context now