You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/11/12 20:19:38 UTC
[1/3] storm git commit: STORM-1145: Have IConnection push tuples
instead of pull them. Move deserialization to the IConnectionCallback so only
Tuple batches are flowing through the system, not still-serialized byte arrays.
Repository: storm
Updated Branches:
refs/heads/master f4bd90a34 -> 1354c4049
STORM-1145: Have IConnection push tuples instead of pull them.
Move deserialization to the IConnectionCallback so only Tuple batches are flowing through the system, not still-serialized byte arrays.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1bdfd7e1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1bdfd7e1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1bdfd7e1
Branch: refs/heads/master
Commit: 1bdfd7e1f04194f7aa88ba2537f41a08409ce344
Parents: f4bd90a
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Oct 28 16:18:08 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Nov 12 11:26:33 2015 -0600
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/executor.clj | 60 +++----
.../src/clj/backtype/storm/daemon/worker.clj | 59 ++++---
.../src/clj/backtype/storm/messaging/loader.clj | 76 ++-------
.../src/clj/backtype/storm/messaging/local.clj | 72 +-------
storm-core/src/jvm/backtype/storm/Config.java | 7 -
.../storm/messaging/AddressedTuple.java | 46 ++++++
.../DeserializingConnectionCallback.java | 60 +++++++
.../backtype/storm/messaging/IConnection.java | 10 +-
.../storm/messaging/IConnectionCallback.java | 31 ++++
.../backtype/storm/messaging/local/Context.java | 164 +++++++++++++++++++
.../backtype/storm/messaging/netty/Client.java | 3 +-
.../backtype/storm/messaging/netty/Server.java | 121 ++------------
.../backtype/storm/tuple/AddressedTuple.java | 48 ++++++
.../storm/messaging/netty_unit_test.clj | 122 ++++++++------
.../test/clj/backtype/storm/messaging_test.clj | 25 ---
15 files changed, 506 insertions(+), 398 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 0390987..9687cdd 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -21,7 +21,7 @@
(:import [java.util List Random HashMap ArrayList LinkedList Map])
(:import [backtype.storm ICredentialsListener])
(:import [backtype.storm.hooks ITaskHook])
- (:import [backtype.storm.tuple Tuple Fields TupleImpl MessageId])
+ (:import [backtype.storm.tuple AddressedTuple Tuple Fields TupleImpl MessageId])
(:import [backtype.storm.spout ISpoutWaitStrategy ISpout SpoutOutputCollector ISpoutOutputCollector])
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
@@ -30,7 +30,7 @@
(:import [backtype.storm.generated GlobalStreamId])
(:import [backtype.storm.utils Utils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread])
(:import [com.lmax.disruptor InsufficientCapacityException])
- (:import [backtype.storm.serialization KryoTupleSerializer KryoTupleDeserializer])
+ (:import [backtype.storm.serialization KryoTupleSerializer])
(:import [backtype.storm.daemon Shutdownable])
(:import [backtype.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric])
(:import [backtype.storm Config Constants])
@@ -206,9 +206,10 @@
(defn mk-executor-transfer-fn [batch-transfer->worker storm-conf]
(fn this
[task tuple]
- (when (= true (storm-conf TOPOLOGY-DEBUG))
- (log-message "TRANSFERING tuple TASK: " task " TUPLE: " tuple))
- (disruptor/publish batch-transfer->worker [task tuple])))
+ (let [val (AddressedTuple. task tuple)]
+ (when (= true (storm-conf TOPOLOGY-DEBUG))
+ (log-message "TRANSFERING tuple " val))
+ (disruptor/publish batch-transfer->worker val))))
(defn mk-executor-data [worker executor-id]
(let [worker-context (worker-context worker)
@@ -257,7 +258,6 @@
(exception-cause? java.io.InterruptedIOException error))
(log-message "Got interrupted excpetion shutting thread down...")
((:suicide-fn <>))))
- :deserializer (KryoTupleDeserializer. storm-conf worker-context)
:sampler (mk-stats-sampler storm-conf)
:backpressure (atom false)
:spout-throttling-metrics (if (= executor-type :spout)
@@ -296,8 +296,7 @@
(.add alist o)
(when batch-end?
(worker-transfer-fn serializer alist)
- (.setObject cached-emit (ArrayList.))
- )))
+ (.setObject cached-emit (ArrayList.)))))
:kill-fn (:report-error-and-die executor-data))))
(defn setup-metrics! [executor-data]
@@ -309,9 +308,8 @@
interval
interval
(fn []
- (disruptor/publish
- receive-queue
- [[nil (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID)]]))))))
+ (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]]
+ (disruptor/publish receive-queue val)))))))
(defn metrics-tick
[executor-data task-data ^TupleImpl tuple]
@@ -333,7 +331,7 @@
(IMetricsConsumer$DataPoint. name value)))))
(filter identity)
(into []))]
- (if (seq data-points)
+ (when (seq data-points)
(task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))
(defn setup-ticks! [worker executor-data]
@@ -351,10 +349,8 @@
tick-time-secs
tick-time-secs
(fn []
- (disruptor/publish
- receive-queue
- [[nil (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)]]
- )))))))
+ (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
+ (disruptor/publish receive-queue val))))))))
(defn mk-executor [worker executor-id initial-credentials]
(let [executor-data (mk-executor-data worker executor-id)
@@ -394,10 +390,9 @@
executor-id)
(credentials-changed [this creds]
(let [receive-queue (:receive-queue executor-data)
- context (:worker-context executor-data)]
- (disruptor/publish
- receive-queue
- [[nil (TupleImpl. context [creds] Constants/SYSTEM_TASK_ID Constants/CREDENTIALS_CHANGED_STREAM_ID)]])))
+ context (:worker-context executor-data)
+ val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [creds] Constants/SYSTEM_TASK_ID Constants/CREDENTIALS_CHANGED_STREAM_ID))]]
+ (disruptor/publish receive-queue val)))
(get-backpressure-flag [this]
@(:backpressure executor-data))
Shutdownable
@@ -444,16 +439,16 @@
(stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
(defn mk-task-receiver [executor-data tuple-action-fn]
- (let [^KryoTupleDeserializer deserializer (:deserializer executor-data)
- task-ids (:task-ids executor-data)
+ (let [task-ids (:task-ids executor-data)
debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
]
(disruptor/clojure-handler
(fn [tuple-batch sequence-id end-of-batch?]
- (fast-list-iter [[task-id msg] tuple-batch]
- (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))]
+ (fast-list-iter [^AddressedTuple addressed-tuple tuple-batch]
+ (let [^TupleImpl tuple (.getTuple addressed-tuple)
+ task-id (.getDest addressed-tuple)]
(when debug? (log-message "Processing received message FOR " task-id " TUPLE: " tuple))
- (if task-id
+ (if (not= task-id AddressedTuple/BROADCAST_DEST)
(tuple-action-fn task-id tuple)
;; null task ids are broadcast tuples
(fast-list-iter [task-id task-ids]
@@ -479,7 +474,7 @@
spct (if (and (not-nil? options) (:enable options)) (:samplingpct options) 0)]
;; the thread's initialized random number generator is used to generate
;; uniformily distributed random numbers.
- (if (and (> spct 0) (< (* 100 (.nextDouble random)) spct))
+ (when (and (> spct 0) (< (* 100 (.nextDouble random)) spct))
(task/send-unanchored
task-data
EVENTLOGGER-STREAM-ID
@@ -561,9 +556,7 @@
task-id
out-stream-id
tuple-id)]
- (transfer-fn out-task
- out-tuple)
- ))
+ (transfer-fn out-task out-tuple)))
(if has-eventloggers?
(send-to-eventlogger executor-data task-data values component-id message-id rand))
(if (and rooted?
@@ -757,12 +750,12 @@
(fast-list-iter [root-id root-ids]
(put-xor! anchors-to-ids root-id edge-id))
))))
- (transfer-fn t
- (TupleImpl. worker-context
+ (let [tuple (TupleImpl. worker-context
values
task-id
stream
- (MessageId/makeId anchors-to-ids)))))
+ (MessageId/makeId anchors-to-ids))]
+ (transfer-fn t tuple))))
(if has-eventloggers?
(send-to-eventlogger executor-data task-data values component-id nil rand))
(or out-tasks [])))]]
@@ -796,8 +789,7 @@
(fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
(task/send-unanchored task-data
ACKER-ACK-STREAM-ID
- [root (bit-xor id ack-val)])
- ))
+ [root (bit-xor id ack-val)])))
(let [delta (tuple-time-delta! tuple)
debug? (= true (storm-conf TOPOLOGY-DEBUG))]
(when debug?
http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index 7c0b18d..df9f725 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -31,7 +31,7 @@
(:import [backtype.storm.daemon Shutdownable])
(:import [backtype.storm.serialization KryoTupleSerializer])
(:import [backtype.storm.generated StormTopology])
- (:import [backtype.storm.tuple Fields])
+ (:import [backtype.storm.tuple AddressedTuple Fields])
(:import [backtype.storm.task WorkerTopologyContext])
(:import [backtype.storm Constants])
(:import [backtype.storm.security.auth AuthUtils])
@@ -103,10 +103,15 @@
flatten
set )))
+(defn get-dest
+ [^AddressedTuple addressed-tuple]
+ "get the destination for an AddressedTuple"
+ (.getDest addressed-tuple))
+
(defn mk-transfer-local-fn [worker]
(let [short-executor-receive-queue-map (:short-executor-receive-queue-map worker)
task->short-executor (:task->short-executor worker)
- task-getter (comp #(get task->short-executor %) fast-first)]
+ task-getter (comp #(get task->short-executor %) get-dest)]
(fn [tuple-batch]
(let [grouped (fast-group-by task-getter tuple-batch)]
(fast-map-iter [[short-executor pairs] grouped]
@@ -162,19 +167,21 @@
(fn [^KryoTupleSerializer serializer tuple-batch]
(let [^ArrayList local (ArrayList.)
^HashMap remoteMap (HashMap.)]
- (fast-list-iter [[task tuple :as pair] tuple-batch]
- (if (local-tasks task)
- (.add local pair)
-
- ;;Using java objects directly to avoid performance issues in java code
- (do
- (when (not (.get remoteMap task))
- (.put remoteMap task (ArrayList.)))
- (let [^ArrayList remote (.get remoteMap task)]
- (if (not-nil? task)
- (.add remote (TaskMessage. ^int task ^bytes (.serialize serializer tuple)))
- (log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
- ))))
+ (fast-list-iter [^AddressedTuple addressed-tuple tuple-batch]
+ (let [task (.getDest addressed-tuple)
+ tuple (.getTuple addressed-tuple)]
+ (if (local-tasks task)
+ (.add local addressed-tuple)
+
+ ;;Using java objects directly to avoid performance issues in java code
+ (do
+ (when (not (.get remoteMap task))
+ (.put remoteMap task (ArrayList.)))
+ (let [^ArrayList remote (.get remoteMap task)]
+ (if (not-nil? task)
+ (.add remote (TaskMessage. task ^bytes (.serialize serializer tuple)))
+ (log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
+ )))))
(when (not (.isEmpty local)) (local-transfer local))
(when (not (.isEmpty remoteMap)) (disruptor/publish transfer-queue remoteMap))))]
@@ -295,7 +302,6 @@
:default-shared-resources (mk-default-resources <>)
:user-shared-resources (mk-user-resources <>)
:transfer-local-fn (mk-transfer-local-fn <>)
- :receiver-thread-count (get storm-conf WORKER-RECEIVER-THREAD-COUNT)
:transfer-fn (mk-transfer-fn <>)
:load-mapping (LoadMapping.)
:assignment-versions assignment-versions
@@ -450,16 +456,12 @@
(schedule timer recur-secs this :check-active false)
)))))
-(defn launch-receive-thread [worker]
- (log-message "Launching receive-thread for " (:assignment-id worker) ":" (:port worker))
- (msg-loader/launch-receive-thread!
- (:mq-context worker)
- (:receiver worker)
- (:storm-id worker)
- (:receiver-thread-count worker)
- (:port worker)
- (:transfer-local-fn worker)
- :kill-fn (fn [t] (exit-process! 11))))
+(defn register-callbacks [worker]
+ (log-message "Registering IConnectionCallbacks for " (:assignment-id worker) ":" (:port worker))
+ (msg-loader/register-callback (:transfer-local-fn worker)
+ (:receiver worker)
+ (:storm-conf worker)
+ (worker-context worker)))
(defn- close-resources [worker]
(let [dr (:default-shared-resources worker)]
@@ -596,7 +598,7 @@
_ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
_ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors))
- receive-thread-shutdown (launch-receive-thread worker)
+ _ (register-callbacks worker)
refresh-connections (mk-refresh-connections worker)
refresh-load (mk-refresh-load worker)
@@ -635,9 +637,6 @@
;; this will do best effort flushing since the linger period
;; was set on creation
(.close socket))
- (log-message "Shutting down receive thread")
- (receive-thread-shutdown)
- (log-message "Shut down receive thread")
(log-message "Terminating messaging context")
(log-message "Shutting down executors")
(doseq [executor @executors] (.shutdown executor))
http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/clj/backtype/storm/messaging/loader.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/loader.clj b/storm-core/src/clj/backtype/storm/messaging/loader.clj
index c154ed8..72dd382 100644
--- a/storm-core/src/clj/backtype/storm/messaging/loader.clj
+++ b/storm-core/src/clj/backtype/storm/messaging/loader.clj
@@ -14,71 +14,21 @@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.messaging.loader
- (:use [backtype.storm util log])
- (:import [java.util ArrayList Iterator])
- (:import [backtype.storm.messaging IContext IConnection TaskMessage])
- (:import [backtype.storm.utils DisruptorQueue MutableObject])
- (:require [backtype.storm.messaging [local :as local]])
- (:require [backtype.storm [disruptor :as disruptor]]))
+ (:import [backtype.storm.messaging IConnection DeserializingConnectionCallback])
+ (:require [backtype.storm.messaging [local :as local]]))
(defn mk-local-context []
(local/mk-context))
-(defn- mk-receive-thread [storm-id port transfer-local-fn daemon kill-fn priority socket thread-id]
- (async-loop
- (fn []
- (log-message "Starting receive-thread: [stormId: " storm-id ", port: " port ", thread-id: " thread-id " ]")
- (fn []
- (let [batched (ArrayList.)
- ^Iterator iter (.recv ^IConnection socket 0 thread-id)
- closed (atom false)]
- (when iter
- (while (and (not @closed) (.hasNext iter))
- (let [packet (.next iter)
- task (if packet (.task ^TaskMessage packet))
- message (if packet (.message ^TaskMessage packet))]
- (if (= task -1)
- (do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
- (.close socket)
- (reset! closed true))
- (when packet (.add batched [task message]))))))
-
- (when (not @closed)
- (do
- (if (> (.size batched) 0)
- (transfer-local-fn batched))
- 0)))))
- :factory? true
- :daemon daemon
- :kill-fn kill-fn
- :priority priority
- :thread-name (str "worker-receiver-thread-" thread-id)))
+(defn- mk-connection-callback
+ "make an IConnectionCallback"
+ [transfer-local-fn storm-conf worker-context]
+ (DeserializingConnectionCallback. storm-conf
+ worker-context
+ (fn [batch]
+ (transfer-local-fn batch))))
-(defn- mk-receive-threads [storm-id port transfer-local-fn daemon kill-fn priority socket thread-count]
- (into [] (for [thread-id (range thread-count)]
- (mk-receive-thread storm-id port transfer-local-fn daemon kill-fn priority socket thread-id))))
-
-
-(defnk launch-receive-thread!
- [context socket storm-id receiver-thread-count port transfer-local-fn
- :daemon true
- :kill-fn (fn [t] (System/exit 1))
- :priority Thread/NORM_PRIORITY]
- (let [local-hostname (memoized-local-hostname)
- thread-count (if receiver-thread-count receiver-thread-count 1)
- vthreads (mk-receive-threads storm-id port transfer-local-fn daemon kill-fn priority socket thread-count)]
- (fn []
- (let [kill-socket (.connect ^IContext context storm-id local-hostname port)]
- (log-message "Shutting down receiving-thread: [" storm-id ", " port "]")
- (.send ^IConnection kill-socket
- -1 (byte-array []))
-
- (.close ^IConnection kill-socket)
-
- (log-message "Waiting for receiving-thread:[" storm-id ", " port "] to die")
-
- (for [thread-id (range thread-count)]
- (.join (vthreads thread-id)))
-
- (log-message "Shutdown receiving-thread: [" storm-id ", " port "]")
- ))))
+(defn register-callback
+ "register the local-transfer-fn with the server"
+ [transfer-local-fn ^IConnection socket storm-conf worker-context]
+ (.registerRecv socket (mk-connection-callback transfer-local-fn storm-conf worker-context)))
http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/clj/backtype/storm/messaging/local.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/local.clj b/storm-core/src/clj/backtype/storm/messaging/local.clj
index 60a6bd2..b99a77a 100644
--- a/storm-core/src/clj/backtype/storm/messaging/local.clj
+++ b/storm-core/src/clj/backtype/storm/messaging/local.clj
@@ -14,76 +14,10 @@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.messaging.local
- (:refer-clojure :exclude [send])
- (:use [backtype.storm log util])
- (:import [backtype.storm.messaging IContext IConnection TaskMessage])
- (:import [backtype.storm.grouping Load])
- (:import [java.util.concurrent LinkedBlockingQueue])
- (:import [java.util Map Iterator Collection])
- (:import [java.util Iterator ArrayList])
- (:gen-class))
-
-(defn update-load! [cached-task->load lock task->load]
- (locking lock
- (swap! cached-task->load merge task->load)))
-
-(defn add-queue! [queues-map lock storm-id port]
- (let [id (str storm-id "-" port)]
- (locking lock
- (when-not (contains? @queues-map id)
- (swap! queues-map assoc id (LinkedBlockingQueue.))))
- (@queues-map id)))
-
-(deftype LocalConnection [storm-id port queues-map lock queue task->load]
- IConnection
- (^Iterator recv [this ^int flags ^int clientId]
- (when-not queue
- (throw (IllegalArgumentException. "Cannot receive on this socket")))
- (let [ret (ArrayList.)
- msg (if (= flags 1) (.poll queue) (.take queue))]
- (if msg
- (do
- (.add ret msg)
- (.iterator ret))
- nil)))
- (^void send [this ^int taskId ^bytes payload]
- (let [send-queue (add-queue! queues-map lock storm-id port)]
- (.put send-queue (TaskMessage. taskId payload))
- ))
- (^void send [this ^Iterator iter]
- (let [send-queue (add-queue! queues-map lock storm-id port)]
- (while (.hasNext iter)
- (.put send-queue (.next iter)))
- ))
- (^void sendLoadMetrics [this ^Map taskToLoad]
- (update-load! task->load lock taskToLoad))
- (^Map getLoad [this ^Collection tasks]
- (locking lock
- (into {}
- (for [task tasks
- :let [load (.get @task->load task)]
- :when (not-nil? load)]
- ;; for now we are ignoring the connection load locally
- [task (Load. true load 0.0)]))))
- (^void close [this]))
-
-
-(deftype LocalContext [^{:unsynchronized-mutable true} queues-map
- ^{:unsynchronized-mutable true} lock
- ^{:unsynchronized-mutable true} task->load]
- IContext
- (^void prepare [this ^Map storm-conf]
- (set! queues-map (atom {}))
- (set! task->load (atom {}))
- (set! lock (Object.)))
- (^IConnection bind [this ^String storm-id ^int port]
- (LocalConnection. storm-id port queues-map lock (add-queue! queues-map lock storm-id port) task->load))
- (^IConnection connect [this ^String storm-id ^String host ^int port]
- (LocalConnection. storm-id port queues-map lock nil task->load))
- (^void term [this]
- ))
+ (:import [backtype.storm.messaging IContext])
+ (:import [backtype.storm.messaging.local Context]))
(defn mk-context []
- (let [context (LocalContext. nil nil nil)]
+ (let [context (Context.)]
(.prepare ^IContext context nil)
context))
http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 4cde8ad..95e5ccc 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1136,13 +1136,6 @@ public class Config extends HashMap<String, Object> {
public static final String WORKER_GC_CHILDOPTS = "worker.gc.childopts";
/**
- * control how many worker receiver threads we need per worker
- */
- @isInteger
- @isPositiveNumber
- public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
-
- /**
* How often this worker should heartbeat to the supervisor.
*/
@isInteger
http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/jvm/backtype/storm/messaging/AddressedTuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/AddressedTuple.java b/storm-core/src/jvm/backtype/storm/messaging/AddressedTuple.java
new file mode 100644
index 0000000..de9a3e6
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/AddressedTuple.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging;
+
+import backtype.storm.tuple.Tuple;
+
+/**
+ * A Tuple that is addressed to a destination.
+ */
+public class AddressedTuple {
+ public final Tuple tuple;
+ public final int dest;
+
+ public AddressedTuple(int dest, Tuple tuple) {
+ this.dest = dest;
+ this.tuple = tuple;
+ }
+
+ public Tuple getTuple() {
+ return tuple;
+ }
+
+ public int getDest() {
+ return dest;
+ }
+
+ @Override
+ public String toString() {
+ return "[dest: "+dest+" tuple: "+tuple+"]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/jvm/backtype/storm/messaging/DeserializingConnectionCallback.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/DeserializingConnectionCallback.java b/storm-core/src/jvm/backtype/storm/messaging/DeserializingConnectionCallback.java
new file mode 100644
index 0000000..1e2d3aa
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/DeserializingConnectionCallback.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging;
+
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.tuple.AddressedTuple;
+import backtype.storm.serialization.KryoTupleDeserializer;
+
+import clojure.lang.IFn;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A class that is called when a TaskMessage arrives.
+ */
+public class DeserializingConnectionCallback implements IConnectionCallback {
+ private final IFn _cb;
+ private final Map _conf;
+ private final GeneralTopologyContext _context;
+ private final ThreadLocal<KryoTupleDeserializer> _des =
+ new ThreadLocal<KryoTupleDeserializer>() {
+ @Override
+ protected KryoTupleDeserializer initialValue() {
+ return new KryoTupleDeserializer(_conf, _context);
+ }
+ };
+
+ public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, IFn callback) {
+ _conf = conf;
+ _context = context;
+ _cb = callback;
+ }
+
+ @Override
+ public void recv(List<TaskMessage> batch) {
+ KryoTupleDeserializer des = _des.get();
+ ArrayList<AddressedTuple> ret = new ArrayList<>(batch.size());
+ for (TaskMessage message: batch) {
+ ret.add(new AddressedTuple(message.task(), des.deserialize(message.message())));
+ }
+ _cb.invoke(ret);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
index 88f0c0a..a03b3a2 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
@@ -22,14 +22,12 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
-public interface IConnection {
-
+public interface IConnection {
/**
- * receive a batch message iterator (consists taskId and payload)
- * @param flags 0: block, 1: non-block
- * @return
+ * Register a callback to be notified when data is ready to be processed.
+ * @param cb the callback to process the messages.
*/
- public Iterator<TaskMessage> recv(int flags, int clientId);
+ public void registerRecv(IConnectionCallback cb);
/**
* Send load metrics to all downstream connections.
http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/jvm/backtype/storm/messaging/IConnectionCallback.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/IConnectionCallback.java b/storm-core/src/jvm/backtype/storm/messaging/IConnectionCallback.java
new file mode 100644
index 0000000..ecf0828
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/IConnectionCallback.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging;
+
+import java.util.List;
+
+/**
+ * A class that is called when a TaskMessage arrives.
+ */
+public interface IConnectionCallback {
+ /**
+ * A batch of new messages have arrived to be processed
+ * @param batch the messages to be processed
+ */
+ public void recv(List<TaskMessage> batch);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/jvm/backtype/storm/messaging/local/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/local/Context.java b/storm-core/src/jvm/backtype/storm/messaging/local/Context.java
new file mode 100644
index 0000000..968fe64
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/local/Context.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.local;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import backtype.storm.Config;
+import backtype.storm.grouping.Load;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.messaging.IConnectionCallback;
+import backtype.storm.messaging.IContext;
+
+public class Context implements IContext {
+ private static final Logger LOG = LoggerFactory.getLogger(Context.class);
+
+ private static class LocalServer implements IConnection {
+ IConnectionCallback _cb;
+ final ConcurrentHashMap<Integer, Double> _load = new ConcurrentHashMap<>();
+
+ @Override
+ public void registerRecv(IConnectionCallback cb) {
+ _cb = cb;
+ }
+
+ @Override
+ public void send(int taskId, byte[] payload) {
+ throw new IllegalArgumentException("SHOULD NOT HAPPEN");
+ }
+
+ @Override
+ public void send(Iterator<TaskMessage> msgs) {
+ throw new IllegalArgumentException("SHOULD NOT HAPPEN");
+ }
+
+ @Override
+ public Map<Integer, Load> getLoad(Collection<Integer> tasks) {
+ Map<Integer, Load> ret = new HashMap<>();
+ for (Integer task : tasks) {
+ Double found = _load.get(task);
+ if (found != null) {
+ ret.put(task, new Load(true, found, 0));
+ }
+ }
+ return ret;
+ }
+
+ @Override
+ public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
+ _load.putAll(taskToLoad);
+ }
+
+ @Override
+ public void close() {
+ //NOOP
+ }
+ };
+
+ private static class LocalClient implements IConnection {
+ private final LocalServer _server;
+
+ public LocalClient(LocalServer server) {
+ _server = server;
+ }
+
+ @Override
+ public void registerRecv(IConnectionCallback cb) {
+ throw new IllegalArgumentException("SHOULD NOT HAPPEN");
+ }
+
+ @Override
+ public void send(int taskId, byte[] payload) {
+ if (_server._cb != null) {
+ _server._cb.recv(Arrays.asList(new TaskMessage(taskId, payload)));
+ }
+ }
+
+ @Override
+ public void send(Iterator<TaskMessage> msgs) {
+ if (_server._cb != null) {
+ ArrayList<TaskMessage> ret = new ArrayList<>();
+ while (msgs.hasNext()) {
+ ret.add(msgs.next());
+ }
+ _server._cb.recv(ret);
+ }
+ }
+
+ @Override
+ public Map<Integer, Load> getLoad(Collection<Integer> tasks) {
+ return _server.getLoad(tasks);
+ }
+
+ @Override
+ public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
+ _server.sendLoadMetrics(taskToLoad);
+ }
+
+ @Override
+ public void close() {
+ //NOOP
+ }
+ };
+
+ private static ConcurrentHashMap<String, LocalServer> _registry = new ConcurrentHashMap<>();
+ private static LocalServer getLocalServer(String nodeId, int port) {
+ String key = nodeId + "-" + port;
+ LocalServer ret = _registry.get(key);
+ if (ret == null) {
+ ret = new LocalServer();
+ LocalServer tmp = _registry.putIfAbsent(key, ret);
+ if (tmp != null) {
+ ret = tmp;
+ }
+ }
+ return ret;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepare(Map storm_conf) {
+ //NOOP
+ }
+
+ @Override
+ public IConnection bind(String storm_id, int port) {
+ return getLocalServer(storm_id, port);
+ }
+
+ @Override
+ public IConnection connect(String storm_id, String host, int port) {
+ return new LocalClient(getLocalServer(storm_id, port));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void term() {
+ //NOOP
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index ccdef41..4f813ba 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -32,6 +32,7 @@ import backtype.storm.Config;
import backtype.storm.grouping.Load;
import backtype.storm.messaging.ConnectionWithStatus;
import backtype.storm.messaging.TaskMessage;
+import backtype.storm.messaging.IConnectionCallback;
import backtype.storm.metric.api.IStatefulObject;
import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
import backtype.storm.utils.Utils;
@@ -217,7 +218,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
* @throws java.lang.UnsupportedOperationException whenever this method is being called.
*/
@Override
- public Iterator<TaskMessage> recv(int flags, int clientId) {
+ public void registerRecv(IConnectionCallback cb) {
throw new UnsupportedOperationException("Client connection should not receive any messages");
}
http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index 32c2bd7..bca3936 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -45,6 +45,8 @@ import org.slf4j.LoggerFactory;
import backtype.storm.Config;
import backtype.storm.grouping.Load;
import backtype.storm.messaging.ConnectionWithStatus;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.IConnectionCallback;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.metric.api.IStatefulObject;
import backtype.storm.serialization.KryoValuesSerializer;
@@ -58,41 +60,22 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
int port;
private final ConcurrentHashMap<String, AtomicInteger> messagesEnqueued = new ConcurrentHashMap<>();
private final AtomicInteger messagesDequeued = new AtomicInteger(0);
- private final AtomicInteger[] pendingMessages;
-
- // Create multiple queues for incoming messages. The size equals the number of receiver threads.
- // For message which is sent to same task, it will be stored in the same queue to preserve the message order.
- private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
final ChannelFactory factory;
final ServerBootstrap bootstrap;
-
- private int queueCount;
- private volatile HashMap<Integer, Integer> taskToQueueId = null;
- int roundRobinQueueId;
-
+
private volatile boolean closing = false;
List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
private KryoValuesSerializer _ser;
+ private IConnectionCallback _cb = null;
@SuppressWarnings("rawtypes")
Server(Map storm_conf, int port) {
this.storm_conf = storm_conf;
this.port = port;
_ser = new KryoValuesSerializer(storm_conf);
-
- queueCount = Utils.getInt(storm_conf.get(Config.WORKER_RECEIVER_THREAD_COUNT), 1);
- roundRobinQueueId = 0;
- taskToQueueId = new HashMap<>();
-
- message_queue = new LinkedBlockingQueue[queueCount];
- pendingMessages = new AtomicInteger[queueCount];
- for (int i = 0; i < queueCount; i++) {
- message_queue[i] = new LinkedBlockingQueue<>();
- pendingMessages[i] = new AtomicInteger(0);
- }
-
+
// Configure the server.
int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
int backlog = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SOCKET_BACKLOG), 500);
@@ -124,48 +107,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
Channel channel = bootstrap.bind(new InetSocketAddress(port));
allChannels.add(channel);
}
-
- private ArrayList<TaskMessage>[] groupMessages(List<TaskMessage> msgs) {
- ArrayList<TaskMessage> messageGroups[] = new ArrayList[queueCount];
-
- for (TaskMessage message : msgs) {
- int task = message.task();
-
- if (task == -1) {
- closing = true;
- return null;
- }
-
- Integer queueId = getMessageQueueId(task);
-
- if (null == messageGroups[queueId]) {
- messageGroups[queueId] = new ArrayList<>();
- }
- messageGroups[queueId].add(message);
- }
- return messageGroups;
- }
-
- private Integer getMessageQueueId(int task) {
- // try to construct the map from taskId -> queueId in round robin manner.
- Integer queueId = taskToQueueId.get(task);
- if (null == queueId) {
- synchronized (this) {
- queueId = taskToQueueId.get(task);
- if (queueId == null) {
- queueId = roundRobinQueueId++;
- if (roundRobinQueueId == queueCount) {
- roundRobinQueueId = 0;
- }
- HashMap<Integer, Integer> newRef = new HashMap<>(taskToQueueId);
- newRef.put(task, queueId);
- taskToQueueId = newRef;
- }
- }
- }
- return queueId;
- }
-
+
private void addReceiveCount(String from, int amount) {
//This is possibly lossy in the case where a value is deleted
// because it has received no messages over the metrics collection
@@ -193,48 +135,14 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
return;
}
addReceiveCount(from, msgs.size());
- ArrayList<TaskMessage> messageGroups[] = groupMessages(msgs);
-
- if (null == messageGroups || closing) {
- return;
- }
-
- for (int receiverId = 0; receiverId < messageGroups.length; receiverId++) {
- ArrayList<TaskMessage> msgGroup = messageGroups[receiverId];
- if (null != msgGroup) {
- message_queue[receiverId].put(msgGroup);
- pendingMessages[receiverId].addAndGet(msgGroup.size());
- }
+ if (_cb != null) {
+ _cb.recv(msgs);
}
}
- public Iterator<TaskMessage> recv(int flags, int receiverId) {
- if (closing) {
- return closeMessage.iterator();
- }
-
- ArrayList<TaskMessage> ret;
- int queueId = receiverId % queueCount;
- if ((flags & 0x01) == 0x01) {
- //non-blocking
- ret = message_queue[queueId].poll();
- } else {
- try {
- ArrayList<TaskMessage> request = message_queue[queueId].take();
- LOG.debug("request to be processed: {}", request);
- ret = request;
- } catch (InterruptedException e) {
- LOG.info("exception within msg receiving", e);
- ret = null;
- }
- }
-
- if (null != ret) {
- messagesDequeued.addAndGet(ret.size());
- pendingMessages[queueId].addAndGet(0 - ret.size());
- return ret.iterator();
- }
- return null;
+ @Override
+ public void registerRecv(IConnectionCallback cb) {
+ _cb = cb;
}
/**
@@ -326,12 +234,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
LOG.debug("Getting metrics for server on port {}", port);
HashMap<String, Object> ret = new HashMap<>();
ret.put("dequeuedMessages", messagesDequeued.getAndSet(0));
- ArrayList<Integer> pending = new ArrayList<>(pendingMessages.length);
- for (AtomicInteger p: pendingMessages) {
- pending.add(p.get());
- }
- ret.put("pending", pending);
- HashMap<String, Integer> enqueued = new HashMap<>();
+ HashMap<String, Integer> enqueued = new HashMap<String, Integer>();
Iterator<Map.Entry<String, AtomicInteger>> it = messagesEnqueued.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, AtomicInteger> ent = it.next();
http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/src/jvm/backtype/storm/tuple/AddressedTuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/AddressedTuple.java b/storm-core/src/jvm/backtype/storm/tuple/AddressedTuple.java
new file mode 100644
index 0000000..c3aec72
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/tuple/AddressedTuple.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.tuple;
+
+/**
+ * A Tuple that is addressed to a destination.
+ */
+public class AddressedTuple {
+ /**
+ * Destination used when broadcasting a tuple.
+ */
+ public static final int BROADCAST_DEST = -2;
+ public final Tuple tuple;
+ public final int dest;
+
+ public AddressedTuple(int dest, Tuple tuple) {
+ this.dest = dest;
+ this.tuple = tuple;
+ }
+
+ public Tuple getTuple() {
+ return tuple;
+ }
+
+ public int getDest() {
+ return dest;
+ }
+
+ @Override
+ public String toString() {
+ return "[dest: "+dest+" tuple: "+tuple+"]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index aa806a7..1d6f104 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -15,7 +15,7 @@
;; limitations under the License.
(ns backtype.storm.messaging.netty-unit-test
(:use [clojure test])
- (:import [backtype.storm.messaging TransportFactory])
+ (:import [backtype.storm.messaging TransportFactory IConnection TaskMessage IConnectionCallback])
(:import [backtype.storm.utils Utils])
(:use [backtype.storm testing util config log])
(:use [backtype.storm.daemon.worker :only [is-connection-ready]])
@@ -45,20 +45,37 @@
(throw (RuntimeException. (str "Netty connections were not ready within " max-wait-ms " ms"))))
(log-message "All Netty connections are ready"))))))
+(defn mk-connection-callback
+  "Wraps handler in an IConnectionCallback whose recv calls handler once,
+  in order, for every message of a received batch."
+  [my-fn]
+  (reify IConnectionCallback
+    (recv [_ messages]
+      (doseq [message messages]
+        (my-fn message)))))
+
+(defn register-callback
+  "Registers my-fn on socket as its receive callback; my-fn is invoked with
+  each individual message the socket receives."
+  [my-fn ^IConnection socket]
+  (let [callback (mk-connection-callback my-fn)]
+    (.registerRecv socket callback)))
+
+(defn- wait-for-not-nil
+  "Polls atm every 10 ms while it still holds nil. The wait is bounded by
+  TEST-TIMEOUT-MS through while-timeout (presumably failing on expiry - see
+  while-timeout for the exact behaviour)."
+  [atm]
+  (while-timeout TEST-TIMEOUT-MS
+                 (nil? (deref atm))
+                 (Thread/sleep 10)))
(defn- test-basic-fn [storm-conf]
(log-message "1. Should send and receive a basic message")
(let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
context (TransportFactory/makeContext storm-conf)
port (available-port 6700)
+ resp (atom nil)
server (.bind context nil port)
+ _ (register-callback (fn [message] (reset! resp message)) server)
client (.connect context nil "localhost" port)
_ (wait-until-ready [server client])
- _ (.send client task (.getBytes req_msg))
- iter (.recv server 0 0)
- resp (.next iter)]
- (is (= task (.task resp)))
- (is (= req_msg (String. (.message resp))))
+ _ (.send client task (.getBytes req_msg))]
+ (wait-for-not-nil resp)
+ (is (= task (.task @resp)))
+ (is (= req_msg (String. (.message @resp))))
(.close client)
(.close server)
(.term context)))
@@ -88,19 +105,20 @@
(let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
context (TransportFactory/makeContext storm-conf)
port (available-port 6700)
+ resp (atom nil)
server (.bind context nil port)
+ _ (register-callback (fn [message] (reset! resp message)) server)
client (.connect context nil "localhost" port)
_ (wait-until-ready [server client])
_ (.send client task (.getBytes req_msg))
- iter (.recv server 0 0)
- resp (.next iter)
_ (.sendLoadMetrics server {(int 1) 0.0 (int 2) 1.0})
_ (while-timeout 5000 (empty? (.getLoad client [(int 1) (int 2)])) (Thread/sleep 10))
load (.getLoad client [(int 1) (int 2)])]
(is (= 0.0 (.getBoltLoad (.get load (int 1)))))
(is (= 1.0 (.getBoltLoad (.get load (int 2)))))
- (is (= task (.task resp)))
- (is (= req_msg (String. (.message resp))))
+ (wait-for-not-nil resp)
+ (is (= task (.task @resp)))
+ (is (= req_msg (String. (.message @resp))))
(.close client)
(.close server)
(.term context)))
@@ -130,14 +148,15 @@
(let [req_msg (apply str (repeat 2048000 'c'))
context (TransportFactory/makeContext storm-conf)
port (available-port 6700)
+ resp (atom nil)
server (.bind context nil port)
+ _ (register-callback (fn [message] (reset! resp message)) server)
client (.connect context nil "localhost" port)
_ (wait-until-ready [server client])
- _ (.send client task (.getBytes req_msg))
- iter (.recv server 0 0)
- resp (.next iter)]
- (is (= task (.task resp)))
- (is (= req_msg (String. (.message resp))))
+ _ (.send client task (.getBytes req_msg))]
+ (wait-for-not-nil resp)
+ (is (= task (.task @resp)))
+ (is (= req_msg (String. (.message @resp))))
(.close client)
(.close server)
(.term context)))
@@ -164,27 +183,28 @@
(defn- test-server-delayed-fn [storm-conf]
(log-message "4. test server delayed")
- (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
- context (TransportFactory/makeContext storm-conf)
- port (available-port 6700)
- client (.connect context nil "localhost" port)
+ (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
+ context (TransportFactory/makeContext storm-conf)
+ resp (atom nil)
+ port (available-port 6700)
+ client (.connect context nil "localhost" port)
- server (Thread.
- (fn []
- (Thread/sleep 1000)
- (let [server (.bind context nil port)
- iter (.recv server 0 0)
- resp (.next iter)]
- (is (= task (.task resp)))
- (is (= req_msg (String. (.message resp))))
- (.close server))))]
- (.start server)
- (wait-until-ready [server client])
- (.send client task (.getBytes req_msg))
+ server (Thread.
+ (fn []
+ (Thread/sleep 100)
+ (let [server (.bind context nil port)]
+ (register-callback (fn [message] (reset! resp message)) server))))]
+ (.start server)
+ (wait-until-ready [server client])
+ (.send client task (.getBytes req_msg))
- (.join server)
- (.close client)
- (.term context)))
+ (wait-for-not-nil resp)
+ (is (= task (.task @resp)))
+ (is (= req_msg (String. (.message @resp))))
+
+ (.join server)
+ (.close client)
+ (.term context)))
(deftest test-server-delayed
(let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
@@ -211,28 +231,24 @@
(log-message "5. test batch")
(let [num-messages 100000
_ (log-message "Should send and receive many messages (testing with " num-messages " messages)")
+ resp (ArrayList.)
+ received (atom 0)
context (TransportFactory/makeContext storm-conf)
port (available-port 6700)
server (.bind context nil port)
+ _ (register-callback (fn [message] (.add resp message) (swap! received inc)) server)
client (.connect context nil "localhost" port)
_ (wait-until-ready [server client])]
- (doseq [num (range 1 num-messages)]
+ (doseq [num (range 1 num-messages)]
(let [req_msg (str num)]
(.send client task (.getBytes req_msg))))
- (let [resp (ArrayList.)
- received (atom 0)]
- (while (< @received (- num-messages 1))
- (let [iter (.recv server 0 0)]
- (while (.hasNext iter)
- (let [msg (.next iter)]
- (.add resp msg)
- (swap! received inc)
- ))))
- (doseq [num (range 1 num-messages)]
+ (while-timeout TEST-TIMEOUT-MS (< (.size resp) (- num-messages 1)) (log-message (.size resp) " " num-messages) (Thread/sleep 10))
+
+ (doseq [num (range 1 num-messages)]
(let [req_msg (str num)
resp_msg (String. (.message (.get resp (- num 1))))]
- (is (= req_msg resp_msg)))))
+ (is (= req_msg resp_msg))))
(.close client)
(.close server)
@@ -274,20 +290,18 @@
TOPOLOGY-TUPLE-SERIALIZER "backtype.storm.serialization.types.ListDelegateSerializer"
TOPOLOGY-FALL-BACK-ON-JAVA-SERIALIZATION false
TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS false}
+ resp (atom nil)
context (TransportFactory/makeContext storm-conf)
port (available-port 6700)
client (.connect context nil "localhost" port)
_ (.send client task (.getBytes req_msg))
server (.bind context nil port)
+ _ (register-callback (fn [message] (reset! resp message)) server)
_ (wait-until-ready [server client])
- _ (.send client task (.getBytes req_msg))
- iter (future (.recv server 0 0))
- resp (deref iter 5000 nil)
- resp-val (if resp (.next resp) nil)]
- (is resp-val)
- (when resp-val
- (is (= task (.task resp-val)))
- (is (= req_msg (String. (.message resp-val)))))
+ _ (.send client task (.getBytes req_msg))]
+ (wait-for-not-nil resp)
+ (is (= task (.task @resp)))
+ (is (= req_msg (String. (.message @resp))))
(.close client)
(.close server)
(.term context)))
http://git-wip-us.apache.org/repos/asf/storm/blob/1bdfd7e1/storm-core/test/clj/backtype/storm/messaging_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging_test.clj b/storm-core/test/clj/backtype/storm/messaging_test.clj
index 89d8586..86162dd 100644
--- a/storm-core/test/clj/backtype/storm/messaging_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging_test.clj
@@ -61,28 +61,3 @@
(.cleanup this))
(startup [this]
))
-
-;; Test Adding more receiver threads won't violate the message delivery order gurantee
-(deftest test-receiver-message-order
- (with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 2
- :daemon-conf {TOPOLOGY-WORKERS 2
- ;; Configure multiple receiver threads per worker
- WORKER-RECEIVER-THREAD-COUNT 2
- STORM-LOCAL-MODE-ZMQ true
- STORM-MESSAGING-TRANSPORT
- "backtype.storm.messaging.netty.Context"}]
- (let [topology (thrift/mk-topology
-
- ;; TestEventLogSpout output(sourceId, eventId), eventId is Monotonically increasing
- {"1" (thrift/mk-spout-spec (TestEventLogSpout. 4000) :parallelism-hint 8)}
-
- ;; field grouping, message from same "source" task will be delivered to same bolt task
- ;; When received message order is not kept, Emit an error Tuple
- {"2" (thrift/mk-bolt-spec {"1" ["source"]} (TestEventOrderCheckBolt.)
- :parallelism-hint 4)
- })
- results (complete-topology cluster
- topology)]
-
- ;; No error Tuple from Bolt TestEventOrderCheckBolt
- (is (empty? (read-tuples results "2"))))))
[2/3] storm git commit: Merge branch 'server-callback' of
https://github.com/revans2/incubator-storm into STORM-1145
Posted by bo...@apache.org.
Merge branch 'server-callback' of https://github.com/revans2/incubator-storm into STORM-1145
STORM-1145: Have IConnection push tuples instead of pull them
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cdb8a850
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cdb8a850
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cdb8a850
Branch: refs/heads/master
Commit: cdb8a850a7fb063208333697e79c73045bdc9d06
Parents: f4bd90a 1bdfd7e
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Nov 12 13:11:12 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Nov 12 13:11:12 2015 -0600
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/executor.clj | 60 +++----
.../src/clj/backtype/storm/daemon/worker.clj | 59 ++++---
.../src/clj/backtype/storm/messaging/loader.clj | 76 ++-------
.../src/clj/backtype/storm/messaging/local.clj | 72 +-------
storm-core/src/jvm/backtype/storm/Config.java | 7 -
.../storm/messaging/AddressedTuple.java | 46 ++++++
.../DeserializingConnectionCallback.java | 60 +++++++
.../backtype/storm/messaging/IConnection.java | 10 +-
.../storm/messaging/IConnectionCallback.java | 31 ++++
.../backtype/storm/messaging/local/Context.java | 164 +++++++++++++++++++
.../backtype/storm/messaging/netty/Client.java | 3 +-
.../backtype/storm/messaging/netty/Server.java | 121 ++------------
.../backtype/storm/tuple/AddressedTuple.java | 48 ++++++
.../storm/messaging/netty_unit_test.clj | 122 ++++++++------
.../test/clj/backtype/storm/messaging_test.clj | 25 ---
15 files changed, 506 insertions(+), 398 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: Added STORM-1145 to Changelog
Posted by bo...@apache.org.
Added STORM-1145 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1354c404
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1354c404
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1354c404
Branch: refs/heads/master
Commit: 1354c4049f39bc08d45a5253b2dea485135b7987
Parents: cdb8a85
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Nov 12 13:11:42 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Nov 12 13:11:42 2015 -0600
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1354c404/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 21c9a6d..1f214ea 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-1145: Have IConnection push tuples instead of pull them
* STORM-1191: bump timeout by 50% due to intermittent travis build failures
* STORM-794: Modify Spout async loop to treat activate/deactivate ASAP
* STORM-1196: Upgrade to thrift 0.9.3