You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/31 21:54:54 UTC

[1/3] storm git commit: STORM-1271: Port backtype.storm.daemon.task to java

Repository: storm
Updated Branches:
  refs/heads/master 223b615d1 -> 084492782


STORM-1271: Port backtype.storm.daemon.task to java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7d63cb33
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7d63cb33
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7d63cb33

Branch: refs/heads/master
Commit: 7d63cb33a99940e11663257b515186e40d4f239e
Parents: 4264bfc
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Fri Mar 25 12:19:12 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Wed Mar 30 21:43:54 2016 +0530

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/config.clj  |  27 --
 .../clj/org/apache/storm/daemon/executor.clj    | 222 +++++++----------
 .../src/clj/org/apache/storm/daemon/task.clj    | 190 --------------
 .../org/apache/storm/daemon/GrouperFactory.java | 243 ++++++++++++++++++
 .../src/jvm/org/apache/storm/daemon/Task.java   | 247 +++++++++++++++++++
 .../daemon/metrics/BuiltinMetricsUtil.java      |   8 +-
 .../apache/storm/hooks/info/BoltAckInfo.java    |   8 +
 .../storm/hooks/info/BoltExecuteInfo.java       |   8 +
 .../apache/storm/hooks/info/BoltFailInfo.java   |   8 +
 .../org/apache/storm/hooks/info/EmitInfo.java   |   9 +
 .../apache/storm/hooks/info/SpoutAckInfo.java   |   9 +
 .../apache/storm/hooks/info/SpoutFailInfo.java  |   9 +
 .../jvm/org/apache/storm/utils/ConfigUtils.java |  35 ++-
 .../test/clj/org/apache/storm/grouping_test.clj |  19 +-
 14 files changed, 675 insertions(+), 367 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/clj/org/apache/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/config.clj b/storm-core/src/clj/org/apache/storm/config.clj
index e50f023..d0c4d87 100644
--- a/storm-core/src/clj/org/apache/storm/config.clj
+++ b/storm-core/src/clj/org/apache/storm/config.clj
@@ -42,30 +42,3 @@
 (defn cluster-mode
   [conf & args]
   (keyword (conf STORM-CLUSTER-MODE)))
-
-(defn sampling-rate
-  [conf]
-  (->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
-    (/ 1)
-    int))
-
-(defn- even-sampler
-  [freq]
-  (let [freq (int freq)
-        start (int 0)
-        r (java.util.Random.)
-        curr (MutableInt. -1)
-        target (MutableInt. (.nextInt r freq))]
-    (with-meta
-      (fn []
-        (let [i (.increment curr)]
-          (when (>= i freq)
-            (.set curr start)
-            (.set target (.nextInt r freq))))
-        (= (.get curr) (.get target)))
-      {:rate freq})))
-
-;; TODO this function together with sampling-rate are to be replaced with Java version when util.clj is in
-(defn mk-stats-sampler
-  [conf]
-  (even-sampler (sampling-rate conf)))

http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index e759b1d..8a77a61 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -15,6 +15,7 @@
 ;; limitations under the License.
 (ns org.apache.storm.daemon.executor
   (:use [org.apache.storm.daemon common])
+  (:use [clojure.walk])
   (:import [org.apache.storm.generated Grouping Grouping$_Fields]
            [java.io Serializable]
            [org.apache.storm.stats BoltExecutorStats SpoutExecutorStats]
@@ -33,7 +34,7 @@
   (:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread DisruptorBackpressureCallback])
   (:import [com.lmax.disruptor InsufficientCapacityException])
   (:import [org.apache.storm.serialization KryoTupleSerializer])
-  (:import [org.apache.storm.daemon Shutdownable StormCommon Acker])
+  (:import [org.apache.storm.daemon Shutdownable StormCommon Acker Task GrouperFactory])
   (:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric])
   (:import [org.apache.storm Config Constants])
   (:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils])
@@ -43,74 +44,8 @@
            [org.json.simple JSONValue]
            [com.lmax.disruptor.dsl ProducerType]
            [org.apache.storm StormTimer])
-  (:require [org.apache.storm.daemon [task :as task]])
   (:require [clojure.set :as set]))
 
-(defn- mk-fields-grouper
-  [^Fields out-fields ^Fields group-fields ^List target-tasks]
-  (let [num-tasks (count target-tasks)
-        task-getter (fn [i] (.get target-tasks i))]
-    (fn [task-id ^List values load]
-      (-> (.select out-fields group-fields values)
-          (TupleUtils/listHashCode)
-          (mod num-tasks)
-          task-getter))))
-
-(defn- mk-custom-grouper
-  [^CustomStreamGrouping grouping ^WorkerTopologyContext context ^String component-id ^String stream-id target-tasks]
-  (.prepare grouping context (GlobalStreamId. component-id stream-id) target-tasks)
-  (if (instance? LoadAwareCustomStreamGrouping grouping)
-    (fn [task-id ^List values load]
-      (.chooseTasks ^LoadAwareCustomStreamGrouping grouping task-id values load))
-    (fn [task-id ^List values load]
-      (.chooseTasks grouping task-id values))))
-
-(defn mk-shuffle-grouper
-  [^List target-tasks topo-conf ^WorkerTopologyContext context ^String component-id ^String stream-id]
-  (if (.get topo-conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
-    (mk-custom-grouper (ShuffleGrouping.) context component-id stream-id target-tasks)
-    (mk-custom-grouper (LoadAwareShuffleGrouping.) context component-id stream-id target-tasks)))
-
-(defn- mk-grouper
-  "Returns a function that returns a vector of which task indices to send tuple to, or just a single task index."
-  [^WorkerTopologyContext context component-id stream-id ^Fields out-fields thrift-grouping ^List target-tasks topo-conf]
-  (let [num-tasks (count target-tasks)
-        random (Random.)
-        target-tasks (vec (sort target-tasks))]
-    (condp = (Thrift/groupingType thrift-grouping)
-      Grouping$_Fields/FIELDS
-        (if (Thrift/isGlobalGrouping thrift-grouping)
-          (fn [task-id tuple load]
-            ;; It's possible for target to have multiple tasks if it reads multiple sources
-            (first target-tasks))
-          (let [group-fields (Fields. (Thrift/fieldGrouping thrift-grouping))]
-            (mk-fields-grouper out-fields group-fields target-tasks)
-            ))
-      Grouping$_Fields/ALL
-        (fn [task-id tuple load] target-tasks)
-      Grouping$_Fields/SHUFFLE
-        (mk-shuffle-grouper target-tasks topo-conf context component-id stream-id)
-      Grouping$_Fields/LOCAL_OR_SHUFFLE
-        (let [same-tasks (set/intersection
-                           (set target-tasks)
-                           (set (.getThisWorkerTasks context)))]
-          (if-not (empty? same-tasks)
-            (mk-shuffle-grouper (vec same-tasks) topo-conf context component-id stream-id)
-            (mk-shuffle-grouper target-tasks topo-conf context component-id stream-id)))
-      Grouping$_Fields/NONE
-        (fn [task-id tuple load]
-          (let [i (mod (.nextInt random) num-tasks)]
-            (get target-tasks i)
-            ))
-      Grouping$_Fields/CUSTOM_OBJECT
-        (let [grouping (Thrift/instantiateJavaObject (.get_custom_object thrift-grouping))]
-          (mk-custom-grouper grouping context component-id stream-id target-tasks))
-      Grouping$_Fields/CUSTOM_SERIALIZED
-        (let [grouping (Utils/javaDeserialize (.get_custom_serialized thrift-grouping) Serializable)]
-          (mk-custom-grouper grouping context component-id stream-id target-tasks))
-      Grouping$_Fields/DIRECT
-        :direct
-      )))
 
 ;TODO: when translating this function, you should replace the filter-val with a proper for loop + if condition HERE
 (defn- outbound-groupings
@@ -122,13 +57,13 @@
                         pos?))
        (map (fn [[component tgrouping]]
                [component
-                (mk-grouper worker-context
-                            this-component-id
-                            stream-id
-                            out-fields
-                            tgrouping
-                            (.getComponentTasks worker-context component)
-                            topo-conf)]))
+                (GrouperFactory/mkGrouper worker-context
+                                this-component-id
+                                stream-id
+                                out-fields
+                                tgrouping
+                                (.getComponentTasks worker-context component)
+                                topo-conf)]))
        (into {})
        (HashMap.)))
 
@@ -153,11 +88,11 @@
   (let [topology (.getRawTopology context)
         spouts (.get_spouts topology)
         bolts (.get_bolts topology)]
-    (cond (contains? spouts component-id) :spout
-          (contains? bolts component-id) :bolt
+    (cond (contains? spouts component-id) "spout"
+          (contains? bolts component-id) "bolt"
           :else (throw (RuntimeException. (str "Could not find " component-id " in topology " topology))))))
 
-(defn executor-selector [executor-data & _] (:type executor-data))
+(defn executor-selector [executor-data & _] (keyword (:type executor-data)))
 
 (defmulti mk-threads executor-selector)
 (defmulti mk-executor-stats executor-selector)
@@ -275,9 +210,9 @@
                                     (Utils/exceptionCauseIsInstanceOf java.io.InterruptedIOException error))
                                  (log-message "Got interrupted excpetion shutting thread down...")
                                  ((:suicide-fn <>)))))
-     :sampler (mk-stats-sampler storm-conf)
+     :sampler (ConfigUtils/mkStatsSampler storm-conf)
      :backpressure (atom false)
-     :spout-throttling-metrics (if (= executor-type :spout) 
+     :spout-throttling-metrics (if (= (keyword executor-type) :spout)
                                  (SpoutThrottlingMetrics.)
                                 nil)
      ;; TODO: add in the executor-specific stuff in a :specific... or make a spout-data, bolt-data function?
@@ -319,6 +254,13 @@
       (.getName batch-transfer-queue)
       (:uncaught-exception-handler (:report-error-and-die executor-data)))))
 
+;; TODO: this is all expensive... should be precomputed
+(defn send-unanchored
+  [^Task task-data stream values transfer-fn]
+  (let [out-tuple (.getTuple task-data stream values)]
+    (fast-list-iter [t (.getOutgoingTasks task-data stream values)]
+                    (transfer-fn t out-tuple))))
+
 (defn setup-metrics! [executor-data]
   (let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data
         distinct-time-bucket-intervals (keys interval->task->metric-registry)]
@@ -335,7 +277,8 @@
   [executor-data task-data ^TupleImpl tuple]
    (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
          interval (.getInteger tuple 0)
-         task-id (:task-id task-data)
+         transfer-fn (:transfer-fn executor-data)
+         task-id (.getTaskId task-data)
          name->imetric (-> interval->task->metric-registry (get interval) (get task-id))
          task-info (IMetricsConsumer$TaskInfo.
                      (Utils/hostname (:storm-conf executor-data))
@@ -352,7 +295,7 @@
                           (filter identity)
                           (into []))]
      (when (seq data-points)
-       (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))
+       (send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points] transfer-fn))))
 
 (defn setup-ticks! [worker executor-data]
   (let [storm-conf (:storm-conf executor-data)
@@ -362,7 +305,7 @@
     (when tick-time-secs
       (if (or (Utils/isSystemId (:component-id executor-data))
               (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
-                   (= :spout (:type executor-data))))
+                   (= :spout (keyword (:type executor-data)))))
         (log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
         (.scheduleRecurring
           (:user-timer worker)
@@ -374,10 +317,18 @@
 
 (defn mk-executor [worker executor-id initial-credentials]
   (let [executor-data (mk-executor-data worker executor-id)
+        transfer-fn (:transfer-fn executor-data)
         _ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
         task-datas (->> executor-data
                         :task-ids
-                        (map (fn [t] [t (task/mk-task executor-data t)]))
+                        (map (fn [t] (let [task (Task. (stringify-keys executor-data)  t)
+                                           stream StormCommon/SYSTEM_STREAM_ID
+                                           values ["startup"]]
+                                       ;; when this is called, the threads for the executor haven't been started yet,
+                                       ;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue
+                                       (send-unanchored task stream values transfer-fn)
+                                       [t task]
+                                       )))
                         (into {})
                         (HashMap.))
         _ (log-message "Loaded executor tasks " (:component-id executor-data) ":" (pr-str executor-id))
@@ -426,36 +377,36 @@
           (.interrupt t)
           (.join t))
         
-        (doseq [user-context (map :user-context (vals task-datas))]
+        (doseq [user-context (map #(.getUserContext %) (vals task-datas))]
           (doseq [hook (.getHooks user-context)]
             (.cleanup hook)))
         (.disconnect (:storm-cluster-state executor-data))
         (when @(:open-or-prepare-was-called? executor-data)
-          (doseq [obj (map :object (vals task-datas))]
+          (doseq [obj (map #(.getTaskObject %) (vals task-datas))]
             (close-component executor-data obj)))
         (log-message "Shut down executor " component-id ":" (pr-str executor-id)))
         )))
 
 (defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta reason id]
-  (let [^ISpout spout (:object task-data)
+  (let [^ISpout spout (.getTaskObject task-data)
         storm-conf (:storm-conf executor-data)
-        task-id (:task-id task-data)]
+        task-id (.getTaskId task-data)]
     ;;TODO: need to throttle these when there's lots of failures
     (when (= true (storm-conf TOPOLOGY-DEBUG))
       (log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id))
     (.fail spout msg-id)
-    (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
+    (.applyOn (SpoutFailInfo. msg-id task-id time-delta) (.getUserContext task-data))
     (when time-delta
       (.spoutFailedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
 
 (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id]
   (let [storm-conf (:storm-conf executor-data)
-        ^ISpout spout (:object task-data)
-        task-id (:task-id task-data)]
+        ^ISpout spout (.getTaskObject task-data)
+        task-id (.getTaskId task-data)]
     (when (= true (storm-conf TOPOLOGY-DEBUG))
       (log-message "SPOUT Acking message " id " " msg-id))
     (.ack spout msg-id)
-    (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
+    (.applyOn (SpoutAckInfo. msg-id task-id time-delta) (.getUserContext task-data))
     (when time-delta
       (.spoutAckedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
 
@@ -496,10 +447,11 @@
       ;; the thread's initialized random number generator is used to generate
       ;; uniformily distributed random numbers.
       (when (and (> spct 0) (< (* 100 (.nextDouble random)) spct))
-        (task/send-unanchored
+        (send-unanchored
           task-data
           StormCommon/EVENTLOGGER_STREAM_ID
-          [component-id message-id (System/currentTimeMillis) values]))))
+          [component-id message-id (System/currentTimeMillis) values]
+          (:transfer-fn executor-data)))))
 
 (defmethod mk-threads :spout [executor-data task-datas initial-credentials]
   (let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called?]} executor-data
@@ -507,7 +459,7 @@
         max-spout-pending (executor-max-spout-pending storm-conf (count task-datas))
         ^Integer max-spout-pending (if max-spout-pending (int max-spout-pending))        
         last-active (atom false)        
-        spouts (ArrayList. (map :object (vals task-datas)))
+        spouts (ArrayList. (map #(.getTaskObject %) (vals task-datas)))
         rand (Random. (Utils/secureRandomLong))
         ^DisruptorQueue transfer-queue (executor-data :batch-transfer-queue)
         debug? (= true (storm-conf TOPOLOGY-DEBUG))
@@ -526,7 +478,7 @@
                               Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
                               Constants/CREDENTIALS_CHANGED_STREAM_ID 
                                 (let [task-data (get task-datas task-id)
-                                      spout-obj (:object task-data)]
+                                      spout-obj (.getTaskObject task-data)]
                                   (when (instance? ICredentialsListener spout-obj)
                                     (.setCredentials spout-obj (.getValue tuple 0))))
                               Acker/ACKER_RESET_TIMEOUT_STREAM_ID 
@@ -559,15 +511,14 @@
                             (while (not @(:storm-active-atom executor-data))
                               (Thread/sleep 100))
                             (log-message "Opening spout " component-id ":" (keys task-datas))
-                            (.registerAll (:spout-throttling-metrics executor-data) storm-conf (:user-context (first (vals task-datas))))
+                            (.registerAll (:spout-throttling-metrics executor-data) storm-conf (.getUserContext (first (vals task-datas))))
                             (doseq [[task-id task-data] task-datas
-                                    :let [^ISpout spout-obj (:object task-data)
-                                          tasks-fn (:tasks-fn task-data)
+                                    :let [^ISpout spout-obj (.getTaskObject task-data)
                                           send-spout-msg (fn [out-stream-id values message-id out-task-id]
                                                            (.increment emitted-count)
                                                            (let [out-tasks (if out-task-id
-                                                                             (tasks-fn out-task-id out-stream-id values)
-                                                                             (tasks-fn out-stream-id values))
+                                                                             (.getOutgoingTasks task-data out-task-id out-stream-id values)
+                                                                             (.getOutgoingTasks task-data out-stream-id values))
                                                                  rooted? (and message-id has-ackers?)
                                                                  root-id (if rooted? (MessageId/generateId rand))
                                                                  ^List out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
@@ -590,25 +541,27 @@
                                                                                         message-id
                                                                                         {:stream out-stream-id 
                                                                                          :values (if debug? values nil)}
-                                                                                        (if (sampler) (System/currentTimeMillis))])
-                                                                 (task/send-unanchored task-data
+                                                                                        (if (.call ^Callable sampler) (System/currentTimeMillis))])
+                                                                 (send-unanchored task-data
                                                                                        Acker/ACKER_INIT_STREAM_ID
-                                                                                       [root-id (Utils/bitXorVals out-ids) task-id]))
+                                                                                       [root-id (Utils/bitXorVals out-ids) task-id]
+                                                                                       (:transfer-fn executor-data)))
                                                                (when message-id
                                                                  (ack-spout-msg executor-data task-data message-id
                                                                                 {:stream out-stream-id :values values}
-                                                                                (if (sampler) 0) "0:")))
+                                                                                (if (.call ^Callable sampler) 0) "0:")))
                                                              (or out-tasks [])))]]
 
-                              (.registerAll (:builtin-metrics task-data) storm-conf (:user-context task-data))
+                              (.registerAll (.getBuiltInMetrics task-data) storm-conf (.getUserContext task-data))
                               (BuiltinMetricsUtil/registerQueueMetrics {"sendqueue" (:batch-transfer-queue executor-data)
                                                                        "receive" receive-queue}
-                                                                      storm-conf (:user-context task-data))
+                                                                      storm-conf (.getUserContext task-data))
+
                               (when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj initial-credentials))
 
                               (.open spout-obj
                                      storm-conf
-                                     (:user-context task-data)
+                                     (.getUserContext task-data)
                                      (SpoutOutputCollector.
                                       (reify ISpoutOutputCollector
                                         (^long getPendingCount[this]
@@ -694,7 +647,7 @@
 
 (defmethod mk-threads :bolt [executor-data task-datas initial-credentials]
   (let [storm-conf (:storm-conf executor-data)
-        execute-sampler (mk-stats-sampler storm-conf)
+        execute-sampler (ConfigUtils/mkStatsSampler storm-conf)
         executor-stats (:stats executor-data)
         {:keys [storm-conf component-id worker-context transfer-fn report-error sampler
                 open-or-prepare-was-called?]} executor-data
@@ -720,16 +673,16 @@
                           (let [stream-id (.getSourceStreamId tuple)]
                             (condp = stream-id
                               Constants/CREDENTIALS_CHANGED_STREAM_ID 
-                                (let [task-data (get task-datas task-id)
-                                      bolt-obj (:object task-data)]
+                                (let [^Task task-data (get task-datas task-id)
+                                      bolt-obj (.getTaskObject task-data)]
                                   (when (instance? ICredentialsListener bolt-obj)
-                                    (.setCredentials bolt-obj (.getValue tuple 0))))
+                                    (.setCredentials ^ICredentialsListener bolt-obj (.getValue tuple 0))))
                               Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
-                              (let [task-data (get task-datas task-id)
-                                    ^IBolt bolt-obj (:object task-data)
-                                    user-context (:user-context task-data)
-                                    sampler? (sampler)
-                                    execute-sampler? (execute-sampler)
+                              (let [^Task task-data (get task-datas task-id)
+                                    ^IBolt bolt-obj (.getTaskObject task-data)
+                                    user-context (.getUserContext task-data)
+                                    sampler? (.call ^Callable sampler)
+                                    execute-sampler? (.call ^Callable execute-sampler)
                                     now (if (or sampler? execute-sampler?) (System/currentTimeMillis))
                                     receive-queue (:receive-queue executor-data)]
                                 (when sampler?
@@ -741,7 +694,7 @@
                                   (when (= true (storm-conf TOPOLOGY-DEBUG))
                                     (log-message "Execute done TUPLE " tuple " TASK: " task-id " DELTA: " delta))
 
-                                  (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
+                                  (.applyOn (BoltExecuteInfo. tuple task-id delta) user-context)
                                   (when delta
                                     (.boltExecuteTuple executor-stats
                                                                (.getSourceComponent tuple)
@@ -755,13 +708,13 @@
 
                            (log-message "Preparing bolt " component-id ":" (keys task-datas))
                            (doseq [[task-id task-data] task-datas
-                                   :let [^IBolt bolt-obj (:object task-data)
-                                         tasks-fn (:tasks-fn task-data)
-                                         user-context (:user-context task-data)
+                                   :let [^IBolt bolt-obj (.getTaskObject task-data)
+                                         user-context (.getUserContext task-data)
+                                         transfer-fn (:transfer-fn executor-data)
                                          bolt-emit (fn [stream anchors values task]
                                                      (let [out-tasks (if task
-                                                                       (tasks-fn task stream values)
-                                                                       (tasks-fn stream values))]
+                                                                       (.getOutgoingTasks task-data task stream values)
+                                                                       (.getOutgoingTasks task-data stream values))]
                                                        (fast-list-iter [t out-tasks]
                                                                        (let [anchors-to-ids (HashMap.)]
                                                                          (fast-list-iter [^TupleImpl a anchors]
@@ -780,8 +733,8 @@
                                                        (if has-eventloggers?
                                                          (send-to-eventlogger executor-data task-data values component-id nil rand))
                                                        (or out-tasks [])))]]
-                             (.registerAll (:builtin-metrics task-data) storm-conf user-context)
-                             (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials)) 
+                             (.registerAll (.getBuiltInMetrics task-data) storm-conf user-context)
+                             (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials))
                              (if (= component-id Constants/SYSTEM_COMPONENT_ID)
                                (do
                                  (BuiltinMetricsUtil/registerQueueMetrics {"sendqueue" (:batch-transfer-queue executor-data)
@@ -808,14 +761,15 @@
                                             (let [^TupleImpl tuple tuple
                                                   ack-val (.getAckVal tuple)]
                                               (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
-                                                             (task/send-unanchored task-data
-                                                                                   Acker/ACKER_ACK_STREAM_ID
-                                                                                   [root (bit-xor id ack-val)])))
+                                                             (send-unanchored task-data
+                                                                              Acker/ACKER_ACK_STREAM_ID
+                                                                              [root (bit-xor id ack-val)]
+                                                                              transfer-fn)))
                                             (let [delta (tuple-time-delta! tuple)
                                                   debug? (= true (storm-conf TOPOLOGY-DEBUG))]
                                               (when debug?
                                                 (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
-                                              (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
+                                              (.applyOn (BoltAckInfo. tuple task-id delta) user-context)
                                               (when delta
                                                 (.boltAckedTuple executor-stats
                                                                          (.getSourceComponent tuple)
@@ -823,14 +777,15 @@
                                                                          delta))))
                                           (^void fail [this ^Tuple tuple]
                                             (fast-list-iter [root (.. tuple getMessageId getAnchors)]
-                                                            (task/send-unanchored task-data
+                                                            (send-unanchored task-data
                                                                                   Acker/ACKER_FAIL_STREAM_ID
-                                                                                  [root]))
+                                                                                  [root]
+                                                                                  transfer-fn))
                                             (let [delta (tuple-time-delta! tuple)
                                                   debug? (= true (storm-conf TOPOLOGY-DEBUG))]
                                               (when debug?
                                                 (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
-                                              (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
+                                              (.applyOn (BoltFailInfo. tuple task-id delta) user-context)
                                               (when delta
                                                 (.boltFailedTuple executor-stats
                                                                           (.getSourceComponent tuple)
@@ -838,9 +793,10 @@
                                                                           delta))))
                                           (^void resetTimeout [this ^Tuple tuple]
                                             (fast-list-iter [root (.. tuple getMessageId getAnchors)]
-                                                            (task/send-unanchored task-data
+                                                            (send-unanchored task-data
                                                                                   Acker/ACKER_RESET_TIMEOUT_STREAM_ID
-                                                                                  [root])))
+                                                                                  [root]
+                                                                                  transfer-fn)))
                                           (reportError [this error]
                                             (report-error error))))))
                            (reset! open-or-prepare-was-called? true)

http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
deleted file mode 100644
index 2cfad7c..0000000
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ /dev/null
@@ -1,190 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.daemon.task
-  (:use [org.apache.storm.daemon common])
-  (:use [org.apache.storm config util log])
-  (:import [org.apache.storm.hooks ITaskHook]
-           [org.apache.storm.daemon.metrics BuiltinMetrics BuiltinMetricsUtil])
-  (:import [org.apache.storm.tuple Tuple TupleImpl])
-  (:import [org.apache.storm.grouping LoadMapping])
-  (:import [org.apache.storm.generated SpoutSpec Bolt StateSpoutSpec StormTopology])
-  (:import [org.apache.storm.hooks.info SpoutAckInfo SpoutFailInfo
-            EmitInfo BoltFailInfo BoltAckInfo])
-  (:import [org.apache.storm.task TopologyContext ShellBolt WorkerTopologyContext])
-  (:import [org.apache.storm.utils Utils ConfigUtils])
-  (:import [org.apache.storm.generated ShellComponent JavaObject])
-  (:import [org.apache.storm.spout ShellSpout])
-  (:import [java.util Collection List ArrayList])
-  (:import [org.apache.storm Thrift]
-           (org.apache.storm.daemon StormCommon)))
-
-(defn mk-topology-context-builder [worker executor-data topology]
-  (let [conf (:conf worker)]
-    #(TopologyContext.
-      topology
-      (:storm-conf worker)
-      (:task->component worker)
-      (:component->sorted-tasks worker)
-      (:component->stream->fields worker)
-      (:storm-id worker)
-      (ConfigUtils/supervisorStormResourcesPath
-        (ConfigUtils/supervisorStormDistRoot conf (:storm-id worker)))
-      (ConfigUtils/workerPidsRoot conf (:worker-id worker))
-      (int %)
-      (:port worker)
-      (:task-ids worker)
-      (:default-shared-resources worker)
-      (:user-shared-resources worker)
-      (:shared-executor-data executor-data)
-      (:interval->task->metric-registry executor-data)
-      (:open-or-prepare-was-called? executor-data))))
-
-(defn system-topology-context [worker executor-data tid]
-  ((mk-topology-context-builder
-    worker
-    executor-data
-    (:system-topology worker))
-   tid))
-
-(defn user-topology-context [worker executor-data tid]
-  ((mk-topology-context-builder
-    worker
-    executor-data
-    (:topology worker))
-   tid))
-
-(defn- get-task-object [^StormTopology topology component-id]
-  (let [spouts (.get_spouts topology)
-        bolts (.get_bolts topology)
-        state-spouts (.get_state_spouts topology)
-        obj (Utils/getSetComponentObject
-             (cond
-              (contains? spouts component-id) (.get_spout_object ^SpoutSpec (get spouts component-id))
-              (contains? bolts component-id) (.get_bolt_object ^Bolt (get bolts component-id))
-              (contains? state-spouts component-id) (.get_state_spout_object ^StateSpoutSpec (get state-spouts component-id))
-              true (throw (RuntimeException. (str "Could not find " component-id " in " topology)))))
-        obj (if (instance? ShellComponent obj)
-              (if (contains? spouts component-id)
-                (ShellSpout. obj)
-                (ShellBolt. obj))
-              obj )
-        obj (if (instance? JavaObject obj)
-              (Thrift/instantiateJavaObject obj)
-              obj )]
-    obj
-    ))
-
-(defn get-context-hooks [^TopologyContext context]
-  (.getHooks context))
-
-(defn hooks-empty? [^Collection hooks]
-  (.isEmpty hooks))
-
-(defmacro apply-hooks [topology-context method-sym info-form]
-  (let [hook-sym (with-meta (gensym "hook") {:tag 'org.apache.storm.hooks.ITaskHook})]
-    `(let [hooks# (get-context-hooks ~topology-context)]
-       (when-not (hooks-empty? hooks#)
-         (let [info# ~info-form]
-           (fast-list-iter [~hook-sym hooks#]
-             (~method-sym ~hook-sym info#)
-             ))))))
-
-
-;; TODO: this is all expensive... should be precomputed
-(defn send-unanchored
-  [task-data stream values]
-    (let [^TopologyContext topology-context (:system-context task-data)
-          tasks-fn (:tasks-fn task-data)
-          transfer-fn (-> task-data :executor-data :transfer-fn)
-          out-tuple (TupleImpl. topology-context
-                                 values
-                                 (.getThisTaskId topology-context)
-                                 stream)]
-      (fast-list-iter [t (tasks-fn stream values)]
-        (transfer-fn t out-tuple))))
-
-(defn mk-tasks-fn [task-data]
-  (let [task-id (:task-id task-data)
-        executor-data (:executor-data task-data)
-        ^LoadMapping load-mapping (:load-mapping (:worker executor-data))
-        component-id (:component-id executor-data)
-        ^WorkerTopologyContext worker-context (:worker-context executor-data)
-        storm-conf (:storm-conf executor-data)
-        emit-sampler (mk-stats-sampler storm-conf)
-        stream->component->grouper (:stream->component->grouper executor-data)
-        user-context (:user-context task-data)
-        executor-stats (:stats executor-data)
-        debug? (= true (storm-conf TOPOLOGY-DEBUG))]
-        
-    (fn ([^Integer out-task-id ^String stream ^List values]
-          (when debug?
-            (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
-          (let [target-component (.getComponentId worker-context out-task-id)
-                component->grouping (get stream->component->grouper stream)
-                grouping (get component->grouping target-component)
-                out-task-id (if grouping out-task-id)]
-            (when (and (not-nil? grouping) (not= :direct grouping))
-              (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))                          
-            (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
-            (when (emit-sampler)
-              (.emittedTuple executor-stats stream)
-              (if out-task-id
-                (.transferredTuples executor-stats stream, 1)))
-            (if out-task-id [out-task-id])
-            ))
-        ([^String stream ^List values]
-           (when debug?
-             (log-message "Emitting: " component-id " " stream " " values))
-           (let [out-tasks (ArrayList.)]
-             (if (not (.containsKey stream->component->grouper stream))
-               (throw (IllegalArgumentException. (str "Unknown stream ID: " stream))))
-             (fast-map-iter [[out-component grouper] (get stream->component->grouper stream)]
-               (when (= :direct grouper)
-                  ;;  TODO: this is wrong, need to check how the stream was declared
-                  (throw (IllegalArgumentException. "Cannot do regular emit to direct stream")))
-               (let [comp-tasks (grouper task-id values load-mapping)]
-                 (if (or (sequential? comp-tasks) (instance? Collection comp-tasks))
-                   (.addAll out-tasks comp-tasks)
-                   (.add out-tasks comp-tasks)
-                   )))
-             (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
-             (when (emit-sampler)
-               (.emittedTuple executor-stats stream)
-               (.transferredTuples executor-stats stream (count out-tasks)))
-             out-tasks)))
-    ))
-
-(defn mk-task-data [executor-data task-id]
-  (recursive-map
-    :executor-data executor-data
-    :task-id task-id
-    :system-context (system-topology-context (:worker executor-data) executor-data task-id)
-    :user-context (user-topology-context (:worker executor-data) executor-data task-id)
-    :builtin-metrics (BuiltinMetricsUtil/mkData (.getName (:type executor-data)) (:stats executor-data))
-    :tasks-fn (mk-tasks-fn <>)
-    :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data))))
-
-
-(defn mk-task [executor-data task-id]
-  (let [task-data (mk-task-data executor-data task-id)
-        storm-conf (:storm-conf executor-data)]
-    (doseq [klass (storm-conf TOPOLOGY-AUTO-TASK-HOOKS)]
-      (.addTaskHook ^TopologyContext (:user-context task-data) (-> klass Class/forName .newInstance)))
-    ;; when this is called, the threads for the executor haven't been started yet,
-    ;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue
-    (send-unanchored task-data StormCommon/SYSTEM_STREAM_ID ["startup"])
-    task-data
-    ))

http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java b/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java
new file mode 100644
index 0000000..d06682f
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+
+import org.apache.storm.Config;
+import org.apache.storm.Thrift;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.grouping.LoadAwareShuffleGrouping;
+import org.apache.storm.grouping.LoadMapping;
+import org.apache.storm.grouping.ShuffleGrouping;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+public class GrouperFactory {
+
+    public static LoadAwareCustomStreamGrouping mkGrouper(WorkerTopologyContext context, String componentId, String streamId, Fields outFields,
+                                    Grouping thriftGrouping,
+                                    List<Integer> unsortedTargetTasks,
+                                    Map topoConf) {
+        List<Integer> targetTasks = Ordering.natural().sortedCopy(unsortedTargetTasks);
+        final boolean isNotLoadAware = (null != topoConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING) && (boolean) topoConf
+            .get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING));
+        CustomStreamGrouping result = null;
+        switch (Thrift.groupingType(thriftGrouping)) {
+            case FIELDS:
+                if (Thrift.isGlobalGrouping(thriftGrouping)) {
+                    result = new GlobalGrouper();
+                } else {
+                    result = new FieldsGrouper(outFields, thriftGrouping);
+                }
+                break;
+            case SHUFFLE:
+                if (isNotLoadAware) {
+                    result = new ShuffleGrouping();
+                } else {
+                    result = new LoadAwareShuffleGrouping();
+                }
+                break;
+            case ALL:
+                result = new AllGrouper();
+                break;
+            case LOCAL_OR_SHUFFLE:
+                // Prefer local tasks as target tasks if possible
+                Set<Integer> sameTasks = Sets.intersection(Sets.newHashSet(targetTasks), Sets.newHashSet(context.getThisWorkerTasks()));
+                targetTasks = (sameTasks.isEmpty()) ? targetTasks : new ArrayList<>(sameTasks);
+                if (isNotLoadAware) {
+                    result = new ShuffleGrouping();
+                } else {
+                    result = new LoadAwareShuffleGrouping();
+                }
+                break;
+            case NONE:
+                result = new NoneGrouper();
+                break;
+            case CUSTOM_OBJECT:
+                result = (CustomStreamGrouping) Thrift.instantiateJavaObject(thriftGrouping.get_custom_object());
+                break;
+            case CUSTOM_SERIALIZED:
+                result = Utils.javaDeserialize(thriftGrouping.get_custom_serialized(), CustomStreamGrouping.class);
+                break;
+            case DIRECT:
+                result = DIRECT;
+                break;
+            default:
+                result = null;
+                break;
+        }
+
+        if (null != result) {
+            result.prepare(context, new GlobalStreamId(componentId, streamId), targetTasks);
+        }
+
+        if (result instanceof LoadAwareCustomStreamGrouping) {
+            return (LoadAwareCustomStreamGrouping) result;
+        } else {
+            return new BasicLoadAwareCustomStreamGrouping (result);
+        }
+    }
+
+
+    /**
+     * A bridge between CustomStreamGrouping and LoadAwareCustomStreamGrouping
+     */
+    public static class BasicLoadAwareCustomStreamGrouping implements LoadAwareCustomStreamGrouping {
+
+        private final CustomStreamGrouping customStreamGrouping;
+
+        public BasicLoadAwareCustomStreamGrouping(CustomStreamGrouping customStreamGrouping) {
+            this.customStreamGrouping = customStreamGrouping;
+        }
+
+        @Override
+        public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
+            return customStreamGrouping.chooseTasks(taskId, values);
+        }
+
+        @Override
+        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+            customStreamGrouping.prepare(context, stream, targetTasks);
+        }
+
+        @Override
+        public List<Integer> chooseTasks(int taskId, List<Object> values) {
+            return customStreamGrouping.chooseTasks(taskId, values);
+        }
+    }
+
+    public static class FieldsGrouper implements CustomStreamGrouping {
+
+        private Fields outFields;
+        private List<Integer> targetTasks;
+        private Fields groupFields;
+        private int numTasks;
+
+        public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
+            this.outFields = outFields;
+            this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));
+
+        }
+
+        @Override
+        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+            this.targetTasks = targetTasks;
+            this.numTasks = targetTasks.size();
+        }
+
+        @Override
+        public List<Integer> chooseTasks(int taskId, List<Object> values) {
+            int targetTaskIndex = Math.abs(TupleUtils.listHashCode(outFields.select(groupFields, values))) % numTasks;
+            return Collections.singletonList(targetTasks.get(targetTaskIndex));
+        }
+    }
+
+    public static class GlobalGrouper implements CustomStreamGrouping {
+
+        private List<Integer> targetTasks;
+
+        public GlobalGrouper() {
+        }
+
+        @Override
+        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+            this.targetTasks = targetTasks;
+        }
+
+        @Override
+        public List<Integer> chooseTasks(int taskId, List<Object> values) {
+            if (targetTasks.isEmpty()) {
+                return null;
+            }
+            // It's possible for target to have multiple tasks if it reads multiple sources
+            return Collections.singletonList(targetTasks.get(0));
+        }
+    }
+
+    public static class NoneGrouper implements CustomStreamGrouping {
+
+        private List<Integer> targetTasks;
+        private int numTasks;
+        private final Random random;
+
+        public NoneGrouper() {
+            random = new Random();
+        }
+
+        @Override
+        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+            this.targetTasks = targetTasks;
+            this.numTasks = targetTasks.size();
+        }
+
+        @Override
+        public List<Integer> chooseTasks(int taskId, List<Object> values) {
+            int index = random.nextInt(numTasks);
+            return Collections.singletonList(targetTasks.get(index));
+        }
+    }
+
+    public static class AllGrouper implements CustomStreamGrouping {
+
+        private List<Integer> targetTasks;
+
+        @Override
+        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+            this.targetTasks = targetTasks;
+        }
+
+        @Override
+        public List<Integer> chooseTasks(int taskId, List<Object> values) {
+            return targetTasks;
+        }
+    }
+
+    // A no-op grouper
+    public static final LoadAwareCustomStreamGrouping DIRECT = new LoadAwareCustomStreamGrouping() {
+        @Override
+        public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
+            return null;
+        }
+
+        @Override
+        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+
+        }
+
+        @Override
+        public List<Integer> chooseTasks(int taskId, List<Object> values) {
+            return null;
+        }
+
+    };
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/daemon/Task.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Task.java b/storm-core/src/jvm/org/apache/storm/daemon/Task.java
new file mode 100644
index 0000000..60b570a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/Task.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import org.apache.storm.Config;
+import org.apache.storm.Thrift;
+import org.apache.storm.daemon.metrics.BuiltinMetrics;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.ComponentObject;
+import org.apache.storm.generated.JavaObject;
+import org.apache.storm.generated.ShellComponent;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StateSpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.grouping.LoadMapping;
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.hooks.info.EmitInfo;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.spout.ShellSpout;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.task.ShellBolt;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+public class Task {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    private Map executorData;
+    private Map workerData;
+    private TopologyContext systemTopologyContext;
+    private TopologyContext userTopologyContext;
+    private WorkerTopologyContext workerTopologyContext;
+    private LoadMapping loadMapping;
+    private Integer taskId;
+    private String componentId;
+    private Object taskObject;  // Spout/Bolt object
+    private Map stormConf;
+    private Callable<Boolean> emitSampler;
+    private CommonStats executorStats;
+    private Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper;
+    private BuiltinMetrics builtInMetrics;
+    private boolean debug;
+
+    public Task(Map executorData, Integer taskId) throws IOException {
+        this.taskId = taskId;
+        this.executorData = executorData;
+        this.workerData = (Map) executorData.get("worker");
+        this.stormConf = (Map) executorData.get("storm-conf");
+        this.componentId = (String) executorData.get("component-id");
+        this.streamComponentToGrouper = (Map<String, Map<String, LoadAwareCustomStreamGrouping>>) executorData.get("stream->component->grouper");
+        this.executorStats = (CommonStats) executorData.get("stats");
+        this.builtInMetrics = BuiltinMetricsUtil.mkData((String) executorData.get("type"), this.executorStats);
+        this.workerTopologyContext = (WorkerTopologyContext) executorData.get("worker-context");
+        this.emitSampler = ConfigUtils.mkStatsSampler(stormConf);
+        this.loadMapping = (LoadMapping) workerData.get("load-mapping");
+        this.systemTopologyContext = mkTopologyContext((StormTopology) workerData.get("system-topology"));
+        this.userTopologyContext = mkTopologyContext((StormTopology) workerData.get("topology"));
+        this.taskObject = mkTaskObject();
+        this.debug = stormConf.containsKey(Config.TOPOLOGY_DEBUG) && (Boolean) stormConf.get(Config.TOPOLOGY_DEBUG);
+        this.addTaskHooks();
+    }
+
+    public List<Integer> getOutgoingTasks(Integer outTaskId, String stream, List<Object> values) {
+        if (debug) {
+            LOG.info("Emitting direct: {}; {} {} {} ", outTaskId, componentId, stream, values);
+        }
+        String targetComponent = workerTopologyContext.getComponentId(outTaskId);
+        Map<String, LoadAwareCustomStreamGrouping> componentGrouping = streamComponentToGrouper.get(stream);
+        LoadAwareCustomStreamGrouping grouping = componentGrouping.get(targetComponent);
+        if (null == grouping) {
+            outTaskId = null;
+        }
+        if (grouping != null && grouping != GrouperFactory.DIRECT) {
+            throw new IllegalArgumentException("Cannot emitDirect to a task expecting a regular grouping");
+        }
+        new EmitInfo(values, stream, taskId, Collections.singletonList(outTaskId)).applyOn(userTopologyContext);
+        try {
+            if (emitSampler.call()) {
+                executorStats.emittedTuple(stream);
+                if (null != outTaskId) {
+                    executorStats.transferredTuples(stream, 1);
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        if (null != outTaskId) {
+            return Collections.singletonList(outTaskId);
+        }
+        return null;
+    }
+
+    public List<Integer> getOutgoingTasks(String stream, List<Object> values) {
+        if (debug) {
+            LOG.info("Emitting: {} {} {}", componentId, stream, values);
+        }
+        List<Integer> outTasks = new ArrayList<>();
+        if (!streamComponentToGrouper.containsKey(stream)) {
+            throw new IllegalArgumentException("Unknown stream ID: " + stream);
+        }
+        if (null != streamComponentToGrouper.get(stream)) {
+            // null value for __system
+            for (LoadAwareCustomStreamGrouping grouper : streamComponentToGrouper.get(stream).values()) {
+                if (grouper == GrouperFactory.DIRECT) {
+                    throw new IllegalArgumentException("Cannot do regular emit to direct stream");
+                }
+                List<Integer> compTasks = grouper.chooseTasks(taskId, values, loadMapping);
+                outTasks.addAll(compTasks);
+            }
+        }
+        new EmitInfo(values, stream, taskId, outTasks).applyOn(userTopologyContext);
+        try {
+            if (emitSampler.call()) {
+                executorStats.emittedTuple(stream);
+                executorStats.transferredTuples(stream, outTasks.size());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return outTasks;
+    }
+
+    public Tuple getTuple(String stream, List values) {
+        return new TupleImpl(systemTopologyContext, values, systemTopologyContext.getThisTaskId(), stream);
+    }
+
+    public Integer getTaskId() {
+        return taskId;
+    }
+
+    public String getComponentId() {
+        return componentId;
+    }
+
+    public TopologyContext getUserContext() throws IOException {
+        return userTopologyContext;
+    }
+
+    public Object getTaskObject() {
+        return taskObject;
+    }
+
+    public BuiltinMetrics getBuiltInMetrics() {
+        return builtInMetrics;
+    }
+
+    private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
+        Map conf = (Map) workerData.get("conf");
+        return new TopologyContext(
+            topology,
+            (Map) workerData.get("storm-conf"),
+            (Map<Integer, String>) workerData.get("task->component"),
+            (Map<String, List<Integer>>) workerData.get("component->sorted-tasks"),
+            (Map<String, Map<String, Fields>>) workerData.get("component->stream->fields"),
+            (String) workerData.get("storm-id"),
+            ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, (String) workerData.get("storm-id"))),
+            ConfigUtils.workerPidsRoot(conf, (String) workerData.get("worker-id")),
+            taskId,
+            (Integer) workerData.get("port"),
+            (List<Integer>) workerData.get("task-ids"),
+            (Map<String, Object>) workerData.get("default-shared-resources"),
+            (Map<String, Object>) workerData.get("user-shared-resources"),
+            (Map<String, Object>) executorData.get("shared-executor-data"),
+            (Map<Integer, Map<Integer, Map<String, IMetric>>>) executorData.get("interval->task->metric-registry"),
+            (clojure.lang.Atom) executorData.get("open-or-prepare-was-called?")
+        );
+    }
+
+    private Object mkTaskObject() {
+        StormTopology topology = systemTopologyContext.getRawTopology();
+        Map<String, SpoutSpec> spouts = topology.get_spouts();
+        Map<String, Bolt> bolts = topology.get_bolts();
+        Map<String, StateSpoutSpec> stateSpouts = topology.get_state_spouts();
+        Object result = null;
+        ComponentObject componentObject = null;
+        if (spouts.containsKey(componentId)) {
+            componentObject = spouts.get(componentId).get_spout_object();
+        } else if (bolts.containsKey(componentId)) {
+            componentObject = bolts.get(componentId).get_bolt_object();
+        } else if (stateSpouts.containsKey(componentId)) {
+            componentObject = stateSpouts.get(componentId).get_state_spout_object();
+        } else {
+            throw new RuntimeException("Could not find " + componentId + " in " + topology);
+        }
+        result = Utils.getSetComponentObject(componentObject);
+
+        if (result instanceof ShellComponent) {
+            if (spouts.containsKey(componentId)) {
+                result = new ShellSpout((ShellComponent) result);
+            } else {
+                result = new ShellBolt((ShellComponent) result);
+            }
+        }
+
+        if (result instanceof JavaObject) {
+            result = Thrift.instantiateJavaObject((JavaObject) result);
+        }
+
+        return result;
+    }
+
+    private void addTaskHooks() {
+        List<String> hooksClassList = (List<String>) stormConf.get(Config.TOPOLOGY_AUTO_TASK_HOOKS);
+        if (null != hooksClassList) {
+            for (String hookClass : hooksClassList) {
+                try {
+                    userTopologyContext.addTaskHook(((ITaskHook) Class.forName(hookClass).newInstance()));
+                } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+                    throw new RuntimeException("Failed to add hook: " + hookClass, e);
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
index 84c75d7..2827420 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
@@ -17,19 +17,21 @@
  */
 package org.apache.storm.daemon.metrics;
 
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.storm.Config;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.api.IStatefulObject;
 import org.apache.storm.metric.api.StateMetric;
 import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.CommonStats;
 import org.apache.storm.stats.SpoutExecutorStats;
 import org.apache.storm.stats.StatsUtil;
 import org.apache.storm.task.TopologyContext;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class BuiltinMetricsUtil {
-    public static BuiltinMetrics mkData(String type, Object stats) {
+    public static BuiltinMetrics mkData(String type, CommonStats stats) {
         if (StatsUtil.SPOUT.equals(type)) {
             return new BuiltinSpoutMetrics((SpoutExecutorStats) stats);
         } else if (StatsUtil.BOLT.equals(type)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java b/storm-core/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java
index 905f747..e6f4b11 100644
--- a/storm-core/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java
+++ b/storm-core/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java
@@ -17,6 +17,8 @@
  */
 package org.apache.storm.hooks.info;
 
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 
 public class BoltAckInfo {
@@ -29,4 +31,10 @@ public class BoltAckInfo {
         this.ackingTaskId = ackingTaskId;
         this.processLatencyMs = processLatencyMs;
     }
+
+    public void applyOn(TopologyContext topologyContext) {
+        for (ITaskHook hook : topologyContext.getHooks()) {
+            hook.boltAck(this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java b/storm-core/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
index d92a1f8..73a7f33 100644
--- a/storm-core/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
+++ b/storm-core/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
@@ -17,6 +17,8 @@
  */
 package org.apache.storm.hooks.info;
 
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 
 public class BoltExecuteInfo {
@@ -29,4 +31,10 @@ public class BoltExecuteInfo {
         this.executingTaskId = executingTaskId;
         this.executeLatencyMs = executeLatencyMs;
     }
+
+    public void applyOn(TopologyContext topologyContext) {
+        for (ITaskHook hook : topologyContext.getHooks()) {
+            hook.boltExecute(this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java b/storm-core/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java
index 38e31b7..4e1e32d 100644
--- a/storm-core/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java
+++ b/storm-core/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java
@@ -17,6 +17,8 @@
  */
 package org.apache.storm.hooks.info;
 
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 
 public class BoltFailInfo {
@@ -29,4 +31,10 @@ public class BoltFailInfo {
         this.failingTaskId = failingTaskId;
         this.failLatencyMs = failLatencyMs;
     }
+
+    public void applyOn(TopologyContext topologyContext) {
+        for (ITaskHook hook : topologyContext.getHooks()) {
+            hook.boltFail(this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/hooks/info/EmitInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/hooks/info/EmitInfo.java b/storm-core/src/jvm/org/apache/storm/hooks/info/EmitInfo.java
index 3e3ed8c..52965a1 100644
--- a/storm-core/src/jvm/org/apache/storm/hooks/info/EmitInfo.java
+++ b/storm-core/src/jvm/org/apache/storm/hooks/info/EmitInfo.java
@@ -17,6 +17,9 @@
  */
 package org.apache.storm.hooks.info;
 
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
+
 import java.util.Collection;
 import java.util.List;
 
@@ -32,4 +35,10 @@ public class EmitInfo {
         this.taskId = taskId;
         this.outTasks = outTasks;
     }
+
+    public void applyOn(TopologyContext topologyContext) {
+        for (ITaskHook hook : topologyContext.getHooks()) {
+            hook.emit(this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java b/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java
index 1b1bc76..4949f0f 100644
--- a/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java
+++ b/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java
@@ -17,6 +17,9 @@
  */
 package org.apache.storm.hooks.info;
 
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
+
 public class SpoutAckInfo {
     public Object messageId;
     public int spoutTaskId;
@@ -27,4 +30,10 @@ public class SpoutAckInfo {
         this.spoutTaskId = spoutTaskId;
         this.completeLatencyMs = completeLatencyMs;
     }
+
+    public void applyOn(TopologyContext topologyContext) {
+        for (ITaskHook hook : topologyContext.getHooks()) {
+            hook.spoutAck(this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java b/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java
index 34b38b7..5b40005 100644
--- a/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java
+++ b/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java
@@ -17,6 +17,9 @@
  */
 package org.apache.storm.hooks.info;
 
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
+
 public class SpoutFailInfo {
     public Object messageId;
     public int spoutTaskId;
@@ -27,4 +30,10 @@ public class SpoutFailInfo {
         this.spoutTaskId = spoutTaskId;
         this.failLatencyMs = failLatencyMs;
     }
+
+    public void applyOn(TopologyContext topologyContext) {
+        for (ITaskHook hook : topologyContext.getHooks()) {
+            hook.spoutFail(this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index d7b7dbf..30d314f 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -18,10 +18,10 @@
 
 package org.apache.storm.utils;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.storm.Config;
-import org.apache.storm.validation.ConfigValidation;
 import org.apache.storm.generated.StormTopology;
-import org.apache.commons.io.FileUtils;
+import org.apache.storm.validation.ConfigValidation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,14 +35,16 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.lang.reflect.Field;
+import java.net.URLEncoder;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
-import java.util.HashSet;
-import java.util.Collections;
-import java.net.URLEncoder;
+import java.util.concurrent.Callable;
 
 public class ConfigUtils {
     private final static Logger LOG = LoggerFactory.getLogger(ConfigUtils.class);
@@ -135,7 +137,28 @@ public class ConfigUtils {
         throw new IllegalArgumentException("Illegal topology.stats.sample.rate in conf: " + rate);
     }
 
-    // public static mkStatsSampler // depends on Utils.evenSampler() TODO, this is sth we need to do after util
+    public static Callable<Boolean> evenSampler(final int samplingFreq) {
+        final Random random = new Random();
+
+        return new Callable<Boolean>() {
+            private int curr = -1;
+            private int target = random.nextInt(samplingFreq);
+
+            @Override
+            public Boolean call() throws Exception {
+                curr++;
+                if (curr >= samplingFreq) {
+                    curr = 0;
+                    target = random.nextInt(samplingFreq);
+                }
+                return (curr == target);
+            }
+        };
+    }
+
+    public static Callable<Boolean> mkStatsSampler(Map conf) {
+        return evenSampler(samplingRate(conf));
+    }
 
     // we use this "wired" wrapper pattern temporarily for mocking in clojure test
     public static Map readStormConfig() {

http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/test/clj/org/apache/storm/grouping_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/grouping_test.clj b/storm-core/test/clj/org/apache/storm/grouping_test.clj
index 61caf68..487d80f 100644
--- a/storm-core/test/clj/org/apache/storm/grouping_test.clj
+++ b/storm-core/test/clj/org/apache/storm/grouping_test.clj
@@ -16,21 +16,24 @@
 (ns org.apache.storm.grouping-test
   (:use [clojure test])
   (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestWordBytesCounter NGrouping]
-           [org.apache.storm.generated JavaObject JavaObjectArg])
+           [org.apache.storm.generated JavaObject JavaObjectArg Grouping NullStruct])
   (:import [org.apache.storm.grouping LoadMapping])
   (:use [org.apache.storm testing log config])
   (:use [org.apache.storm.internal clojure])
   (:use [org.apache.storm.daemon common executor])
   (:import [org.apache.storm Thrift])
-  (:import [org.apache.storm.utils Utils]))
+  (:import [org.apache.storm.utils Utils]
+           (org.apache.storm.daemon GrouperFactory)))
+
+(def shuffle-grouping (Grouping/shuffle (NullStruct. )))
 
 (deftest test-shuffle
- (let [shuffle-fn (mk-shuffle-grouper [(int 1) (int 2)] {TOPOLOGY-DISABLE-LOADAWARE-MESSAGING true} nil "comp" "stream")
+ (let [shuffler (GrouperFactory/mkGrouper nil "comp" "stream" nil shuffle-grouping [(int 1) (int 2)] {TOPOLOGY-DISABLE-LOADAWARE-MESSAGING true})
        num-messages 100000
        min-prcnt (int (* num-messages 0.49))
        max-prcnt (int (* num-messages 0.51))
        data [1 2]
-       freq (frequencies (for [x (range 0 num-messages)] (shuffle-fn (int 1) data nil)))
+       freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data nil)))
        load1 (.get freq [(int 1)])
        load2 (.get freq [(int 2)])]
     (log-message "FREQ:" freq)
@@ -40,14 +43,14 @@
     (is (<= load2 max-prcnt))))
 
 (deftest test-shuffle-load-even
- (let [shuffle-fn (mk-shuffle-grouper [(int 1) (int 2)] {} nil "comp" "stream")
+ (let [shuffler (GrouperFactory/mkGrouper nil "comp" "stream" nil shuffle-grouping [(int 1) (int 2)] {})
        num-messages 100000
        min-prcnt (int (* num-messages 0.49))
        max-prcnt (int (* num-messages 0.51))
        load (LoadMapping.)
        _ (.setLocal load {(int 1) 0.0 (int 2) 0.0})
        data [1 2]
-       freq (frequencies (for [x (range 0 num-messages)] (shuffle-fn (int 1) data load)))
+       freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data load)))
        load1 (.get freq [(int 1)])
        load2 (.get freq [(int 2)])]
     (log-message "FREQ:" freq)
@@ -57,7 +60,7 @@
     (is (<= load2 max-prcnt))))
 
 (deftest test-shuffle-load-uneven
- (let [shuffle-fn (mk-shuffle-grouper [(int 1) (int 2)] {} nil "comp" "stream")
+ (let [shuffler (GrouperFactory/mkGrouper nil "comp" "stream" nil shuffle-grouping [(int 1) (int 2)] {})
        num-messages 100000
        min1-prcnt (int (* num-messages 0.32))
        max1-prcnt (int (* num-messages 0.34))
@@ -66,7 +69,7 @@
        load (LoadMapping.)
        _ (.setLocal load {(int 1) 0.5 (int 2) 0.0})
        data [1 2]
-       freq (frequencies (for [x (range 0 num-messages)] (shuffle-fn (int 1) data load)))
+       freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data load)))
        load1 (.get freq [(int 1)])
        load2 (.get freq [(int 2)])]
     (log-message "FREQ:" freq)


[2/3] storm git commit: Merge branch 'task' of https://github.com/abhishekagarwal87/storm into STORM-1271

Posted by bo...@apache.org.
Merge branch 'task' of https://github.com/abhishekagarwal87/storm into STORM-1271

STORM-1271: Port backtype.storm.daemon.task to java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/31d558ca
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/31d558ca
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/31d558ca

Branch: refs/heads/master
Commit: 31d558cad5c8ee99f0e3540203cfa7e04341d6f8
Parents: 223b615 7d63cb3
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Mar 31 14:27:36 2016 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Mar 31 14:27:36 2016 -0500

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/config.clj  |  27 --
 .../clj/org/apache/storm/daemon/executor.clj    | 222 +++++++----------
 .../src/clj/org/apache/storm/daemon/task.clj    | 190 --------------
 .../org/apache/storm/daemon/GrouperFactory.java | 243 ++++++++++++++++++
 .../src/jvm/org/apache/storm/daemon/Task.java   | 247 +++++++++++++++++++
 .../daemon/metrics/BuiltinMetricsUtil.java      |   8 +-
 .../apache/storm/hooks/info/BoltAckInfo.java    |   8 +
 .../storm/hooks/info/BoltExecuteInfo.java       |   8 +
 .../apache/storm/hooks/info/BoltFailInfo.java   |   8 +
 .../org/apache/storm/hooks/info/EmitInfo.java   |   9 +
 .../apache/storm/hooks/info/SpoutAckInfo.java   |   9 +
 .../apache/storm/hooks/info/SpoutFailInfo.java  |   9 +
 .../jvm/org/apache/storm/utils/ConfigUtils.java |  35 ++-
 .../test/clj/org/apache/storm/grouping_test.clj |  19 +-
 14 files changed, 675 insertions(+), 367 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/31d558ca/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index d5f80db,30d314f..0f53343
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@@ -135,9 -137,30 +137,30 @@@ public class ConfigUtils 
          throw new IllegalArgumentException("Illegal topology.stats.sample.rate in conf: " + rate);
      }
  
-     // public static mkStatsSampler // depends on Utils.evenSampler() TODO, this is sth we need to do after util
+     public static Callable<Boolean> evenSampler(final int samplingFreq) {
+         final Random random = new Random();
+ 
+         return new Callable<Boolean>() {
+             private int curr = -1;
+             private int target = random.nextInt(samplingFreq);
+ 
+             @Override
+             public Boolean call() throws Exception {
+                 curr++;
+                 if (curr >= samplingFreq) {
+                     curr = 0;
+                     target = random.nextInt(samplingFreq);
+                 }
+                 return (curr == target);
+             }
+         };
+     }
+ 
+     public static Callable<Boolean> mkStatsSampler(Map conf) {
+         return evenSampler(samplingRate(conf));
+     }
  
 -    // we use this "wired" wrapper pattern temporarily for mocking in clojure test
 +    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
      public static Map readStormConfig() {
          return _instance.readStormConfigImpl();
      }


[3/3] storm git commit: Added STORM-1271 to Changelog

Posted by bo...@apache.org.
Added STORM-1271 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/08449278
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/08449278
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/08449278

Branch: refs/heads/master
Commit: 084492782356220404b560aae2567cd4ee7f8052
Parents: 31d558c
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Mar 31 14:54:22 2016 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Mar 31 14:54:22 2016 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/08449278/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fef71c5..06ab074 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-1271: Port backtype.storm.daemon.task to java
  * STORM-822: Kafka Spout New Consumer API
  * STORM-1663: Stats couldn't handle null worker HB.
  * STORM-1665: Worker cannot instantiate kryo