You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/11/12 20:19:38 UTC

[1/3] storm git commit: STORM-1145: Have IConnection push tuples instead of pull them. Move deserialization to the IConnectionCallback so only Tuple batches are flowing through the system, not unserialized byte arrays.

Repository: storm
Updated Branches:
  refs/heads/master f4bd90a34 -> 1354c4049


STORM-1145: Have IConnection push tuples instead of pull them.
Move deserialization to the IConnectionCallback so only Tuple batches are flowing through the system, not unserialized byte arrays.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1bdfd7e1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1bdfd7e1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1bdfd7e1

Branch: refs/heads/master
Commit: 1bdfd7e1f04194f7aa88ba2537f41a08409ce344
Parents: f4bd90a
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Oct 28 16:18:08 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Nov 12 11:26:33 2015 -0600

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/executor.clj  |  60 +++----
 .../src/clj/backtype/storm/daemon/worker.clj    |  59 ++++---
 .../src/clj/backtype/storm/messaging/loader.clj |  76 ++-------
 .../src/clj/backtype/storm/messaging/local.clj  |  72 +-------
 storm-core/src/jvm/backtype/storm/Config.java   |   7 -
 .../storm/messaging/AddressedTuple.java         |  46 ++++++
 .../DeserializingConnectionCallback.java        |  60 +++++++
 .../backtype/storm/messaging/IConnection.java   |  10 +-
 .../storm/messaging/IConnectionCallback.java    |  31 ++++
 .../backtype/storm/messaging/local/Context.java | 164 +++++++++++++++++++
 .../backtype/storm/messaging/netty/Client.java  |   3 +-
 .../backtype/storm/messaging/netty/Server.java  | 121 ++------------
 .../backtype/storm/tuple/AddressedTuple.java    |  48 ++++++
 .../storm/messaging/netty_unit_test.clj         | 122 ++++++++------
 .../test/clj/backtype/storm/messaging_test.clj  |  25 ---
 15 files changed, 506 insertions(+), 398 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 0390987..9687cdd 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -21,7 +21,7 @@
   (:import [java.util List Random HashMap ArrayList LinkedList Map])
   (:import [backtype.storm ICredentialsListener])
   (:import [backtype.storm.hooks ITaskHook])
-  (:import [backtype.storm.tuple Tuple Fields TupleImpl MessageId])
+  (:import [backtype.storm.tuple AddressedTuple Tuple Fields TupleImpl MessageId])
   (:import [backtype.storm.spout ISpoutWaitStrategy ISpout SpoutOutputCollector ISpoutOutputCollector])
   (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
             EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
@@ -30,7 +30,7 @@
   (:import [backtype.storm.generated GlobalStreamId])
   (:import [backtype.storm.utils Utils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread])
   (:import [com.lmax.disruptor InsufficientCapacityException])
-  (:import [backtype.storm.serialization KryoTupleSerializer KryoTupleDeserializer])
+  (:import [backtype.storm.serialization KryoTupleSerializer])
   (:import [backtype.storm.daemon Shutdownable])
   (:import [backtype.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric])
   (:import [backtype.storm Config Constants])
@@ -206,9 +206,10 @@
 (defn mk-executor-transfer-fn [batch-transfer->worker storm-conf]
   (fn this
     [task tuple]
-    (when (= true (storm-conf TOPOLOGY-DEBUG))
-      (log-message "TRANSFERING tuple TASK: " task " TUPLE: " tuple))
-    (disruptor/publish batch-transfer->worker [task tuple])))
+    (let [val (AddressedTuple. task tuple)]
+      (when (= true (storm-conf TOPOLOGY-DEBUG))
+        (log-message "TRANSFERING tuple " val))
+      (disruptor/publish batch-transfer->worker val))))
 
 (defn mk-executor-data [worker executor-id]
   (let [worker-context (worker-context worker)
@@ -257,7 +258,6 @@
                                     (exception-cause? java.io.InterruptedIOException error))
                                (log-message "Got interrupted excpetion shutting thread down...")
                                ((:suicide-fn <>))))
-     :deserializer (KryoTupleDeserializer. storm-conf worker-context)
      :sampler (mk-stats-sampler storm-conf)
      :backpressure (atom false)
      :spout-throttling-metrics (if (= executor-type :spout) 
@@ -296,8 +296,7 @@
           (.add alist o)
           (when batch-end?
             (worker-transfer-fn serializer alist)
-            (.setObject cached-emit (ArrayList.))
-            )))
+            (.setObject cached-emit (ArrayList.)))))
       :kill-fn (:report-error-and-die executor-data))))
 
 (defn setup-metrics! [executor-data]
@@ -309,9 +308,8 @@
        interval
        interval
        (fn []
-         (disruptor/publish
-          receive-queue
-          [[nil (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID)]]))))))
+         (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]]
+           (disruptor/publish receive-queue val)))))))
 
 (defn metrics-tick
   [executor-data task-data ^TupleImpl tuple]
@@ -333,7 +331,7 @@
                                      (IMetricsConsumer$DataPoint. name value)))))
                           (filter identity)
                           (into []))]
-     (if (seq data-points)
+     (when (seq data-points)
        (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))
 
 (defn setup-ticks! [worker executor-data]
@@ -351,10 +349,8 @@
           tick-time-secs
           tick-time-secs
           (fn []
-            (disruptor/publish
-              receive-queue
-              [[nil (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)]]
-              )))))))
+            (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
+              (disruptor/publish receive-queue val))))))))
 
 (defn mk-executor [worker executor-id initial-credentials]
   (let [executor-data (mk-executor-data worker executor-id)
@@ -394,10 +390,9 @@
         executor-id)
       (credentials-changed [this creds]
         (let [receive-queue (:receive-queue executor-data)
-              context (:worker-context executor-data)]
-          (disruptor/publish
-            receive-queue
-            [[nil (TupleImpl. context [creds] Constants/SYSTEM_TASK_ID Constants/CREDENTIALS_CHANGED_STREAM_ID)]])))
+              context (:worker-context executor-data)
+              val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [creds] Constants/SYSTEM_TASK_ID Constants/CREDENTIALS_CHANGED_STREAM_ID))]]
+          (disruptor/publish receive-queue val)))
       (get-backpressure-flag [this]
         @(:backpressure executor-data))
       Shutdownable
@@ -444,16 +439,16 @@
       (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
 
 (defn mk-task-receiver [executor-data tuple-action-fn]
-  (let [^KryoTupleDeserializer deserializer (:deserializer executor-data)
-        task-ids (:task-ids executor-data)
+  (let [task-ids (:task-ids executor-data)
         debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
         ]
     (disruptor/clojure-handler
       (fn [tuple-batch sequence-id end-of-batch?]
-        (fast-list-iter [[task-id msg] tuple-batch]
-          (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))]
+        (fast-list-iter [^AddressedTuple addressed-tuple tuple-batch]
+          (let [^TupleImpl tuple (.getTuple addressed-tuple)
+                task-id (.getDest addressed-tuple)]
             (when debug? (log-message "Processing received message FOR " task-id " TUPLE: " tuple))
-            (if task-id
+            (if (not= task-id AddressedTuple/BROADCAST_DEST)
               (tuple-action-fn task-id tuple)
               ;; null task ids are broadcast tuples
               (fast-list-iter [task-id task-ids]
@@ -479,7 +474,7 @@
           spct    (if (and (not-nil? options) (:enable options)) (:samplingpct options) 0)]
       ;; the thread's initialized random number generator is used to generate
       ;; uniformily distributed random numbers.
-      (if (and (> spct 0) (< (* 100 (.nextDouble random)) spct))
+      (when (and (> spct 0) (< (* 100 (.nextDouble random)) spct))
         (task/send-unanchored
           task-data
           EVENTLOGGER-STREAM-ID
@@ -561,9 +556,7 @@
                                                                                      task-id
                                                                                      out-stream-id
                                                                                      tuple-id)]
-                                                           (transfer-fn out-task
-                                                                        out-tuple)
-                                                           ))
+                                                           (transfer-fn out-task out-tuple)))
                                          (if has-eventloggers?
                                            (send-to-eventlogger executor-data task-data values component-id message-id rand))
                                          (if (and rooted?
@@ -757,12 +750,12 @@
                                                                             (fast-list-iter [root-id root-ids]
                                                                                             (put-xor! anchors-to-ids root-id edge-id))
                                                                             ))))
-                                                        (transfer-fn t
-                                                                   (TupleImpl. worker-context
+                                                        (let [tuple (TupleImpl. worker-context
                                                                                values
                                                                                task-id
                                                                                stream
-                                                                               (MessageId/makeId anchors-to-ids)))))
+                                                                               (MessageId/makeId anchors-to-ids))]
+                                                          (transfer-fn t tuple))))
                                     (if has-eventloggers?
                                       (send-to-eventlogger executor-data task-data values component-id nil rand))
                                     (or out-tasks [])))]]
@@ -796,8 +789,7 @@
                            (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
                                           (task/send-unanchored task-data
                                                                 ACKER-ACK-STREAM-ID
-                                                                [root (bit-xor id ack-val)])
-                                          ))
+                                                                [root (bit-xor id ack-val)])))
                          (let [delta (tuple-time-delta! tuple)
                                debug? (= true (storm-conf TOPOLOGY-DEBUG))]
                            (when debug? 

http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index 7c0b18d..df9f725 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -31,7 +31,7 @@
   (:import [backtype.storm.daemon Shutdownable])
   (:import [backtype.storm.serialization KryoTupleSerializer])
   (:import [backtype.storm.generated StormTopology])
-  (:import [backtype.storm.tuple Fields])
+  (:import [backtype.storm.tuple AddressedTuple Fields])
   (:import [backtype.storm.task WorkerTopologyContext])
   (:import [backtype.storm Constants])
   (:import [backtype.storm.security.auth AuthUtils])
@@ -103,10 +103,15 @@
         flatten
         set )))
 
+(defn get-dest
+  [^AddressedTuple addressed-tuple]
+  "get the destination for an AddressedTuple"
+  (.getDest addressed-tuple))
+
 (defn mk-transfer-local-fn [worker]
   (let [short-executor-receive-queue-map (:short-executor-receive-queue-map worker)
         task->short-executor (:task->short-executor worker)
-        task-getter (comp #(get task->short-executor %) fast-first)]
+        task-getter (comp #(get task->short-executor %) get-dest)]
     (fn [tuple-batch]
       (let [grouped (fast-group-by task-getter tuple-batch)]
         (fast-map-iter [[short-executor pairs] grouped]
@@ -162,19 +167,21 @@
           (fn [^KryoTupleSerializer serializer tuple-batch]
             (let [^ArrayList local (ArrayList.)
                   ^HashMap remoteMap (HashMap.)]
-              (fast-list-iter [[task tuple :as pair] tuple-batch]
-                (if (local-tasks task)
-                  (.add local pair)
-
-                  ;;Using java objects directly to avoid performance issues in java code
-                  (do
-                    (when (not (.get remoteMap task))
-                      (.put remoteMap task (ArrayList.)))
-                    (let [^ArrayList remote (.get remoteMap task)]
-                      (if (not-nil? task)
-                        (.add remote (TaskMessage. ^int task ^bytes (.serialize serializer tuple)))
-                        (log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
-                     ))))
+              (fast-list-iter [^AddressedTuple addressed-tuple tuple-batch]
+                (let [task (.getDest addressed-tuple)
+                      tuple (.getTuple addressed-tuple)]
+                  (if (local-tasks task)
+                    (.add local addressed-tuple)
+
+                    ;;Using java objects directly to avoid performance issues in java code
+                    (do
+                      (when (not (.get remoteMap task))
+                        (.put remoteMap task (ArrayList.)))
+                      (let [^ArrayList remote (.get remoteMap task)]
+                        (if (not-nil? task)
+                          (.add remote (TaskMessage. task ^bytes (.serialize serializer tuple)))
+                          (log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
+                       )))))
 
               (when (not (.isEmpty local)) (local-transfer local))
               (when (not (.isEmpty remoteMap)) (disruptor/publish transfer-queue remoteMap))))]
@@ -295,7 +302,6 @@
       :default-shared-resources (mk-default-resources <>)
       :user-shared-resources (mk-user-resources <>)
       :transfer-local-fn (mk-transfer-local-fn <>)
-      :receiver-thread-count (get storm-conf WORKER-RECEIVER-THREAD-COUNT)
       :transfer-fn (mk-transfer-fn <>)
       :load-mapping (LoadMapping.)
       :assignment-versions assignment-versions
@@ -450,16 +456,12 @@
           (schedule timer recur-secs this :check-active false)
             )))))
 
-(defn launch-receive-thread [worker]
-  (log-message "Launching receive-thread for " (:assignment-id worker) ":" (:port worker))
-  (msg-loader/launch-receive-thread!
-    (:mq-context worker)
-    (:receiver worker)
-    (:storm-id worker)
-    (:receiver-thread-count worker)
-    (:port worker)
-    (:transfer-local-fn worker)
-    :kill-fn (fn [t] (exit-process! 11))))
+(defn register-callbacks [worker]
+  (log-message "Registering IConnectionCallbacks for " (:assignment-id worker) ":" (:port worker))
+  (msg-loader/register-callback (:transfer-local-fn worker)
+                                (:receiver worker)
+                                (:storm-conf worker)
+                                (worker-context worker)))
 
 (defn- close-resources [worker]
   (let [dr (:default-shared-resources worker)]
@@ -596,7 +598,7 @@
         _ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
         _ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors))
 
-        receive-thread-shutdown (launch-receive-thread worker)
+        _ (register-callbacks worker)
 
         refresh-connections (mk-refresh-connections worker)
         refresh-load (mk-refresh-load worker)
@@ -635,9 +637,6 @@
                       ;; this will do best effort flushing since the linger period
                       ;; was set on creation
                       (.close socket))
-                    (log-message "Shutting down receive thread")
-                    (receive-thread-shutdown)
-                    (log-message "Shut down receive thread")
                     (log-message "Terminating messaging context")
                     (log-message "Shutting down executors")
                     (doseq [executor @executors] (.shutdown executor))

http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/clj/backtype/storm/messaging/loader.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/loader.clj b/storm-core/src/clj/backtype/storm/messaging/loader.clj
index c154ed8..72dd382 100644
--- a/storm-core/src/clj/backtype/storm/messaging/loader.clj
+++ b/storm-core/src/clj/backtype/storm/messaging/loader.clj
@@ -14,71 +14,21 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns backtype.storm.messaging.loader
-  (:use [backtype.storm util log])
-  (:import [java.util ArrayList Iterator])
-  (:import [backtype.storm.messaging IContext IConnection TaskMessage])
-  (:import [backtype.storm.utils DisruptorQueue MutableObject])
-  (:require [backtype.storm.messaging [local :as local]])
-  (:require [backtype.storm [disruptor :as disruptor]]))
+  (:import [backtype.storm.messaging IConnection DeserializingConnectionCallback])
+  (:require [backtype.storm.messaging [local :as local]]))
 
 (defn mk-local-context []
   (local/mk-context))
 
-(defn- mk-receive-thread [storm-id port transfer-local-fn  daemon kill-fn priority socket thread-id]
-    (async-loop
-       (fn []
-         (log-message "Starting receive-thread: [stormId: " storm-id ", port: " port ", thread-id: " thread-id  " ]")
-         (fn []
-           (let [batched (ArrayList.)
-                 ^Iterator iter (.recv ^IConnection socket 0 thread-id)
-                 closed (atom false)]
-             (when iter
-               (while (and (not @closed) (.hasNext iter)) 
-                  (let [packet (.next iter)
-                        task (if packet (.task ^TaskMessage packet))
-                        message (if packet (.message ^TaskMessage packet))]
-                      (if (= task -1)
-                         (do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
-                           (.close socket)
-                           (reset! closed  true))
-                         (when packet (.add batched [task message]))))))
-             
-             (when (not @closed)
-               (do
-                 (if (> (.size batched) 0)
-                   (transfer-local-fn batched))
-                 0)))))
-         :factory? true
-         :daemon daemon
-         :kill-fn kill-fn
-         :priority priority
-         :thread-name (str "worker-receiver-thread-" thread-id)))
+(defn- mk-connection-callback
+  "make an IConnectionCallback"
+  [transfer-local-fn storm-conf worker-context]
+  (DeserializingConnectionCallback. storm-conf
+                                    worker-context
+                                    (fn [batch]
+                                      (transfer-local-fn batch))))
 
-(defn- mk-receive-threads [storm-id port transfer-local-fn  daemon kill-fn priority socket thread-count]
-  (into [] (for [thread-id (range thread-count)] 
-             (mk-receive-thread storm-id port transfer-local-fn  daemon kill-fn priority socket thread-id))))
-
-
-(defnk launch-receive-thread!
-  [context socket storm-id receiver-thread-count port transfer-local-fn
-   :daemon true
-   :kill-fn (fn [t] (System/exit 1))
-   :priority Thread/NORM_PRIORITY]
-  (let [local-hostname (memoized-local-hostname)
-        thread-count (if receiver-thread-count receiver-thread-count 1)
-        vthreads (mk-receive-threads storm-id port transfer-local-fn daemon kill-fn priority socket thread-count)]
-    (fn []
-      (let [kill-socket (.connect ^IContext context storm-id local-hostname port)]
-        (log-message "Shutting down receiving-thread: [" storm-id ", " port "]")
-        (.send ^IConnection kill-socket
-                  -1 (byte-array []))
-        
-        (.close ^IConnection kill-socket)
-        
-        (log-message "Waiting for receiving-thread:[" storm-id ", " port "] to die")
-        
-        (for [thread-id (range thread-count)] 
-             (.join (vthreads thread-id)))
-        
-        (log-message "Shutdown receiving-thread: [" storm-id ", " port "]")
-        ))))
+(defn register-callback
+  "register the local-transfer-fn with the server"
+  [transfer-local-fn ^IConnection socket storm-conf worker-context]
+  (.registerRecv socket (mk-connection-callback transfer-local-fn storm-conf worker-context)))

http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/clj/backtype/storm/messaging/local.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/local.clj b/storm-core/src/clj/backtype/storm/messaging/local.clj
index 60a6bd2..b99a77a 100644
--- a/storm-core/src/clj/backtype/storm/messaging/local.clj
+++ b/storm-core/src/clj/backtype/storm/messaging/local.clj
@@ -14,76 +14,10 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns backtype.storm.messaging.local
-  (:refer-clojure :exclude [send])
-  (:use [backtype.storm log util])
-  (:import [backtype.storm.messaging IContext IConnection TaskMessage])
-  (:import [backtype.storm.grouping Load])
-  (:import [java.util.concurrent LinkedBlockingQueue])
-  (:import [java.util Map Iterator Collection])
-  (:import [java.util Iterator ArrayList])
-  (:gen-class))
-
-(defn update-load! [cached-task->load lock task->load]
-  (locking lock
-    (swap! cached-task->load merge task->load)))
-
-(defn add-queue! [queues-map lock storm-id port]
-  (let [id (str storm-id "-" port)]
-    (locking lock
-      (when-not (contains? @queues-map id)
-        (swap! queues-map assoc id (LinkedBlockingQueue.))))
-    (@queues-map id)))
-
-(deftype LocalConnection [storm-id port queues-map lock queue task->load]
-  IConnection
-  (^Iterator recv [this ^int flags ^int clientId]
-    (when-not queue
-      (throw (IllegalArgumentException. "Cannot receive on this socket")))
-    (let [ret (ArrayList.)
-          msg (if (= flags 1) (.poll queue) (.take queue))]
-      (if msg
-        (do 
-          (.add ret msg)
-          (.iterator ret))
-        nil)))
-  (^void send [this ^int taskId ^bytes payload]
-    (let [send-queue (add-queue! queues-map lock storm-id port)]
-      (.put send-queue (TaskMessage. taskId payload))
-      ))
-  (^void send [this ^Iterator iter]
-    (let [send-queue (add-queue! queues-map lock storm-id port)]
-      (while (.hasNext iter) 
-         (.put send-queue (.next iter)))
-      ))
-  (^void sendLoadMetrics [this ^Map taskToLoad]
-    (update-load! task->load lock taskToLoad))
-  (^Map getLoad [this ^Collection tasks]
-    (locking lock
-      (into {}
-        (for [task tasks
-              :let [load (.get @task->load task)]
-              :when (not-nil? load)]
-          ;; for now we are ignoring the connection load locally
-          [task (Load. true load 0.0)]))))
-  (^void close [this]))
-
-
-(deftype LocalContext [^{:unsynchronized-mutable true} queues-map
-                       ^{:unsynchronized-mutable true} lock
-                       ^{:unsynchronized-mutable true} task->load]
-  IContext
-  (^void prepare [this ^Map storm-conf]
-    (set! queues-map (atom {}))
-    (set! task->load (atom {}))
-    (set! lock (Object.)))
-  (^IConnection bind [this ^String storm-id ^int port]
-    (LocalConnection. storm-id port queues-map lock (add-queue! queues-map lock storm-id port) task->load))
-  (^IConnection connect [this ^String storm-id ^String host ^int port]
-    (LocalConnection. storm-id port queues-map lock nil task->load))
-  (^void term [this]
-    ))
+  (:import [backtype.storm.messaging IContext])
+  (:import [backtype.storm.messaging.local Context]))
 
 (defn mk-context [] 
-  (let [context  (LocalContext. nil nil nil)]
+  (let [context  (Context.)]
     (.prepare ^IContext context nil)
     context))

http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 4cde8ad..95e5ccc 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1136,13 +1136,6 @@ public class Config extends HashMap<String, Object> {
     public static final String WORKER_GC_CHILDOPTS = "worker.gc.childopts";
 
     /**
-     * control how many worker receiver threads we need per worker
-     */
-    @isInteger
-    @isPositiveNumber
-    public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
-
-    /**
      * How often this worker should heartbeat to the supervisor.
      */
     @isInteger

http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/jvm/backtype/storm/messaging/AddressedTuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/AddressedTuple.java b/storm-core/src/jvm/backtype/storm/messaging/AddressedTuple.java
new file mode 100644
index 0000000..de9a3e6
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/AddressedTuple.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging;
+
+import backtype.storm.tuple.Tuple;
+
+/**
+ * A Tuple that is addressed to a destination.
+ */
+public class AddressedTuple {
+    public final Tuple tuple;
+    public final int dest;
+
+    public AddressedTuple(int dest, Tuple tuple) {
+        this.dest = dest;
+        this.tuple = tuple;
+    }
+
+    public Tuple getTuple() {
+        return tuple;
+    }
+
+    public int getDest() {
+        return dest;
+    }
+
+    @Override
+    public String toString() {
+        return "[dest: "+dest+" tuple: "+tuple+"]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/jvm/backtype/storm/messaging/DeserializingConnectionCallback.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/DeserializingConnectionCallback.java b/storm-core/src/jvm/backtype/storm/messaging/DeserializingConnectionCallback.java
new file mode 100644
index 0000000..1e2d3aa
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/DeserializingConnectionCallback.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging;
+
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.tuple.AddressedTuple;
+import backtype.storm.serialization.KryoTupleDeserializer;
+
+import clojure.lang.IFn;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A class that is called when a TaskMessage arrives.
+ */
+public class DeserializingConnectionCallback implements IConnectionCallback {
+    private final IFn _cb;
+    private final Map _conf;
+    private final GeneralTopologyContext _context;
+    private final ThreadLocal<KryoTupleDeserializer> _des =
+         new ThreadLocal<KryoTupleDeserializer>() {
+             @Override
+             protected KryoTupleDeserializer initialValue() {
+                 return new KryoTupleDeserializer(_conf, _context);
+             }
+         };
+
+    public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, IFn callback) {
+        _conf = conf;
+        _context = context;
+        _cb = callback;
+    }
+
+    @Override
+    public void recv(List<TaskMessage> batch) {
+        KryoTupleDeserializer des = _des.get();
+        ArrayList<AddressedTuple> ret = new ArrayList<>(batch.size());
+        for (TaskMessage message: batch) {
+            ret.add(new AddressedTuple(message.task(), des.deserialize(message.message())));
+        }
+        _cb.invoke(ret);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
index 88f0c0a..a03b3a2 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
@@ -22,14 +22,12 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 
-public interface IConnection {   
-    
+public interface IConnection {
     /**
-     * receive a batch message iterator (consists taskId and payload)
-     * @param flags 0: block, 1: non-block
-     * @return
+     * Register a callback to be notified when data is ready to be processed.
+     * @param cb the callback to process the messages.
      */
-    public Iterator<TaskMessage> recv(int flags, int clientId);
+    public void registerRecv(IConnectionCallback cb);
 
     /**
      * Send load metrics to all downstream connections.

http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/jvm/backtype/storm/messaging/IConnectionCallback.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/IConnectionCallback.java b/storm-core/src/jvm/backtype/storm/messaging/IConnectionCallback.java
new file mode 100644
index 0000000..ecf0828
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/IConnectionCallback.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging;
+
+import java.util.List;
+
+/**
+ * A class that is called when a TaskMessage arrives.
+ */
+public interface IConnectionCallback {
+    /**
+     * A batch of new messages have arrived to be processed
+     * @param batch the messages to be processed
+     */
+    public void recv(List<TaskMessage> batch);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/jvm/backtype/storm/messaging/local/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/local/Context.java b/storm-core/src/jvm/backtype/storm/messaging/local/Context.java
new file mode 100644
index 0000000..968fe64
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/local/Context.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.local;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import backtype.storm.Config;
+import backtype.storm.grouping.Load;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.messaging.IConnectionCallback;
+import backtype.storm.messaging.IContext;
+
+public class Context implements IContext {
+    private static final Logger LOG = LoggerFactory.getLogger(Context.class);
+
+    private static class LocalServer implements IConnection {
+        IConnectionCallback _cb;
+        final ConcurrentHashMap<Integer, Double> _load = new ConcurrentHashMap<>();
+
+        @Override
+        public void registerRecv(IConnectionCallback cb) {
+            _cb = cb;
+        }
+
+        @Override
+        public void send(int taskId,  byte[] payload) {
+            throw new IllegalArgumentException("SHOULD NOT HAPPEN");
+        }
+ 
+        @Override
+        public void send(Iterator<TaskMessage> msgs) {
+            throw new IllegalArgumentException("SHOULD NOT HAPPEN");
+        }
+
+        @Override
+        public Map<Integer, Load> getLoad(Collection<Integer> tasks) {
+            Map<Integer, Load> ret = new HashMap<>();
+            for (Integer task : tasks) {
+                Double found = _load.get(task);
+                if (found != null) {
+                    ret.put(task, new Load(true, found, 0));
+                }
+            }
+            return ret; 
+        }
+
+        @Override
+        public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
+            _load.putAll(taskToLoad);
+        }
+ 
+        @Override
+        public void close() {
+            //NOOP
+        }
+    };
+
+    private static class LocalClient implements IConnection {
+        private final LocalServer _server;
+
+        public LocalClient(LocalServer server) {
+            _server = server;
+        }
+
+        @Override
+        public void registerRecv(IConnectionCallback cb) {
+            throw new IllegalArgumentException("SHOULD NOT HAPPEN");
+        }
+
+        @Override
+        public void send(int taskId,  byte[] payload) {
+            if (_server._cb != null) {
+                _server._cb.recv(Arrays.asList(new TaskMessage(taskId, payload)));
+            }
+        }
+ 
+        @Override
+        public void send(Iterator<TaskMessage> msgs) {
+            if (_server._cb != null) {
+                ArrayList<TaskMessage> ret = new ArrayList<>();
+                while (msgs.hasNext()) {
+                    ret.add(msgs.next());
+                }
+                _server._cb.recv(ret);
+            }
+        }
+
+        @Override
+        public Map<Integer, Load> getLoad(Collection<Integer> tasks) {
+            return _server.getLoad(tasks);
+        }
+
+        @Override
+        public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
+            _server.sendLoadMetrics(taskToLoad);
+        }
+ 
+        @Override
+        public void close() {
+            //NOOP
+        }
+    };
+
+    private static ConcurrentHashMap<String, LocalServer> _registry = new ConcurrentHashMap<>();
+    private static LocalServer getLocalServer(String nodeId, int port) {
+        String key = nodeId + "-" + port;
+        LocalServer ret = _registry.get(key);
+        if (ret == null) {
+            ret = new LocalServer();
+            LocalServer tmp = _registry.putIfAbsent(key, ret);
+            if (tmp != null) {
+                ret = tmp;
+            }
+        }
+        return ret;
+    }
+        
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void prepare(Map storm_conf) {
+        //NOOP
+    }
+
+    @Override
+    public IConnection bind(String storm_id, int port) {
+        return getLocalServer(storm_id, port);
+    }
+
+    @Override
+    public IConnection connect(String storm_id, String host, int port) {
+        return new LocalClient(getLocalServer(storm_id, port));
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void term() {
+        //NOOP
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index ccdef41..4f813ba 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -32,6 +32,7 @@ import backtype.storm.Config;
 import backtype.storm.grouping.Load;
 import backtype.storm.messaging.ConnectionWithStatus;
 import backtype.storm.messaging.TaskMessage;
+import backtype.storm.messaging.IConnectionCallback;
 import backtype.storm.metric.api.IStatefulObject;
 import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
 import backtype.storm.utils.Utils;
@@ -217,7 +218,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
      * @throws java.lang.UnsupportedOperationException whenever this method is being called.
      */
     @Override
-    public Iterator<TaskMessage> recv(int flags, int clientId) {
+    public void registerRecv(IConnectionCallback cb) {
         throw new UnsupportedOperationException("Client connection should not receive any messages");
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index 32c2bd7..bca3936 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -45,6 +45,8 @@ import org.slf4j.LoggerFactory;
 import backtype.storm.Config;
 import backtype.storm.grouping.Load;
 import backtype.storm.messaging.ConnectionWithStatus;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.IConnectionCallback;
 import backtype.storm.messaging.TaskMessage;
 import backtype.storm.metric.api.IStatefulObject;
 import backtype.storm.serialization.KryoValuesSerializer;
@@ -58,41 +60,22 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
     int port;
     private final ConcurrentHashMap<String, AtomicInteger> messagesEnqueued = new ConcurrentHashMap<>();
     private final AtomicInteger messagesDequeued = new AtomicInteger(0);
-    private final AtomicInteger[] pendingMessages;
-    
-    // Create multiple queues for incoming messages. The size equals the number of receiver threads.
-    // For message which is sent to same task, it will be stored in the same queue to preserve the message order.
-    private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
     
     volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
     final ChannelFactory factory;
     final ServerBootstrap bootstrap;
-    
-    private int queueCount;
-    private volatile HashMap<Integer, Integer> taskToQueueId = null;
-    int roundRobinQueueId;
-	
+ 
     private volatile boolean closing = false;
     List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
     private KryoValuesSerializer _ser;
+    private IConnectionCallback _cb = null; 
     
     @SuppressWarnings("rawtypes")
     Server(Map storm_conf, int port) {
         this.storm_conf = storm_conf;
         this.port = port;
         _ser = new KryoValuesSerializer(storm_conf);
-        
-        queueCount = Utils.getInt(storm_conf.get(Config.WORKER_RECEIVER_THREAD_COUNT), 1);
-        roundRobinQueueId = 0;
-        taskToQueueId = new HashMap<>();
-    
-        message_queue = new LinkedBlockingQueue[queueCount];
-        pendingMessages = new AtomicInteger[queueCount];
-        for (int i = 0; i < queueCount; i++) {
-            message_queue[i] = new LinkedBlockingQueue<>();
-            pendingMessages[i] = new AtomicInteger(0);
-        }
-        
+
         // Configure the server.
         int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
         int backlog = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SOCKET_BACKLOG), 500);
@@ -124,48 +107,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
         Channel channel = bootstrap.bind(new InetSocketAddress(port));
         allChannels.add(channel);
     }
-    
-    private ArrayList<TaskMessage>[] groupMessages(List<TaskMessage> msgs) {
-        ArrayList<TaskMessage> messageGroups[] = new ArrayList[queueCount];
-
-        for (TaskMessage message : msgs) {
-            int task = message.task();
-
-            if (task == -1) {
-                closing = true;
-                return null;
-            }
-
-            Integer queueId = getMessageQueueId(task);
-
-            if (null == messageGroups[queueId]) {
-                messageGroups[queueId] = new ArrayList<>();
-            }
-            messageGroups[queueId].add(message);
-        }
-        return messageGroups;
-    }
-    
-    private Integer getMessageQueueId(int task) {
-        // try to construct the map from taskId -> queueId in round robin manner.
-        Integer queueId = taskToQueueId.get(task);
-        if (null == queueId) {
-            synchronized (this) {
-                queueId = taskToQueueId.get(task);
-                if (queueId == null) {
-                    queueId = roundRobinQueueId++;
-                    if (roundRobinQueueId == queueCount) {
-                        roundRobinQueueId = 0;
-                    }
-                    HashMap<Integer, Integer> newRef = new HashMap<>(taskToQueueId);
-                    newRef.put(task, queueId);
-                    taskToQueueId = newRef;
-                }
-            }
-        }
-        return queueId;
-    }
-
+ 
     private void addReceiveCount(String from, int amount) {
         //This is possibly lossy in the case where a value is deleted
         // because it has received no messages over the metrics collection
@@ -193,48 +135,14 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
             return;
         }
         addReceiveCount(from, msgs.size());
-        ArrayList<TaskMessage> messageGroups[] = groupMessages(msgs);
-
-        if (null == messageGroups || closing) {
-            return;
-        }
-
-        for (int receiverId = 0; receiverId < messageGroups.length; receiverId++) {
-            ArrayList<TaskMessage> msgGroup = messageGroups[receiverId];
-            if (null != msgGroup) {
-                message_queue[receiverId].put(msgGroup);
-                pendingMessages[receiverId].addAndGet(msgGroup.size());
-            }
+        if (_cb != null) {
+            _cb.recv(msgs);
         }
     }
 
-    public Iterator<TaskMessage> recv(int flags, int receiverId) {
-        if (closing) {
-            return closeMessage.iterator();
-        }
-
-        ArrayList<TaskMessage> ret;
-        int queueId = receiverId % queueCount;
-        if ((flags & 0x01) == 0x01) {
-            //non-blocking
-            ret = message_queue[queueId].poll();
-        } else {
-            try {
-                ArrayList<TaskMessage> request = message_queue[queueId].take();
-                LOG.debug("request to be processed: {}", request);
-                ret = request;
-            } catch (InterruptedException e) {
-                LOG.info("exception within msg receiving", e);
-                ret = null;
-            }
-        }
-
-        if (null != ret) {
-            messagesDequeued.addAndGet(ret.size());
-            pendingMessages[queueId].addAndGet(0 - ret.size());
-            return ret.iterator();
-        }
-        return null;
+    @Override
+    public void registerRecv(IConnectionCallback cb) {
+        _cb = cb;
     }
    
     /**
@@ -326,12 +234,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
         LOG.debug("Getting metrics for server on port {}", port);
         HashMap<String, Object> ret = new HashMap<>();
         ret.put("dequeuedMessages", messagesDequeued.getAndSet(0));
-        ArrayList<Integer> pending = new ArrayList<>(pendingMessages.length);
-        for (AtomicInteger p: pendingMessages) {
-            pending.add(p.get());
-        }
-        ret.put("pending", pending);
-        HashMap<String, Integer> enqueued = new HashMap<>();
+        HashMap<String, Integer> enqueued = new HashMap<String, Integer>();
         Iterator<Map.Entry<String, AtomicInteger>> it = messagesEnqueued.entrySet().iterator();
         while (it.hasNext()) {
             Map.Entry<String, AtomicInteger> ent = it.next();

http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/jvm/backtype/storm/tuple/AddressedTuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/AddressedTuple.java b/storm-core/src/jvm/backtype/storm/tuple/AddressedTuple.java
new file mode 100644
index 0000000..c3aec72
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/tuple/AddressedTuple.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.tuple;
+
+/**
+ * A Tuple that is addressed to a destination.
+ */
+public class AddressedTuple {
+    /**
+     * Destination used when broadcasting a tuple.
+     */
+    public static final int BROADCAST_DEST = -2;
+    public final Tuple tuple;
+    public final int dest;
+
+    public AddressedTuple(int dest, Tuple tuple) {
+        this.dest = dest;
+        this.tuple = tuple;
+    }
+
+    public Tuple getTuple() {
+        return tuple;
+    }
+
+    public int getDest() {
+        return dest;
+    }
+
+    @Override
+    public String toString() {
+        return "[dest: "+dest+" tuple: "+tuple+"]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index aa806a7..1d6f104 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -15,7 +15,7 @@
 ;; limitations under the License.
 (ns backtype.storm.messaging.netty-unit-test
   (:use [clojure test])
-  (:import [backtype.storm.messaging TransportFactory])
+  (:import [backtype.storm.messaging TransportFactory IConnection TaskMessage IConnectionCallback])
   (:import [backtype.storm.utils Utils])
   (:use [backtype.storm testing util config log])
   (:use [backtype.storm.daemon.worker :only [is-connection-ready]])
@@ -45,20 +45,37 @@
            (throw (RuntimeException. (str "Netty connections were not ready within " max-wait-ms " ms"))))
          (log-message "All Netty connections are ready"))))))
 
+(defn mk-connection-callback
+  "make an IConnectionCallback"
+  [my-fn]
+  (reify IConnectionCallback
+    (recv [this batch]
+      (doseq [msg batch]
+        (my-fn msg)))))
+
+(defn register-callback
+  "register the local-transfer-fn with the server"
+  [my-fn ^IConnection socket]
+  (.registerRecv socket (mk-connection-callback my-fn)))
+
+(defn- wait-for-not-nil
+  [atm]
+  (while-timeout TEST-TIMEOUT-MS (nil? @atm) (Thread/sleep 10)))
 
 (defn- test-basic-fn [storm-conf]
   (log-message "1. Should send and receive a basic message")
   (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
         context (TransportFactory/makeContext storm-conf)
         port (available-port 6700)
+        resp (atom nil)
         server (.bind context nil port)
+        _ (register-callback (fn [message] (reset! resp message)) server)
         client (.connect context nil "localhost" port)
         _ (wait-until-ready [server client])
-        _ (.send client task (.getBytes req_msg))
-        iter (.recv server 0 0)
-        resp (.next iter)]
-    (is (= task (.task resp)))
-    (is (= req_msg (String. (.message resp))))
+        _ (.send client task (.getBytes req_msg))]
+    (wait-for-not-nil resp)
+    (is (= task (.task @resp)))
+    (is (= req_msg (String. (.message @resp))))
     (.close client)
     (.close server)
     (.term context)))
@@ -88,19 +105,20 @@
   (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
         context (TransportFactory/makeContext storm-conf)
         port (available-port 6700)
+        resp (atom nil)
         server (.bind context nil port)
+        _ (register-callback (fn [message] (reset! resp message)) server)
         client (.connect context nil "localhost" port)
         _ (wait-until-ready [server client])
         _ (.send client task (.getBytes req_msg))
-        iter (.recv server 0 0)
-        resp (.next iter)
         _ (.sendLoadMetrics server {(int 1) 0.0 (int 2) 1.0})
         _ (while-timeout 5000 (empty? (.getLoad client [(int 1) (int 2)])) (Thread/sleep 10))
         load (.getLoad client [(int 1) (int 2)])]
     (is (= 0.0 (.getBoltLoad (.get load (int 1)))))
     (is (= 1.0 (.getBoltLoad (.get load (int 2)))))
-    (is (= task (.task resp)))
-    (is (= req_msg (String. (.message resp))))
+    (wait-for-not-nil resp)
+    (is (= task (.task @resp)))
+    (is (= req_msg (String. (.message @resp))))
     (.close client)
     (.close server)
     (.term context)))
@@ -130,14 +148,15 @@
   (let [req_msg (apply str (repeat 2048000 'c'))
         context (TransportFactory/makeContext storm-conf)
         port (available-port 6700)
+        resp (atom nil)
         server (.bind context nil port)
+        _ (register-callback (fn [message] (reset! resp message)) server)
         client (.connect context nil "localhost" port)
         _ (wait-until-ready [server client])
-        _ (.send client task (.getBytes req_msg))
-        iter (.recv server 0 0)
-        resp (.next iter)]
-    (is (= task (.task resp)))
-    (is (= req_msg (String. (.message resp))))
+        _ (.send client task (.getBytes req_msg))]
+    (wait-for-not-nil resp)
+    (is (= task (.task @resp)))
+    (is (= req_msg (String. (.message @resp))))
     (.close client)
     (.close server)
     (.term context)))
@@ -164,27 +183,28 @@
 
 (defn- test-server-delayed-fn [storm-conf]
   (log-message "4. test server delayed")
-    (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
-          context (TransportFactory/makeContext storm-conf)
-          port (available-port 6700)
-          client (.connect context nil "localhost" port)
+  (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
+        context (TransportFactory/makeContext storm-conf)
+        resp (atom nil)
+        port (available-port 6700)
+        client (.connect context nil "localhost" port)
 
-          server (Thread.
-                  (fn []
-                    (Thread/sleep 1000)
-                    (let [server (.bind context nil port)
-                          iter (.recv server 0 0)
-                          resp (.next iter)]
-                      (is (= task (.task resp)))
-                      (is (= req_msg (String. (.message resp))))
-                      (.close server))))]
-      (.start server)
-      (wait-until-ready [server client])
-      (.send client task (.getBytes req_msg))
+        server (Thread.
+                 (fn []
+                   (Thread/sleep 100)
+                   (let [server (.bind context nil port)]
+                     (register-callback (fn [message] (reset! resp message)) server))))]
+    (.start server)
+    (wait-until-ready [server client])
+    (.send client task (.getBytes req_msg))
 
-      (.join server)
-      (.close client)
-      (.term context)))
+    (wait-for-not-nil resp)
+    (is (= task (.task @resp)))
+    (is (= req_msg (String. (.message @resp))))
+
+    (.join server)
+    (.close client)
+    (.term context)))
 
 (deftest test-server-delayed
  (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
@@ -211,28 +231,24 @@
   (log-message "5. test batch")
   (let [num-messages 100000
         _ (log-message "Should send and receive many messages (testing with " num-messages " messages)")
+        resp (ArrayList.)
+        received (atom 0)
         context (TransportFactory/makeContext storm-conf)
         port (available-port 6700)
         server (.bind context nil port)
+        _ (register-callback (fn [message] (.add resp message) (swap! received inc)) server)
         client (.connect context nil "localhost" port)
         _ (wait-until-ready [server client])]
-    (doseq [num  (range 1 num-messages)]
+    (doseq [num (range 1 num-messages)]
       (let [req_msg (str num)]
         (.send client task (.getBytes req_msg))))
 
-    (let [resp (ArrayList.)
-          received (atom 0)]
-      (while (< @received (- num-messages 1))
-        (let [iter (.recv server 0 0)]
-          (while (.hasNext iter)
-            (let [msg (.next iter)]
-              (.add resp msg)
-              (swap! received inc)
-              ))))
-      (doseq [num  (range 1 num-messages)]
+    (while-timeout TEST-TIMEOUT-MS (< (.size resp) (- num-messages 1)) (log-message (.size resp) " " num-messages) (Thread/sleep 10))
+
+    (doseq [num  (range 1 num-messages)]
       (let [req_msg (str num)
             resp_msg (String. (.message (.get resp (- num 1))))]
-        (is (= req_msg resp_msg)))))
+        (is (= req_msg resp_msg))))
 
     (.close client)
     (.close server)
@@ -274,20 +290,18 @@
                       TOPOLOGY-TUPLE-SERIALIZER "backtype.storm.serialization.types.ListDelegateSerializer"
                       TOPOLOGY-FALL-BACK-ON-JAVA-SERIALIZATION false
                       TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS false}
+          resp (atom nil)
           context (TransportFactory/makeContext storm-conf)
           port (available-port 6700)
           client (.connect context nil "localhost" port)
           _ (.send client task (.getBytes req_msg))
           server (.bind context nil port)
+          _ (register-callback (fn [message] (reset! resp message)) server)
           _ (wait-until-ready [server client])
-          _ (.send client task (.getBytes req_msg))
-          iter (future (.recv server 0 0))
-          resp (deref iter 5000 nil)
-          resp-val (if resp (.next resp) nil)]
-      (is resp-val)
-      (when resp-val
-        (is (= task (.task resp-val)))
-        (is (= req_msg (String. (.message resp-val)))))
+          _ (.send client task (.getBytes req_msg))]
+      (wait-for-not-nil resp)
+      (is (= task (.task @resp)))
+      (is (= req_msg (String. (.message @resp))))
       (.close client)
       (.close server)
       (.term context)))

http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/test/clj/backtype/storm/messaging_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging_test.clj b/storm-core/test/clj/backtype/storm/messaging_test.clj
index 89d8586..86162dd 100644
--- a/storm-core/test/clj/backtype/storm/messaging_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging_test.clj
@@ -61,28 +61,3 @@
     (.cleanup this))
   (startup [this]
     ))
-
-;; Test Adding more receiver threads won't violate the message delivery order gurantee
-(deftest test-receiver-message-order 
-  (with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 2
-                                        :daemon-conf {TOPOLOGY-WORKERS 2
-                                                      ;; Configure multiple receiver threads per worker 
-                                                      WORKER-RECEIVER-THREAD-COUNT 2
-                                                      STORM-LOCAL-MODE-ZMQ  true 
-                                                      STORM-MESSAGING-TRANSPORT 
-                                                      "backtype.storm.messaging.netty.Context"}]
-      (let [topology (thrift/mk-topology
-                       
-                       ;; TestEventLogSpout output(sourceId, eventId), eventId is Monotonically increasing
-                       {"1" (thrift/mk-spout-spec (TestEventLogSpout. 4000) :parallelism-hint 8)}
-                       
-                       ;; field grouping, message from same "source" task will be delivered to same bolt task
-                       ;; When received message order is not kept, Emit an error Tuple 
-                       {"2" (thrift/mk-bolt-spec {"1" ["source"]} (TestEventOrderCheckBolt.)
-                                                 :parallelism-hint 4)
-                        })
-            results (complete-topology cluster
-                                       topology)]
-        
-        ;; No error Tuple from Bolt TestEventOrderCheckBolt
-        (is (empty? (read-tuples results "2"))))))


[2/3] storm git commit: Merge branch 'server-callback' of https://github.com/revans2/incubator-storm into STORM-1145

Posted by bo...@apache.org.
Merge branch 'server-callback' of https://github.com/revans2/incubator-storm into STORM-1145

STORM-1145: Have IConnection push tuples instead of pull them


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cdb8a850
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cdb8a850
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cdb8a850

Branch: refs/heads/master
Commit: cdb8a850a7fb063208333697e79c73045bdc9d06
Parents: f4bd90a 1bdfd7e
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Nov 12 13:11:12 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Nov 12 13:11:12 2015 -0600

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/executor.clj  |  60 +++----
 .../src/clj/backtype/storm/daemon/worker.clj    |  59 ++++---
 .../src/clj/backtype/storm/messaging/loader.clj |  76 ++-------
 .../src/clj/backtype/storm/messaging/local.clj  |  72 +-------
 storm-core/src/jvm/backtype/storm/Config.java   |   7 -
 .../storm/messaging/AddressedTuple.java         |  46 ++++++
 .../DeserializingConnectionCallback.java        |  60 +++++++
 .../backtype/storm/messaging/IConnection.java   |  10 +-
 .../storm/messaging/IConnectionCallback.java    |  31 ++++
 .../backtype/storm/messaging/local/Context.java | 164 +++++++++++++++++++
 .../backtype/storm/messaging/netty/Client.java  |   3 +-
 .../backtype/storm/messaging/netty/Server.java  | 121 ++------------
 .../backtype/storm/tuple/AddressedTuple.java    |  48 ++++++
 .../storm/messaging/netty_unit_test.clj         | 122 ++++++++------
 .../test/clj/backtype/storm/messaging_test.clj  |  25 ---
 15 files changed, 506 insertions(+), 398 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: Added STORM-1145 to Changelog

Posted by bo...@apache.org.
Added STORM-1145 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1354c404
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1354c404
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1354c404

Branch: refs/heads/master
Commit: 1354c4049f39bc08d45a5253b2dea485135b7987
Parents: cdb8a85
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Nov 12 13:11:42 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Nov 12 13:11:42 2015 -0600

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1354c404/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 21c9a6d..1f214ea 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1145: Have IConnection push tuples instead of pull them
  * STORM-1191: bump timeout by 50% due to intermittent travis build failures
  * STORM-794: Modify Spout async loop to treat activate/deactivate ASAP
  * STORM-1196: Upgrade to thrift 0.9.3