You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/06/09 15:48:06 UTC

[01/32] git commit: 1. Async netty message transfer 2. Batch send and batch receive api and implementation 3. allow to configure the number of receiver thread 4. name the threads

Repository: incubator-storm
Updated Branches:
  refs/heads/master dd1d21360 -> 1a57fcf6b


1. Async netty message transfer 2. Batch send and batch receive API and implementation 3. Allow configuring the number of receiver threads 4. Name the threads


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/861a92ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/861a92ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/861a92ea

Branch: refs/heads/master
Commit: 861a92eab8740cfc0f83ac4d7ade9a2ab04a8b9b
Parents: 22215b5
Author: Sean Zhong <cl...@gmail.com>
Authored: Wed May 7 11:10:07 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Wed May 7 11:10:07 2014 +0800

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/executor.clj  |   1 +
 .../src/clj/backtype/storm/daemon/worker.clj    |  50 +--
 storm-core/src/clj/backtype/storm/disruptor.clj |   9 +-
 .../src/clj/backtype/storm/messaging/loader.clj |  81 ++--
 .../src/clj/backtype/storm/messaging/local.clj  |  30 +-
 storm-core/src/jvm/backtype/storm/Config.java   |  25 ++
 .../backtype/storm/messaging/IConnection.java   |  21 +-
 .../backtype/storm/messaging/netty/Client.java  | 379 +++++++++++--------
 .../backtype/storm/messaging/netty/Context.java |  11 +-
 .../storm/messaging/netty/ControlMessage.java   |   6 +-
 .../storm/messaging/netty/MessageBatch.java     |   5 +
 .../storm/messaging/netty/MessageDecoder.java   | 107 ++++--
 .../backtype/storm/messaging/netty/Server.java  | 142 ++++++-
 .../messaging/netty/StormClientHandler.java     |  52 +--
 .../messaging/netty/StormServerHandler.java     |  40 +-
 .../backtype/storm/utils/DisruptorQueue.java    |   9 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |  63 ++-
 .../storm/messaging/netty_unit_test.clj         |  46 ++-
 18 files changed, 692 insertions(+), 385 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index f133a1b..390bba8 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -204,6 +204,7 @@
         storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id)
         executor-type (executor-type worker-context component-id)
         batch-transfer->worker (disruptor/disruptor-queue
+                                  (str "executor"  executor-id "-send-queue")
                                   (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
                                   :claim-strategy :single-threaded
                                   :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index 0d1f6c6..2648237 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -18,8 +18,10 @@
   (:use [backtype.storm bootstrap])
   (:require [backtype.storm.daemon [executor :as executor]])
   (:import [java.util.concurrent Executors])
+  (:import [java.util ArrayList HashMap])
+  (:import [backtype.storm.utils TransferDrainer])
   (:import [backtype.storm.messaging TransportFactory])
-  (:import [backtype.storm.messaging IContext IConnection])
+  (:import [backtype.storm.messaging TaskMessage IContext IConnection])
   (:gen-class))
 
 (bootstrap)
@@ -109,25 +111,30 @@
 (defn mk-transfer-fn [worker]
   (let [local-tasks (-> worker :task-ids set)
         local-transfer (:transfer-local-fn worker)
-        ^DisruptorQueue transfer-queue (:transfer-queue worker)]
+        ^DisruptorQueue transfer-queue (:transfer-queue worker)
+        task->node+port (:cached-task->node+port worker)]
     (fn [^KryoTupleSerializer serializer tuple-batch]
       (let [local (ArrayList.)
-            remote (ArrayList.)]
+            remoteMap (HashMap.)]
         (fast-list-iter [[task tuple :as pair] tuple-batch]
           (if (local-tasks task)
             (.add local pair)
-            (.add remote pair)
-            ))
+            (let [node+port (get @task->node+port task)]
+              (when (not (.get remoteMap node+port))
+                (.put remoteMap node+port (ArrayList.)))
+              (let [remote (.get remoteMap node+port)]
+                (.add remote (TaskMessage. task (.serialize serializer tuple)))
+                 ))))
+        
         (local-transfer local)
-        ;; not using map because the lazy seq shows up in perf profiles
-        (let [serialized-pairs (fast-list-for [[task ^TupleImpl tuple] remote] [task (.serialize serializer tuple)])]
-          (disruptor/publish transfer-queue serialized-pairs)
-          )))))
+        (disruptor/publish transfer-queue remoteMap)
+          ))))
 
 (defn- mk-receive-queue-map [storm-conf executors]
   (->> executors
        ;; TODO: this depends on the type of executor
-       (map (fn [e] [e (disruptor/disruptor-queue (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
+       (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e)
+                                                  (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
                                                   :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))]))
        (into {})
        ))
@@ -169,7 +176,7 @@
         storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
         storm-conf (read-supervisor-storm-conf conf storm-id)
         executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port))
-        transfer-queue (disruptor/disruptor-queue (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
+        transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
                                                   :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
         executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
         
@@ -218,6 +225,7 @@
       :default-shared-resources (mk-default-resources <>)
       :user-shared-resources (mk-user-resources <>)
       :transfer-local-fn (mk-transfer-local-fn <>)
+      :receiver-thread-count (get storm-conf WORKER-RECEIVER-THREAD-COUNT)
       :transfer-fn (mk-transfer-fn <>)
       )))
 
@@ -296,28 +304,19 @@
 ;; TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues
 (defn mk-transfer-tuples-handler [worker]
   (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
-        drainer (ArrayList.)
+        drainer (TransferDrainer.)
         node+port->socket (:cached-node+port->socket worker)
         task->node+port (:cached-task->node+port worker)
         endpoint-socket-lock (:endpoint-socket-lock worker)
         ]
     (disruptor/clojure-handler
       (fn [packets _ batch-end?]
-        (.addAll drainer packets)
+        (.add drainer packets)
+        
         (when batch-end?
           (read-locked endpoint-socket-lock
-            (let [node+port->socket @node+port->socket
-                  task->node+port @task->node+port]
-              ;; consider doing some automatic batching here (would need to not be serialized at this point to remove per-tuple overhead)
-              ;; try using multipart messages ... first sort the tuples by the target node (without changing the local ordering)
-            
-              (fast-list-iter [[task ser-tuple] drainer]
-                ;; TODO: consider write a batch of tuples here to every target worker  
-                ;; group by node+port, do multipart send              
-                (let [node-port (get task->node+port task)]
-                  (when node-port
-                    (.send ^IConnection (get node+port->socket node-port) task ser-tuple))
-                    ))))
+            (let [node+port->socket @node+port->socket]
+              (.send drainer node+port->socket)))
           (.clear drainer))))))
 
 (defn launch-receive-thread [worker]
@@ -325,6 +324,7 @@
   (msg-loader/launch-receive-thread!
     (:mq-context worker)
     (:storm-id worker)
+    (:receiver-thread-count worker)
     (:port worker)
     (:transfer-local-fn worker)
     (-> worker :storm-conf (get TOPOLOGY-RECEIVER-BUFFER-SIZE))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/clj/backtype/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/disruptor.clj b/storm-core/src/clj/backtype/storm/disruptor.clj
index 9456d1a..28393eb 100644
--- a/storm-core/src/clj/backtype/storm/disruptor.clj
+++ b/storm-core/src/clj/backtype/storm/disruptor.clj
@@ -47,8 +47,9 @@
 ;; This would manifest itself in Trident when doing 1 batch at a time processing, and the ack_init message
 ;; wouldn't make it to the acker until the batch timed out and another tuple was played into the queue, 
 ;; unblocking the consumer
-(defnk disruptor-queue [buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
-  (DisruptorQueue. ((CLAIM-STRATEGY claim-strategy) buffer-size)
+(defnk disruptor-queue [^String queue-name buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
+  (DisruptorQueue. queue-name
+                   ((CLAIM-STRATEGY claim-strategy) buffer-size)
                    (mk-wait-strategy wait-strategy)
                    ))
 
@@ -89,7 +90,7 @@
                 (consume-batch-when-available queue handler)
                 0 )
               :kill-fn kill-fn
-              :thread-name thread-name
+              :thread-name (.getName queue)
               )]
      (consumer-started! queue)
      ret
@@ -97,5 +98,5 @@
 
 (defmacro consume-loop [queue & handler-args]
   `(let [handler# (handler ~@handler-args)]
-     (consume-loop* ~queue handler#)
+     (consume-loop* ~queue handler# :thread-name (.getName queue))
      ))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/clj/backtype/storm/messaging/loader.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/loader.clj b/storm-core/src/clj/backtype/storm/messaging/loader.clj
index 9e43c26..e13b5a8 100644
--- a/storm-core/src/clj/backtype/storm/messaging/loader.clj
+++ b/storm-core/src/clj/backtype/storm/messaging/loader.clj
@@ -15,7 +15,7 @@
 ;; limitations under the License.
 (ns backtype.storm.messaging.loader
   (:use [backtype.storm util log])
-  (:import [java.util ArrayList])
+  (:import [java.util ArrayList Iterator])
   (:import [backtype.storm.messaging IContext IConnection TaskMessage])
   (:import [backtype.storm.utils DisruptorQueue MutableObject])
   (:require [backtype.storm.messaging [local :as local]])
@@ -24,45 +24,62 @@
 (defn mk-local-context []
   (local/mk-context))
 
+(defn- mk-receive-thread [context storm-id port transfer-local-fn  daemon kill-fn priority socket max-buffer-size thread-id]
+    (async-loop
+       (fn []
+         (log-message "Starting receive-thread: [stormId: " storm-id ", port: " port ", thread-id: " thread-id  " ]")
+         (fn []
+           (let [batched (ArrayList.)
+                 ^Iterator iter (.recv ^IConnection socket 0 thread-id)
+                 closed (atom false)]
+             (when iter
+               (while (and (not @closed) (.hasNext iter)) 
+                  (let [packet (.next iter)
+                        task (if packet (.task ^TaskMessage packet))
+                        message (if packet (.message ^TaskMessage packet))]
+                      (if (= task -1)
+                         (do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
+                           (.close socket)
+                           (reset! closed  true))
+                         (when packet (.add batched [task message]))))))
+             
+             (when (not @closed)
+               (do
+                 (if (> (.size batched) 0)
+                   (transfer-local-fn batched))
+                 0)))))
+         :factory? true
+         :daemon daemon
+         :kill-fn kill-fn
+         :priority priority
+         :thread-name (str "worker-receiver-thread-" thread-id)))
+
+(defn- mk-receive-threads [context storm-id port transfer-local-fn  daemon kill-fn priority socket max-buffer-size thread-count]
+  (into [] (for [thread-id (range thread-count)] 
+             (mk-receive-thread context storm-id port transfer-local-fn  daemon kill-fn priority socket max-buffer-size thread-id))))
+
+
 (defnk launch-receive-thread!
-  [context storm-id port transfer-local-fn max-buffer-size
+  [context storm-id receiver-thread-count port transfer-local-fn max-buffer-size
    :daemon true
    :kill-fn (fn [t] (System/exit 1))
    :priority Thread/NORM_PRIORITY]
   (let [max-buffer-size (int max-buffer-size)
-        vthread (async-loop
-                 (fn []
-                   (let [socket (.bind ^IContext context storm-id port)]
-                     (fn []
-                       (let [batched (ArrayList.)
-                             init (.recv ^IConnection socket 0)]
-                         (loop [packet init]
-                           (let [task (if packet (.task ^TaskMessage packet))
-                                 message (if packet (.message ^TaskMessage packet))]
-                             (if (= task -1)
-                               (do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
-                                 (.close socket)
-                                 nil )
-                               (do
-                                 (when packet (.add batched [task message]))
-                                 (if (and packet (< (.size batched) max-buffer-size))
-                                   (recur (.recv ^IConnection socket 1))
-                                   (do (transfer-local-fn batched)
-                                     0 ))))))))))
-                 :factory? true
-                 :daemon daemon
-                 :kill-fn kill-fn
-                 :priority priority)]
+        socket (.bind ^IContext context storm-id port)
+        thread-count (if receiver-thread-count receiver-thread-count 1)
+        vthreads (mk-receive-threads context storm-id port transfer-local-fn daemon kill-fn priority socket max-buffer-size thread-count)]
     (fn []
       (let [kill-socket (.connect ^IContext context storm-id "localhost" port)]
         (log-message "Shutting down receiving-thread: [" storm-id ", " port "]")
         (.send ^IConnection kill-socket
-                  -1
-                  (byte-array []))
-        (log-message "Waiting for receiving-thread:[" storm-id ", " port "] to die")
-        (.join vthread)
+                  -1 (byte-array []))
+        
         (.close ^IConnection kill-socket)
+        
+        (log-message "Waiting for receiving-thread:[" storm-id ", " port "] to die")
+        
+        (for [thread-id (range thread-count)] 
+             (.join (vthreads thread-id)))
+        
         (log-message "Shutdown receiving-thread: [" storm-id ", " port "]")
-        ))))
-
-
+        ))))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/clj/backtype/storm/messaging/local.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/local.clj b/storm-core/src/clj/backtype/storm/messaging/local.clj
index bf4d5b2..de14806 100644
--- a/storm-core/src/clj/backtype/storm/messaging/local.clj
+++ b/storm-core/src/clj/backtype/storm/messaging/local.clj
@@ -18,7 +18,8 @@
   (:use [backtype.storm log])
   (:import [backtype.storm.messaging IContext IConnection TaskMessage])
   (:import [java.util.concurrent LinkedBlockingQueue])
-  (:import [java.util Map])
+  (:import [java.util Map Iterator])
+  (:import [java.util Iterator ArrayList])
   (:gen-class))
 
 (defn add-queue! [queues-map lock storm-id port]
@@ -30,16 +31,35 @@
 
 (deftype LocalConnection [storm-id port queues-map lock queue]
   IConnection
-  (^TaskMessage recv [this ^int flags]
+  (^Iterator recv [this ^int flags]
     (when-not queue
       (throw (IllegalArgumentException. "Cannot receive on this socket")))
-    (if (= flags 1)
-      (.poll queue)
-      (.take queue)))
+    (let [ret (ArrayList.)
+          msg (if (= flags 1) (.poll queue) (.take queue))]
+      (if msg
+        (do 
+          (.add ret msg)
+          (.iterator ret))
+        nil)))
+  (^Iterator recv [this ^int flags ^int clientId]
+    (when-not queue
+      (throw (IllegalArgumentException. "Cannot receive on this socket")))
+    (let [ret (ArrayList.)
+          msg (if (= flags 1) (.poll queue) (.take queue))]
+      (if msg
+        (do 
+          (.add ret msg)
+          (.iterator ret))
+        nil)))
   (^void send [this ^int taskId ^bytes payload]
     (let [send-queue (add-queue! queues-map lock storm-id port)]
       (.put send-queue (TaskMessage. taskId payload))
       ))
+  (^void send [this ^Iterator iter]
+    (let [send-queue (add-queue! queues-map lock storm-id port)]
+      (while (.hasNext iter) 
+         (.put send-queue (.next iter)))
+      ))
   (^void close [this]
     ))
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 1fc0d78..98718a3 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -84,8 +84,27 @@ public class Config extends HashMap<String, Object> {
      */
     public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS = "storm.messaging.netty.client_worker_threads"; 
     public static final Object STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS_SCHEMA = Number.class;
+    
+    /**
+     * If the Netty messaging layer is busy, the Netty client will try to batch as many messages as possible, up to STORM_NETTY_MESSAGE_BATCH_SIZE bytes.
+     */
+    public static final String STORM_NETTY_MESSAGE_BATCH_SIZE = "netty.transfer.batch.size";
+    public static final Object STORM_NETTY_MESSAGE_BATCH_SIZE_SCHEMA = Number.class;
 
     /**
+     * This controls whether we do Netty message transfer in a synchronous or an asynchronous way.
+     */
+    public static final String STORM_NETTY_BLOCKING = "netty.blocking";
+    public static final Object STORM_NETTY_BLOCKING_SCHEMA = Boolean.class;
+    
+    /**
+     * The interval, in milliseconds, at which we check whether the Netty channel is writable and try to write pending messages.
+     */
+    public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "netty.flush.check.interval.ms";
+    public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = Number.class;
+    
+    
+    /**
      * A list of hosts of ZooKeeper servers used to manage the cluster.
      */
     public static final String STORM_ZOOKEEPER_SERVERS = "storm.zookeeper.servers";
@@ -462,6 +481,12 @@ public class Config extends HashMap<String, Object> {
     public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
 
     /**
+     * Controls how many worker receiver threads we need per worker.
+     */
+    public static final String WORKER_RECEIVER_THREAD_COUNT = "worker.receiver.thread.count";
+    public static final Object WORKER_RECEIVER_THREAD_COUNT_SCHEMA = Number.class;
+    
+    /**
      * How often this worker should heartbeat to the supervisor.
      */
     public static final String WORKER_HEARTBEAT_FREQUENCY_SECS = "worker.heartbeat.frequency.secs";

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
index 41ae3f5..fe9caa7 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
@@ -17,13 +17,23 @@
  */
 package backtype.storm.messaging;
 
+import java.util.Iterator;
+
 public interface IConnection {   
     /**
-     * receive a message (consists taskId and payload)
+     * receive an iterator over a batch of messages (each consisting of a taskId and a payload)
      * @param flags 0: block, 1: non-block
      * @return
      */
-    public TaskMessage recv(int flags);
+    public Iterator<TaskMessage> recv(int flags);
+    
+    /**
+     * receive an iterator over a batch of messages (each consisting of a taskId and a payload)
+     * @param flags 0: block, 1: non-block
+     * @return
+     */
+    public Iterator<TaskMessage> recv(int flags, int clientId);
+    
     /**
      * send a message with taskId and payload
      * @param taskId task ID
@@ -32,6 +42,13 @@ public interface IConnection {
     public void send(int taskId,  byte[] payload);
     
     /**
+     * send batch messages
+     * @param msgs
+     */
+
+    public void send(Iterator<TaskMessage> msgs);
+    
+    /**
      * close this connection
      */
     public void close();

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 6996b49..09b045b 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -21,52 +21,53 @@ import backtype.storm.Config;
 import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.TaskMessage;
 import backtype.storm.utils.Utils;
-
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.Random;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-class Client implements IConnection {
+public class Client implements IConnection {
     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
-    private static final Timer TIMER = new Timer("netty-client-timer", true);
-
+    private static final String PREFIX = "Netty-Client-";
     private final int max_retries;
     private final long base_sleep_ms;
     private final long max_sleep_ms;
-    private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
     private AtomicReference<Channel> channelRef;
     private final ClientBootstrap bootstrap;
-    InetSocketAddress remote_addr;
-    private AtomicInteger retries;
+    private InetSocketAddress remote_addr;
+    
     private final Random random = new Random();
     private final ChannelFactory factory;
     private final int buffer_size;
-    private final AtomicBoolean being_closed;
-    private boolean wait_for_requests;
+    private boolean closing;
+
+    private Integer messageBatchSize;
+    private Boolean blocking = false;
+    
+    private AtomicLong pendings;
+
+    MessageBatch messageBatch = null;
+    private AtomicLong flushCheckTimer;
+    private int flushCheckInterval;
 
     @SuppressWarnings("rawtypes")
     Client(Map storm_conf, ChannelFactory factory, String host, int port) {
         this.factory = factory;
-        message_queue = new LinkedBlockingQueue<Object>();
-        retries = new AtomicInteger(0);
         channelRef = new AtomicReference<Channel>(null);
-        being_closed = new AtomicBoolean(false);
-        wait_for_requests = false;
+        closing = false;
+        pendings = new AtomicLong(0);
+        flushCheckTimer = new AtomicLong(Long.MAX_VALUE);
 
         // Configure
         buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
@@ -74,6 +75,14 @@ class Client implements IConnection {
         base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
         max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
 
+        this.messageBatchSize = Utils.getInt(storm_conf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
+        blocking = Utils.getBoolean(storm_conf.get(Config.STORM_NETTY_BLOCKING), false);
+        
+        flushCheckInterval = Utils.getInt(storm_conf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10); // default 10 ms
+
+        LOG.info("New Netty Client, connect to " + host + ", " + port
+                + ", config: " + ", buffer_size: " + buffer_size);
+
         bootstrap = new ClientBootstrap(factory);
         bootstrap.setOption("tcpNoDelay", true);
         bootstrap.setOption("sendBufferSize", buffer_size);
@@ -84,43 +93,87 @@ class Client implements IConnection {
 
         // Start the connection attempt.
         remote_addr = new InetSocketAddress(host, port);
-        bootstrap.connect(remote_addr);
+        
+        Thread flushChecker = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                //make sure we have a connection
+                connect();
+                
+                while(!closing) {
+                    long flushCheckTime = flushCheckTimer.get();
+                    long now = System.currentTimeMillis();
+                    if (now > flushCheckTime) {
+                        Channel channel = channelRef.get();
+                        if (null != channel && channel.isWritable()) {
+                            flush();
+                        }
+                    }
+                    try {
+                        Thread.sleep(flushCheckInterval);
+                    } catch (InterruptedException e) {
+                        break;
+                    }
+                }
+                
+            }
+        });
+        
+        flushChecker.setDaemon(true);
+        flushChecker.start();
     }
 
     /**
      * We will retry connection with exponential back-off policy
      */
-    void reconnect() {
-        close_n_release();
-
-        //reconnect only if it's not being closed
-        if (being_closed.get()) return;
-
-        final int tried_count = retries.incrementAndGet();
-        if (tried_count <= max_retries) {
-            long sleep = getSleepTimeMs();
-            LOG.info("Waiting {} ms before trying connection to {}", sleep, remote_addr);
-            TIMER.schedule(new TimerTask() {
-                @Override
-                public void run() { 
-                    LOG.info("Reconnect ... [{}] to {}", tried_count, remote_addr);
-                    bootstrap.connect(remote_addr);
-                }}, sleep);
-        } else {
-            LOG.warn(remote_addr+" is not reachable. We will close this client.");
-            close();
+    private synchronized void connect() {
+        try {
+            if (channelRef.get() != null) {
+                return;
+            }
+            
+            Channel channel = null;
+
+            int tried = 0;
+            while (tried <= max_retries) {
+
+                LOG.info("Reconnect started for {}... [{}]", name(), tried);
+                LOG.debug("connection started...");
+
+                ChannelFuture future = bootstrap.connect(remote_addr);
+                future.awaitUninterruptibly();
+                Channel current = future.getChannel();
+                if (!future.isSuccess()) {
+                    if (null != current) {
+                        current.close();
+                    }
+                } else {
+                    channel = current;
+                    break;
+                }
+                Thread.sleep(getSleepTimeMs(tried));
+                tried++;  
+            }
+            if (null != channel) {
+                LOG.info("connection established to a remote host " + name() + ", " + channel.toString());
+                channelRef.set(channel);
+            } else {
+                close();
+                throw new RuntimeException("Remote address is not reachable. We will close this client " + name());
+            }
+        } catch (InterruptedException e) {
+            throw new RuntimeException("connection failed " + name(), e);
         }
     }
 
     /**
      * # of milliseconds to wait per exponential back-off policy
      */
-    private long getSleepTimeMs()
-    {
-        if (retries.get() > 30) {
+    private long getSleepTimeMs(int retries) {
+        if (retries > 30) {
            return max_sleep_ms;
         }
-        int backoff = 1 << retries.get();
+        int backoff = 1 << retries;
         long sleepMs = base_sleep_ms * Math.max(1, random.nextInt(backoff));
         if ( sleepMs > max_sleep_ms )
             sleepMs = max_sleep_ms;
@@ -128,133 +181,103 @@ class Client implements IConnection {
     }
 
     /**
-     * Enqueue a task message to be sent to server
+     * Enqueue task messages to be sent to server
      */
-    public void send(int task, byte[] message) {
-        //throw exception if the client is being closed
-        if (being_closed.get()) {
+    synchronized public void send(Iterator<TaskMessage> msgs) {
+
+        // throw exception if the client is being closed
+        if (closing) {
             throw new RuntimeException("Client is being closed, and does not take requests any more");
         }
-
-        try {
-            message_queue.put(new TaskMessage(task, message));
-
-            //resume delivery if it is waiting for requests
-            tryDeliverMessages(true);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+        
+        if (null == msgs || !msgs.hasNext()) {
+            return;
         }
-    }
 
-    /**
-     * Retrieve messages from queue, and delivery to server if any
-     */
-    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
-        //just skip if delivery only if waiting, and we are not waiting currently
-        if (only_if_waiting && !wait_for_requests)  return;
-
-        //make sure that channel was not closed
         Channel channel = channelRef.get();
-        if (channel == null)  return;
-        if (!channel.isOpen()) {
-            LOG.info("Channel to {} is no longer open.",remote_addr);
-            //The channel is not open yet. Reconnect?
-            reconnect();
-            return;
+        if (null == channel) {
+            connect();
+            channel = channelRef.get();
         }
 
-        final MessageBatch requests = tryTakeMessages();
-        if (requests==null) {
-            wait_for_requests = true;
-            return;
-        }
+        while (msgs.hasNext()) {
+            TaskMessage message = msgs.next();
+            if (null == messageBatch) {
+                messageBatch = new MessageBatch(messageBatchSize);
+            }
 
-        //if channel is being closed and we have no outstanding messages,  let's close the channel
-        if (requests.isEmpty() && being_closed.get()) {
-            close_n_release();
-            return;
+            messageBatch.add(message);
+            if (messageBatch.isFull()) {
+                MessageBatch toBeFlushed = messageBatch;
+                flushRequest(channel, toBeFlushed, blocking);
+                messageBatch = null;
+            }
         }
 
-        //we are busily delivering messages, and will check queue upon response.
-        //When send() is called by senders, we should not thus call tryDeliverMessages().
-        wait_for_requests = false;
-
-        //write request into socket channel
-        ChannelFuture future = channel.write(requests);
-        future.addListener(new ChannelFutureListener() {
-            public void operationComplete(ChannelFuture future)
-                    throws Exception {
-                if (!future.isSuccess()) {
-                    LOG.info("failed to send "+requests.size()+" requests to "+remote_addr, future.getCause());
-                    reconnect();
-                } else {
-                    LOG.debug("{} request(s) sent", requests.size());
-
-                    //Now that our requests have been sent, channel could be closed if needed
-                    if (being_closed.get())
-                        close_n_release();
-                }
+        if (null != messageBatch && !messageBatch.isEmpty()) {
+            if (channel.isWritable()) {
+                flushCheckTimer.set(Long.MAX_VALUE);
+                
+                // Flush as fast as we can to reduce the latency
+                MessageBatch toBeFlushed = messageBatch;
+                messageBatch = null;
+                flushRequest(channel, toBeFlushed, blocking);
+                
+            } else {
+                flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval);
             }
-        });
+        }
+
     }
 
-    /**
-     * Take all enqueued messages from queue
-     * @return  batch of messages
-     * @throws InterruptedException
-     *
-     * synchronized ... ensure that messages are delivered in the same order
-     * as they are added into queue
-     */
-    private MessageBatch tryTakeMessages() throws InterruptedException {
-        //1st message
-        Object msg = message_queue.poll();
-        if (msg == null) return null;
-
-        MessageBatch batch = new MessageBatch(buffer_size);
-        //we will discard any message after CLOSE
-        if (msg == ControlMessage.CLOSE_MESSAGE) {
-            LOG.info("Connection to {} is being closed", remote_addr);
-            being_closed.set(true);
-            return batch;
+    public String name() {
+        if (null != remote_addr) {
+            return PREFIX + remote_addr.toString();
         }
+        return "";
+    }
 
-        batch.add((TaskMessage)msg);
-        while (!batch.isFull() && ((msg = message_queue.peek())!=null)) {
-            //Is it a CLOSE message?
-            if (msg == ControlMessage.CLOSE_MESSAGE) {
-                message_queue.take();
-                LOG.info("Connection to {} is being closed", remote_addr);
-                being_closed.set(true);
-                break;
+    private synchronized void flush() {
+        if (!closing) {
+            if (null != messageBatch && !messageBatch.isEmpty()) {
+                MessageBatch toBeFlushed = messageBatch;
+                Channel channel = channelRef.get();
+                if (channel != null) {
+                    flushCheckTimer.set(Long.MAX_VALUE);
+                    flushRequest(channel, toBeFlushed, true);
+                }
+                messageBatch = null;
             }
-
-            //try to add this msg into batch
-            if (!batch.tryAdd((TaskMessage) msg))
-                break;
-
-            //remove this message
-            message_queue.take();
         }
-
-        return batch;
     }
-
+    
     /**
      * gracefully close this client.
-     *
-     * We will send all existing requests, and then invoke close_n_release() method
+     * 
+     * We will send all existing requests, and then invoke close_n_release()
+     * method
      */
-    public void close() {
-        //enqueue a CLOSE message so that shutdown() will be invoked
-        try {
-            message_queue.put(ControlMessage.CLOSE_MESSAGE);
-
-            //resume delivery if it is waiting for requests
-            tryDeliverMessages(true);
-        } catch (InterruptedException e) {
-            LOG.info("Interrupted Connection to {} is being closed", remote_addr);
-            being_closed.set(true);
+    public synchronized void close() {
+        if (!closing) {
+            closing = true;
+            if (null != messageBatch && !messageBatch.isEmpty()) {
+                MessageBatch toBeFlushed = messageBatch;
+                Channel channel = channelRef.get();
+                if (channel != null) {
+                    flushRequest(channel, toBeFlushed, true);
+                }
+                messageBatch = null;
+            }
+        
+            //wait for all pending writes to complete
+            while(pendings.get() != 0) {
+                try {
+                    Thread.sleep(1000); //sleep 1s
+                } catch (InterruptedException e) {
+                    break;
+                } 
+            }
+            
             close_n_release();
         }
     }
@@ -262,27 +285,59 @@ class Client implements IConnection {
     /**
      * close_n_release() is invoked after all messages have been sent.
      */
-    synchronized void close_n_release() {
+    private void close_n_release() {
         if (channelRef.get() != null) {
             channelRef.get().close();
             LOG.debug("channel {} closed",remote_addr);
-            setChannel(null);
         }
     }
 
-    public TaskMessage recv(int flags) {
+    public Iterator<TaskMessage> recv(int flags) {
         throw new RuntimeException("Client connection should not receive any messages");
     }
 
-    void setChannel(Channel channel) {
-        if (channel != null && channel.isOpen()) {
-            //Assume the most recent connection attempt was successful.
-            retries.set(0);
-        }
-        channelRef.set(channel);
-        //reset retries
-        if (channel != null)
-            retries.set(0);
+    public Iterator<TaskMessage> recv(int flags, int clientId) {
+        throw new RuntimeException("Client connection should not receive any messages");
+    }
+
+    @Override
+    public void send(int taskId, byte[] payload) {
+        TaskMessage msg = new TaskMessage(taskId, payload);
+        List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1);
+        wrapper.add(msg);
+        send(wrapper.iterator());
     }
 
-}
+    private void flushRequest(Channel channel, final MessageBatch requests,
+            boolean blocking) {
+        if (requests == null)
+            return;
+
+        pendings.incrementAndGet();
+        ChannelFuture future = channel.write(requests);
+        future.addListener(new ChannelFutureListener() {
+            public void operationComplete(ChannelFuture future)
+                    throws Exception {
+
+                pendings.decrementAndGet();
+                if (!future.isSuccess()) {
+                    LOG.info(
+                            "failed to send requests to " + remote_addr.toString() + ": ", future.getCause());
+
+                    Channel channel = future.getChannel();
+
+                    if (null != channel) {
+                        channel.close();
+                        channelRef.compareAndSet(channel, null);
+                    }
+                } else {
+                    LOG.debug("{} request(s) sent", requests.size());
+                }
+            }
+        });
+
+        if (blocking) {
+            future.awaitUninterruptibly();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 80b4443..2e762ce 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -20,6 +20,7 @@ package backtype.storm.messaging.netty;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.Map;
 import java.util.Vector;
 
@@ -47,12 +48,14 @@ public class Context implements IContext {
 
         //each context will have a single client channel factory
         int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
+		ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss");
+        ThreadFactory workerFactory = new NettyRenameThreadFactory("client" + "-worker");
         if (maxWorkers > 0) {
-            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
-                    Executors.newCachedThreadPool(), maxWorkers);
+            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
+                    Executors.newCachedThreadPool(workerFactory), maxWorkers);
         } else {
-            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
-                    Executors.newCachedThreadPool());
+            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
+                    Executors.newCachedThreadPool(workerFactory));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
index a552cf7..b7335b3 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
@@ -17,6 +17,8 @@
  */
 package backtype.storm.messaging.netty;
 
+import java.io.IOException;
+
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBufferOutputStream;
 import org.jboss.netty.buffer.ChannelBuffers;
@@ -54,14 +56,14 @@ enum ControlMessage {
      * encode the current Control Message into a channel buffer
      * @throws Exception
      */
-    ChannelBuffer buffer() throws Exception {
+    ChannelBuffer buffer() throws IOException {
         ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));      
         write(bout);
         bout.close();
         return bout.buffer();
     }
 
-    void write(ChannelBufferOutputStream bout) throws Exception {
+    void write(ChannelBufferOutputStream bout) throws IOException {
         bout.writeShort(code);        
     } 
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
index cd8d4e3..63c861a 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@ -44,6 +44,11 @@ class MessageBatch {
         encoded_length += msgEncodeLength(msg);
     }
 
+
+    TaskMessage get(int index) {
+        return msgs.get(index);
+    }
+
     /**
      * try to add a TaskMessage to a batch
      * @param taskMsg

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
index 3365e58..8291d78 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
@@ -17,6 +17,9 @@
  */
 package backtype.storm.messaging.netty;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import backtype.storm.messaging.TaskMessage;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
@@ -34,52 +37,78 @@ public class MessageDecoder extends FrameDecoder {
      */
     protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
         // Make sure that we have received at least a short 
-        if (buf.readableBytes() < 2) {
+        long available = buf.readableBytes();
+        if (available < 2) {
             //need more data
             return null;
         }
 
-        // Mark the current buffer position before reading task/len field
-        // because the whole frame might not be in the buffer yet.
-        // We will reset the buffer position to the marked position if
-        // there's not enough bytes in the buffer.
-        buf.markReaderIndex();
-
-        //read the short field
-        short code = buf.readShort();
-        
-        //case 1: Control message
-        ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
-        if (ctrl_msg != null) return ctrl_msg;
-        
-        //case 2: task Message
-        short task = code;
-        
-        // Make sure that we have received at least an integer (length) 
-        if (buf.readableBytes() < 4) {
-            //need more data
-            buf.resetReaderIndex();
-            return null;
-        }
+        List<Object> ret = new ArrayList<Object>();
+
+        while (available >= 2) {
+
+            // Mark the current buffer position before reading task/len field
+            // because the whole frame might not be in the buffer yet.
+            // We will reset the buffer position to the marked position if
+            // there's not enough bytes in the buffer.
+            buf.markReaderIndex();
+
+            // read the short field
+            short code = buf.readShort();
+            available -= 2;
+
+            // case 1: Control message
+            ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
+            if (ctrl_msg != null) {
+
+                if (ctrl_msg == ControlMessage.EOB_MESSAGE) {
+                    continue;
+                } else {
+                    return ctrl_msg;
+                }
+            }
+
+            // case 2: task Message
+            short task = code;
 
-        // Read the length field.
-        int length = buf.readInt();
-        if (length<=0) {
-            return new TaskMessage(task, null);
+            // Make sure that we have received at least an integer (length)
+            if (available < 4) {
+                // need more data
+                buf.resetReaderIndex();
+                break;
+            }
+
+            // Read the length field.
+            int length = buf.readInt();
+
+            available -= 4;
+
+            if (length <= 0) {
+                ret.add(new TaskMessage(task, null));
+                break;
+            }
+
+            // Make sure there are enough bytes in the buffer.
+            if (available < length) {
+                // The whole payload was not received yet - rewind and stop decoding.
+                buf.resetReaderIndex();
+                break;
+            }
+            available -= length;
+
+            // There's enough bytes in the buffer. Read it.
+            ChannelBuffer payload = buf.readBytes(length);
+
+
+            // Successfully decoded a frame.
+            // Add a TaskMessage object to the result list
+            ret.add(new TaskMessage(task, payload.array()));
         }
-        
-        // Make sure if there's enough bytes in the buffer.
-        if (buf.readableBytes() < length) {
-            // The whole bytes were not received yet - return null.
-            buf.resetReaderIndex();
+
+        if (ret.size() == 0) {
             return null;
+        } else {
+            return ret;
         }
-
-        // There's enough bytes in the buffer. Read it.
-        ChannelBuffer payload = buf.readBytes(length);
-
-        // Successfully decoded a frame.
-        // Return a TaskMessage object
-        return new TaskMessage(task,payload.array());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index 83e4187..71f01e0 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -31,35 +31,65 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
 
 class Server implements IConnection {
     private static final Logger LOG = LoggerFactory.getLogger(Server.class);
     @SuppressWarnings("rawtypes")
     Map storm_conf;
     int port;
-    private LinkedBlockingQueue<TaskMessage> message_queue;
+    private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
     volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
     final ChannelFactory factory;
     final ServerBootstrap bootstrap;
-
+    
+    private int queueCount;
+    HashMap<Integer, Integer> taskToQueueId = null;
+    int roundRobinQueueId;
+	
+    boolean closing = false;
+    List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
+    
+    
     @SuppressWarnings("rawtypes")
     Server(Map storm_conf, int port) {
         this.storm_conf = storm_conf;
         this.port = port;
-        message_queue = new LinkedBlockingQueue<TaskMessage>();
-
+        
+        queueCount = Utils.getInt(storm_conf.get("worker.receiver.thread.count"), 1);
+        roundRobinQueueId = 0;
+        taskToQueueId = new HashMap<Integer, Integer>();
+    
+        message_queue = new LinkedBlockingQueue[queueCount];
+		    for (int i = 0; i < queueCount; i++) {
+            message_queue[i] = new LinkedBlockingQueue<ArrayList<TaskMessage>>();
+        }
+        
         // Configure the server.
         int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
         int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS));
 
+        ThreadFactory bossFactory = new NettyRenameThreadFactory(name() + "-boss");
+        ThreadFactory workerFactory = new NettyRenameThreadFactory(name() + "-worker");
+        
         if (maxWorkers > 0) {
-            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
+            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), 
+                Executors.newCachedThreadPool(workerFactory), maxWorkers);
         } else {
-            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), 
+                Executors.newCachedThreadPool(workerFactory));
         }
+        
+        LOG.info("Create Netty Server " + name() + ", buffer_size: " + buffer_size + ", maxWorkers: " + maxWorkers);
+        
         bootstrap = new ServerBootstrap(factory);
         bootstrap.setOption("child.tcpNoDelay", true);
         bootstrap.setOption("child.receiveBufferSize", buffer_size);
@@ -72,34 +102,106 @@ class Server implements IConnection {
         Channel channel = bootstrap.bind(new InetSocketAddress(port));
         allChannels.add(channel);
     }
+    
+    private ArrayList<TaskMessage>[] groupMessages(List<TaskMessage> msgs) {
+      ArrayList<TaskMessage> messageGroups[] = new ArrayList[queueCount];
+      
+      for (int i = 0; i < msgs.size(); i++) {
+        TaskMessage message = msgs.get(i);
+        int task = message.task();
+        
+        if (task == -1) {
+          closing = true;
+          return null;
+        }
+        
+        Integer queueId = getMessageQueueId(task);
+        
+        if (null == messageGroups[queueId]) {
+          messageGroups[queueId] = new ArrayList<TaskMessage>();
+        }
+        messageGroups[queueId].add(message);
+      }
+      return messageGroups;
+    }
+    
+    private Integer getMessageQueueId(int task) {
+      Integer queueId = taskToQueueId.get(task);
+      if (null == queueId) {
+        synchronized(taskToQueueId) {
+          //assign task to queue in round-robin manner
+          if (null == taskToQueueId.get(task)) {
+            queueId = roundRobinQueueId++;
+            taskToQueueId.put(task, queueId);
+            if (roundRobinQueueId == queueCount) {
+              roundRobinQueueId = 0;
+            }
+          }
+        }
+      }
+      return queueId;
+    }
 
     /**
      * enqueue a received message 
      * @param message
      * @throws InterruptedException
      */
-    protected void enqueue(TaskMessage message) throws InterruptedException {
-        message_queue.put(message);
-        LOG.debug("message received with task: {}, payload size: {}", message.task(), message.message().length);
-    }
+    protected void enqueue(List<TaskMessage> msgs) throws InterruptedException {
+      
+      if (null == msgs || msgs.size() == 0 || closing) {
+        return;
+      }
+      
+      ArrayList<TaskMessage> messageGroups[] = groupMessages(msgs);
+      
+      if (null == messageGroups || closing) {
+        return;
+      }
+      
+      for (int receiverId = 0; receiverId < messageGroups.length; receiverId++) {
+        ArrayList<TaskMessage> msgGroup = messageGroups[receiverId];
+        if (null != msgGroup) {
+          message_queue[receiverId].put(msgGroup);
+        }
+      }
+   }
     
     /**
      * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
      */
-    public TaskMessage recv(int flags)  {
-        if ((flags & 0x01) == 0x01) { 
+    public Iterator<TaskMessage> recv(int flags)  {
+      if (queueCount > 1) {
+        throw new RuntimeException("Use recv(int flags, int clientId) instead, as we have worker.receiver.thread.count=" + queueCount + " receive threads, clientId should be 0 <= clientId < " + queueCount);
+      }
+      return recv(flags, 0);
+    }
+    
+    public Iterator<TaskMessage> recv(int flags, int receiverId)  {
+      if (closing) {
+        return closeMessage.iterator();
+      }
+      
+      ArrayList<TaskMessage> ret = null; 
+      int queueId = receiverId % queueCount;
+      if ((flags & 0x01) == 0x01) { 
             //non-blocking
-            return message_queue.poll();
+            ret = message_queue[queueId].poll();
         } else {
             try {
-                TaskMessage request = message_queue.take();
+                ArrayList<TaskMessage> request = message_queue[queueId].take();
                 LOG.debug("request to be processed: {}", request);
-                return request;
+                ret = request;
             } catch (InterruptedException e) {
                 LOG.info("exception within msg receiving", e);
-                return null;
+                ret = null;
             }
         }
+      
+      if (null != ret) {
+        return ret.iterator();
+      }
+      return null;
     }
 
     /**
@@ -133,4 +235,12 @@ class Server implements IConnection {
     public void send(int task, byte[] message) {
         throw new RuntimeException("Server connection should not send any messages");
     }
+    
+    public void send(Iterator<TaskMessage> msgs) {
+      throw new RuntimeException("Server connection should not send any messages");
+    }
+	
+	 public String name() {
+      return "Netty-server-localhost-" + port;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
index 43a8c39..4c70518 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
@@ -18,70 +18,24 @@
 package backtype.storm.messaging.netty;
 
 import java.net.ConnectException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.ConnectException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 public class StormClientHandler extends SimpleChannelUpstreamHandler  {
     private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
     private Client client;
-    long start_time;
     
     StormClientHandler(Client client) {
         this.client = client;
-        start_time = System.currentTimeMillis();
-    }
-
-    @Override
-    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) {
-        //register the newly established channel
-        Channel channel = event.getChannel();
-        client.setChannel(channel);
-        LOG.info("connection established from "+channel.getLocalAddress()+" to "+channel.getRemoteAddress());
-        
-        //send next batch of requests if any
-        try {
-            client.tryDeliverMessages(false);
-        } catch (Exception ex) {
-            LOG.info("exception when sending messages:", ex.getMessage());
-            client.reconnect();
-        }
-    }
-
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
-        LOG.debug("send/recv time (ms): {}", (System.currentTimeMillis() - start_time));
-        
-        //examine the response message from server
-        ControlMessage msg = (ControlMessage)event.getMessage();
-        if (msg==ControlMessage.FAILURE_RESPONSE)
-            LOG.info("failure response:{}", msg);
-
-        //send next batch of requests if any
-        try {
-            client.tryDeliverMessages(false);
-        } catch (Exception ex) {
-            LOG.info("exception when sending messages:", ex.getMessage());
-            client.reconnect();
-        }
     }
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
         Throwable cause = event.getCause();
         if (!(cause instanceof ConnectException)) {
-            LOG.info("Connection to "+client.remote_addr+" failed:", cause);
-        }
-        client.reconnect();
+            LOG.info("Connection failed " + client.name(), cause);
+        } 
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
index 093fb61..8b93e31 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
@@ -18,10 +18,14 @@
 package backtype.storm.messaging.netty;
 
 import backtype.storm.messaging.TaskMessage;
-import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 class StormServerHandler extends SimpleChannelUpstreamHandler  {
@@ -41,30 +45,22 @@ class StormServerHandler extends SimpleChannelUpstreamHandler  {
     
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-        Object msg = e.getMessage();  
-        if (msg == null) return;
-
-        //end of batch?
-        if (msg==ControlMessage.EOB_MESSAGE) {
-            Channel channel = ctx.getChannel();
-            LOG.debug("Send back response ...");
-            if (failure_count.get()==0)
-                channel.write(ControlMessage.OK_RESPONSE);
-            else channel.write(ControlMessage.FAILURE_RESPONSE);
-            return;
-        }
-        
-        //enqueue the received message for processing
-        try {
-            server.enqueue((TaskMessage)msg);
-        } catch (InterruptedException e1) {
-            LOG.info("failed to enqueue a request message", e);
-            failure_count.incrementAndGet();
-        }
+      List<TaskMessage> msgs = (List<TaskMessage>) e.getMessage();
+      if (msgs == null) {
+        return;
+      }
+      
+      try {
+        server.enqueue(msgs);
+      } catch (InterruptedException e1) {
+        LOG.info("failed to enqueue a request message", e);
+        failure_count.incrementAndGet();
+      }
     }
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+        e.getCause().printStackTrace();
         server.closeChannel(e.getChannel());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 56751c6..8c5b466 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -51,8 +51,11 @@ public class DisruptorQueue implements IStatefulObject {
     // TODO: consider having a threadlocal cache of this variable to speed up reads?
     volatile boolean consumerStartedFlag = false;
     ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();
+    private static String PREFIX = "disruptor-";
+    private String _queueName = "";
     
-    public DisruptorQueue(ClaimStrategy claim, WaitStrategy wait) {
+    public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait) {
+         this._queueName = PREFIX + queueName;
         _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
         _consumer = new Sequence();
         _barrier = _buffer.newBarrier();
@@ -62,6 +65,10 @@ public class DisruptorQueue implements IStatefulObject {
         }
     }
     
+    public String getName() {
+      return _queueName;
+    }
+    
     public void consumeBatch(EventHandler<Object> handler) {
         consumeBatchToCursor(_barrier.getCursor(), handler);
     }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index a1fed96..b1892f1 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -25,6 +25,7 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.PrintStream;
 import java.net.URL;
 import java.net.URLDecoder;
 import java.nio.ByteBuffer;
@@ -37,6 +38,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Scanner;
 import java.util.TreeMap;
 import java.util.UUID;
 
@@ -301,15 +303,39 @@ public class Utils {
     }
     
     public static Integer getInt(Object o) {
-        if(o instanceof Long) {
-            return ((Long) o ).intValue();
-        } else if (o instanceof Integer) {
-            return (Integer) o;
-        } else if (o instanceof Short) {
-            return ((Short) o).intValue();
-        } else {
-            throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
-        }
+      Integer result = getInt(o, null);
+      if (null == result) {
+        throw new IllegalArgumentException("Don't know how to convert null + to int");
+      }
+      return result;
+    }
+    
+    public static Integer getInt(Object o, Integer defaultValue) {
+      if (null == o) {
+        return defaultValue;
+      }
+      
+      if(o instanceof Long) {
+          return ((Long) o ).intValue();
+      } else if (o instanceof Integer) {
+          return (Integer) o;
+      } else if (o instanceof Short) {
+          return ((Short) o).intValue();
+      } else {
+          throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
+      }
+    }
+
+    public static boolean getBoolean(Object o, boolean defaultValue) {
+      if (null == o) {
+        return defaultValue;
+      }
+      
+      if(o instanceof Boolean) {
+          return (Boolean) o;
+      } else {
+          throw new IllegalArgumentException("Don't know how to convert " + o + " + to boolean");
+      }
     }
     
     public static long secureRandomLong() {
@@ -373,6 +399,25 @@ public class Utils {
         ret.start();
         return ret;
     }
+    
+    public static void redirectStreamAsync(Process process) {
+      redirectStreamAsync(process.getInputStream(), System.out);
+      redirectStreamAsync(process.getErrorStream(), System.err);
+    }
+    
+    static void redirectStreamAsync(final InputStream input,
+        final PrintStream output) {
+      new Thread(new Runnable() {
+        @Override
+        public void run() {
+          Scanner scanner = new Scanner(input);
+          while (scanner.hasNextLine()) {
+            output.println(scanner.nextLine());
+          }
+        }
+      }).start();
+    }
+ 
 
     public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port) {
         CuratorFramework ret = newCurator(conf, servers, port);

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index f271607..d76e245 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -37,7 +37,8 @@
         server (.bind context nil port)
         client (.connect context nil "localhost" port)
         _ (.send client task (.getBytes req_msg))
-        resp (.recv server 0)]
+        iter (.recv server 0)
+        resp (.next iter)]
     (is (= task (.task resp)))
     (is (= req_msg (String. (.message resp))))
     (.close client)
@@ -58,7 +59,8 @@
         server (.bind context nil port)
         client (.connect context nil "localhost" port)
         _ (.send client task (.getBytes req_msg))
-        resp (.recv server 0)]
+        iter (.recv server 0)
+        resp (.next iter)]
     (is (= task (.task resp)))
     (is (= req_msg (String. (.message resp))))
     (.close client)
@@ -77,15 +79,23 @@
                     }
         context (TransportFactory/makeContext storm-conf)
         client (.connect context nil "localhost" port)
+        
+        server (Thread.
+                (fn []
+                  (Thread/sleep 1000)
+                  (let [server (.bind context nil port)
+                        iter (.recv server 0)
+                        resp (.next iter)]
+                    (is (= task (.task resp)))
+                    (is (= req_msg (String. (.message resp))))
+                    (.close server) 
+                  )))
+        _ (.start server)
         _ (.send client task (.getBytes req_msg))
-        _ (Thread/sleep 1000)
-        server (.bind context nil port)
-        resp (.recv server 0)]
-    (is (= task (.task resp)))
-    (is (= req_msg (String. (.message resp))))
+        ]
     (.close client)
-    (.close server)
-    (.term context)))    
+    (.join server)
+    (.term context)))
 
 (deftest test-batch
   (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
@@ -102,11 +112,21 @@
     (doseq [num  (range 1 100000)]
       (let [req_msg (str num)]
         (.send client task (.getBytes req_msg))))
-    (doseq [num  (range 1 100000)]
+    
+    (let [resp (ArrayList.)
+          received (atom 0)]
+      (while (< @received (- 100000 1))
+        (let [iter (.recv server 0)]
+          (while (.hasNext iter)
+            (let [msg (.next iter)]
+              (.add resp msg)
+              (swap! received inc)
+              ))))
+      (doseq [num  (range 1 100000)]
       (let [req_msg (str num)
-            resp (.recv server 0)
-            resp_msg (String. (.message resp))]
-        (is (= req_msg resp_msg))))
+            resp_msg (String. (.message (.get resp (- num 1))))]
+        (is (= req_msg resp_msg)))))
+   
     (.close client)
     (.close server)
     (.term context)))


[26/32] git commit: Merge branch 'storm_async_netty_and_batch_api' of https://github.com/clockfly/incubator-storm into STORM-297

Posted by bo...@apache.org.
Merge branch 'storm_async_netty_and_batch_api' of https://github.com/clockfly/incubator-storm into STORM-297

STORM-297: Storm performance scaling with CPU


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/701ac94d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/701ac94d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/701ac94d

Branch: refs/heads/master
Commit: 701ac94d9ad29192368ad63e7d6975794dc3c2f7
Parents: dd1d213 f6915d4
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Sat Jun 7 10:30:33 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Sat Jun 7 10:30:33 2014 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |   9 +
 .../src/clj/backtype/storm/daemon/executor.clj  |   1 +
 .../src/clj/backtype/storm/daemon/worker.clj    |  67 ++--
 storm-core/src/clj/backtype/storm/disruptor.clj |  10 +-
 .../src/clj/backtype/storm/messaging/loader.clj |  81 ++--
 .../src/clj/backtype/storm/messaging/local.clj  |  20 +-
 storm-core/src/clj/backtype/storm/timer.clj     |   5 +-
 storm-core/src/jvm/backtype/storm/Config.java   |  19 +
 .../backtype/storm/messaging/IConnection.java   |  15 +-
 .../backtype/storm/messaging/netty/Client.java  | 388 +++++++++++--------
 .../backtype/storm/messaging/netty/Context.java |  41 +-
 .../storm/messaging/netty/ControlMessage.java   |   6 +-
 .../storm/messaging/netty/MessageBatch.java     |   5 +
 .../storm/messaging/netty/MessageDecoder.java   | 108 ++++--
 .../netty/NettyRenameThreadFactory.java         |  35 ++
 .../backtype/storm/messaging/netty/Server.java  | 145 ++++++-
 .../netty/StormClientErrorHandler.java          |  41 ++
 .../messaging/netty/StormClientHandler.java     |  87 -----
 .../netty/StormClientPipelineFactory.java       |   2 +-
 .../messaging/netty/StormServerHandler.java     |  40 +-
 .../storm/testing/TestEventLogSpout.java        | 105 +++++
 .../storm/testing/TestEventOrderCheckBolt.java  |  76 ++++
 .../backtype/storm/utils/DisruptorQueue.java    |   9 +-
 .../backtype/storm/utils/TransferDrainer.java   | 113 ++++++
 .../src/jvm/backtype/storm/utils/Utils.java     |  46 ++-
 .../storm/messaging/netty_unit_test.clj         |  46 ++-
 .../test/clj/backtype/storm/messaging_test.clj  |  35 +-
 27 files changed, 1110 insertions(+), 445 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/701ac94d/conf/defaults.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/701ac94d/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/701ac94d/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/701ac94d/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------


[10/32] git commit: STORM-297: remove config storm.messaging.netty.blocking as we want async to be the default

Posted by bo...@apache.org.
STORM-297: remove config storm.messaging.netty.blocking as we want async to be the default


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/19e72e85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/19e72e85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/19e72e85

Branch: refs/heads/master
Commit: 19e72e852cb25caa1ee18b6c5ae1ae627e337bf1
Parents: 7b88cf3
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue May 20 11:24:42 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue May 20 11:24:42 2014 +0800

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/Config.java      |  6 ------
 .../jvm/backtype/storm/messaging/netty/Client.java | 17 +++++------------
 2 files changed, 5 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/19e72e85/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index fa5d9d1..d069293 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -92,12 +92,6 @@ public class Config extends HashMap<String, Object> {
     public static final Object STORM_NETTY_MESSAGE_BATCH_SIZE_SCHEMA = Number.class;
 
     /**
-     * This control whether we do Netty message transfer in a synchronized way or async way. 
-     */
-    public static final String STORM_NETTY_BLOCKING = "storm.messaging.netty.blocking";
-    public static final Object STORM_NETTY_BLOCKING_SCHEMA = Boolean.class;
-    
-    /**
      * We check with this interval that whether the Netty channel is writable and try to write pending messages
      */
     public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/19e72e85/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 8f0d7af..18582b4 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -53,7 +53,6 @@ public class Client implements IConnection {
     private boolean closing;
 
     private Integer messageBatchSize;
-    private Boolean blocking = false;
     
     private AtomicLong pendings;
 
@@ -76,7 +75,6 @@ public class Client implements IConnection {
         max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
 
         this.messageBatchSize = Utils.getInt(storm_conf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
-        blocking = Utils.getBoolean(storm_conf.get(Config.STORM_NETTY_BLOCKING), false);
         
         flushCheckInterval = Utils.getInt(storm_conf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10); // default 10 ms
 
@@ -209,7 +207,7 @@ public class Client implements IConnection {
             messageBatch.add(message);
             if (messageBatch.isFull()) {
                 MessageBatch toBeFlushed = messageBatch;
-                flushRequest(channel, toBeFlushed, blocking);
+                flushRequest(channel, toBeFlushed);
                 messageBatch = null;
             }
         }
@@ -221,7 +219,7 @@ public class Client implements IConnection {
                 // Flush as fast as we can to reduce the latency
                 MessageBatch toBeFlushed = messageBatch;
                 messageBatch = null;
-                flushRequest(channel, toBeFlushed, blocking);
+                flushRequest(channel, toBeFlushed);
                 
             } else {
                 // when channel is NOT writable, it means the internal netty buffer is full. 
@@ -246,7 +244,7 @@ public class Client implements IConnection {
                 Channel channel = channelRef.get();
                 if (channel != null) {
                     flushCheckTimer.set(Long.MAX_VALUE);
-                    flushRequest(channel, toBeFlushed, true);
+                    flushRequest(channel, toBeFlushed);
                 }
                 messageBatch = null;
             }
@@ -266,7 +264,7 @@ public class Client implements IConnection {
                 MessageBatch toBeFlushed = messageBatch;
                 Channel channel = channelRef.get();
                 if (channel != null) {
-                    flushRequest(channel, toBeFlushed, true);
+                    flushRequest(channel, toBeFlushed);
                 }
                 messageBatch = null;
             }
@@ -307,8 +305,7 @@ public class Client implements IConnection {
         send(wrapper.iterator());
     }
 
-    private void flushRequest(Channel channel, final MessageBatch requests,
-            boolean blocking) {
+    private void flushRequest(Channel channel, final MessageBatch requests) {
         if (requests == null)
             return;
 
@@ -334,9 +331,5 @@ public class Client implements IConnection {
                 }
             }
         });
-
-        if (blocking) {
-            future.awaitUninterruptibly();
-        }
     }
 }
\ No newline at end of file


[14/32] git commit: STORM-297: rename StormClientHandler to StormClientErrorHandler, as all we do is to check errors in the class

Posted by bo...@apache.org.
STORM-297: rename StormClientHandler to StormClientErrorHandler, as all we do is to check errors in the class


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/1baf9b7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/1baf9b7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/1baf9b7f

Branch: refs/heads/master
Commit: 1baf9b7f47f1f1592391a9eb18b3ad0b340f7355
Parents: d85c470
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue May 20 12:03:16 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue May 20 12:03:16 2014 +0800

----------------------------------------------------------------------
 .../netty/StormClientErrorHandler.java          | 41 ++++++++++++++++++++
 .../messaging/netty/StormClientHandler.java     | 41 --------------------
 .../netty/StormClientPipelineFactory.java       |  2 +-
 3 files changed, 42 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/1baf9b7f/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientErrorHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientErrorHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientErrorHandler.java
new file mode 100644
index 0000000..ae317aa
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientErrorHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.net.ConnectException;
+
+import org.jboss.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StormClientErrorHandler extends SimpleChannelUpstreamHandler  {
+    private static final Logger LOG = LoggerFactory.getLogger(StormClientErrorHandler.class);
+    private String name;
+    
+    StormClientErrorHandler(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
+        Throwable cause = event.getCause();
+        if (!(cause instanceof ConnectException)) {
+            LOG.info("Connection failed " + name, cause);
+        } 
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/1baf9b7f/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
deleted file mode 100644
index 4c70518..0000000
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.messaging.netty;
-
-import java.net.ConnectException;
-
-import org.jboss.netty.channel.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StormClientHandler extends SimpleChannelUpstreamHandler  {
-    private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
-    private Client client;
-    
-    StormClientHandler(Client client) {
-        this.client = client;
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
-        Throwable cause = event.getCause();
-        if (!(cause instanceof ConnectException)) {
-            LOG.info("Connection failed " + client.name(), cause);
-        } 
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/1baf9b7f/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
index 6bad8e3..e6e8b3d 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
@@ -37,7 +37,7 @@ class StormClientPipelineFactory implements ChannelPipelineFactory {
         // Encoder
         pipeline.addLast("encoder", new MessageEncoder());
         // business logic.
-        pipeline.addLast("handler", new StormClientHandler(client));
+        pipeline.addLast("handler", new StormClientErrorHandler(client.name()));
 
         return pipeline;
     }


[18/32] git commit: STORM-297: Use a shared pool for netty client flusher threads.

Posted by bo...@apache.org.
STORM-297: Use a shared pool for netty client flusher threads.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/deba5583
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/deba5583
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/deba5583

Branch: refs/heads/master
Commit: deba55833b8752c5275cb37ac3a1043583790963
Parents: baa3106
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue May 20 20:46:11 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue May 20 21:19:37 2014 +0800

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 32 ++++++++++++--------
 .../backtype/storm/messaging/netty/Context.java | 30 +++++++++++++++---
 2 files changed, 44 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/deba5583/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 59a8a5c..9765647 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -34,6 +34,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -59,10 +61,13 @@ public class Client implements IConnection {
     MessageBatch messageBatch = null;
     private AtomicLong flushCheckTimer;
     private int flushCheckInterval;
+    private ScheduledExecutorService scheduler;
 
     @SuppressWarnings("rawtypes")
-    Client(Map storm_conf, ChannelFactory factory, String host, int port) {
+    Client(Map storm_conf, ChannelFactory factory, 
+            ScheduledExecutorService scheduler, String host, int port) {
         this.factory = factory;
+        this.scheduler = scheduler;
         channelRef = new AtomicReference<Channel>(null);
         closing = false;
         pendings = new AtomicLong(0);
@@ -92,12 +97,18 @@ public class Client implements IConnection {
         // Start the connection attempt.
         remote_addr = new InetSocketAddress(host, port);
         
-        Thread flushChecker = new Thread(new Runnable() {
+        // setup the connection asyncly now
+        scheduler.execute(new Runnable() {
             @Override
-            public void run() {
-                //make sure we have a connection
+            public void run() {   
                 connect();
-                
+            }
+        });
+        
+        Runnable flusher = new Runnable() {
+            @Override
+            public void run() {
+
                 while(!closing) {
                     long flushCheckTime = flushCheckTimer.get();
                     long now = System.currentTimeMillis();
@@ -107,18 +118,13 @@ public class Client implements IConnection {
                             flush(channel);
                         }
                     }
-                    try {
-                        Thread.sleep(flushCheckInterval);
-                    } catch (InterruptedException e) {
-                        break;
-                    }
                 }
                 
             }
-        }, name() + "-flush-checker");
+        };
         
-        flushChecker.setDaemon(true);
-        flushChecker.start();
+        long initialDelay = Math.min(30L * 1000, max_sleep_ms * max_retries); //max wait for 30s
+        scheduler.scheduleWithFixedDelay(flusher, initialDelay, flushCheckInterval, TimeUnit.MILLISECONDS);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/deba5583/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 2e762ce..8f2b17f 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -18,9 +18,12 @@
 package backtype.storm.messaging.netty;
 
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.Map;
 import java.util.Vector;
 
@@ -29,14 +32,16 @@ import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.IContext;
 import backtype.storm.utils.Utils;
 
-import java.util.Map;
-import java.util.Vector;
-
 public class Context implements IContext {
+    private static final Logger LOG = LoggerFactory.getLogger(Context.class);
+        
     @SuppressWarnings("rawtypes")
     private Map storm_conf;
     private volatile Vector<IConnection> connections;
     private NioClientSocketChannelFactory clientChannelFactory;
+    
+    private ScheduledExecutorService clientScheduleService;
+    private final int MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE = 10;
 
     /**
      * initialization per Storm configuration 
@@ -57,6 +62,10 @@ public class Context implements IContext {
             clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
                     Executors.newCachedThreadPool(workerFactory));
         }
+        
+        int otherWorkers = Utils.getInt(storm_conf.get(Config.TOPOLOGY_WORKERS)) - 1;
+        int poolSize = Math.min(Math.max(1, otherWorkers), MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE);
+        clientScheduleService = Executors.newScheduledThreadPool(poolSize, new NettyRenameThreadFactory("client-schedule-service"));
     }
 
     /**
@@ -72,7 +81,8 @@ public class Context implements IContext {
      * establish a connection to a remote server
      */
     public IConnection connect(String storm_id, String host, int port) {        
-        IConnection client =  new Client(storm_conf, clientChannelFactory, host, port);
+        IConnection client =  new Client(storm_conf, clientChannelFactory, 
+                clientScheduleService, host, port);
         connections.add(client);
         return client;
     }
@@ -81,12 +91,22 @@ public class Context implements IContext {
      * terminate this context
      */
     public void term() {
+        clientScheduleService.shutdown();        
+        
         for (IConnection conn : connections) {
             conn.close();
         }
+        
+        try {
+            clientScheduleService.awaitTermination(30, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            LOG.error("Error when shutting down client scheduler", e);
+        }
+        
         connections = null;
 
         //we need to release resources associated with client channel factory
         clientChannelFactory.releaseExternalResources();
+
     }
 }


[05/32] git commit: STORM-297, add more comments

Posted by bo...@apache.org.
STORM-297, add more comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/c5e7a0d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/c5e7a0d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/c5e7a0d0

Branch: refs/heads/master
Commit: c5e7a0d0f137cc5f0ccf6116541c16d4a7727598
Parents: 2812e51
Author: Sean Zhong <cl...@gmail.com>
Authored: Thu May 8 10:32:46 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Thu May 8 10:32:46 2014 +0800

----------------------------------------------------------------------
 conf/defaults.yaml                               |  9 ++++++++-
 .../backtype/storm/messaging/netty/Client.java   |  4 +++-
 .../storm/messaging/netty/MessageDecoder.java    |  1 +
 .../backtype/storm/messaging/netty/Server.java   |  7 +++++++
 .../src/jvm/backtype/storm/utils/Utils.java      | 19 -------------------
 5 files changed, 19 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5e7a0d0/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 4c15fce..cd40f5d 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -93,6 +93,8 @@ supervisor.enable: true
 ### worker.* configs are for task workers
 worker.childopts: "-Xmx768m"
 worker.heartbeat.frequency.secs: 1
+
+# control how many worker receiver threads we need per worker 
 worker.receiver.thread.count: 1
 
 task.heartbeat.frequency.secs: 3
@@ -109,10 +111,15 @@ storm.messaging.netty.buffer_size: 5242880 #5MB buffer
 storm.messaging.netty.max_retries: 30
 storm.messaging.netty.max_wait_ms: 1000
 storm.messaging.netty.min_wait_ms: 100
+
+# If the Netty messaging layer is busy (Netty's internal buffer is not writable), the Netty client will try to batch as many messages as possible, up to storm.messaging.netty.transfer.batch.size bytes; otherwise it will try to flush messages as soon as possible to reduce latency.
 storm.messaging.netty.transfer.batch.size: 262144
+
+# If storm.messaging.netty.blocking is set to true, the Netty client will send messages synchronously; otherwise it will send them asynchronously. Set storm.messaging.netty.blocking to false to improve latency and throughput.
 storm.messaging.netty.blocking: false
-storm.messaging.netty.flush.check.interval.ms: 10
 
+# At this interval, we check whether the Netty channel is writable and, if it is, try to write pending messages.
+storm.messaging.netty.flush.check.interval.ms: 10
 
 ### topology.* configs are for specific executing storms
 topology.enable.message.timeouts: true

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5e7a0d0/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 46fd47a..85a904c 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -117,7 +117,7 @@ public class Client implements IConnection {
                 }
                 
             }
-        }, "netty-client-flush-checker");
+        }, name() + "-flush-checker");
         
         flushChecker.setDaemon(true);
         flushChecker.start();
@@ -224,6 +224,8 @@ public class Client implements IConnection {
                 flushRequest(channel, toBeFlushed, blocking);
                 
             } else {
+                // when channel is NOT writable, it means the internal netty buffer is full. 
+                // In this case, we can try to buffer up more incoming messages.
                 flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5e7a0d0/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
index 8291d78..72c3cf7 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
@@ -45,6 +45,7 @@ public class MessageDecoder extends FrameDecoder {
 
         List<Object> ret = new ArrayList<Object>();
 
+        // Use a while loop to decode as many messages as possible in a single call
         while (available >= 2) {
 
             // Mark the current buffer position before reading task/len field

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5e7a0d0/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index 71f01e0..f51af75 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -46,7 +46,11 @@ class Server implements IConnection {
     @SuppressWarnings("rawtypes")
     Map storm_conf;
     int port;
+    
+    // Create multiple queues for incoming messages. The size equals the number of receiver threads.
+    // Messages sent to the same task are stored in the same queue to preserve message order.
     private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
+    
     volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
     final ChannelFactory factory;
     final ServerBootstrap bootstrap;
@@ -126,12 +130,15 @@ class Server implements IConnection {
     }
     
     private Integer getMessageQueueId(int task) {
+      // try to construct the map from taskId -> queueId in round robin manner.
+      
       Integer queueId = taskToQueueId.get(task);
       if (null == queueId) {
         synchronized(taskToQueueId) {
           //assgin task to queue in round-robin manner
           if (null == taskToQueueId.get(task)) {
             queueId = roundRobinQueueId++;
+            
             taskToQueueId.put(task, queueId);
             if (roundRobinQueueId == queueCount) {
               roundRobinQueueId = 0;

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5e7a0d0/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index b1892f1..6a0a447 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -400,25 +400,6 @@ public class Utils {
         return ret;
     }
     
-    public static void redirectStreamAsync(Process process) {
-      redirectStreamAsync(process.getInputStream(), System.out);
-      redirectStreamAsync(process.getErrorStream(), System.err);
-    }
-    
-    static void redirectStreamAsync(final InputStream input,
-        final PrintStream output) {
-      new Thread(new Runnable() {
-        @Override
-        public void run() {
-          Scanner scanner = new Scanner(input);
-          while (scanner.hasNextLine()) {
-            output.println(scanner.nextLine());
-          }
-        }
-      }).start();
-    }
- 
-
     public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port) {
         CuratorFramework ret = newCurator(conf, servers, port);
         ret.start();


[08/32] git commit: STORM-297: Use recv(int flags, int clientId) to replace recv(int flags) in IConnection interface

Posted by bo...@apache.org.
STORM-297: Use recv(int flags, int clientId) to replace recv(int flags) in IConnection interface


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/c5c3571c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/c5c3571c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/c5c3571c

Branch: refs/heads/master
Commit: c5c3571ca15ee2dd675fb3cac44bd0f926ccfc67
Parents: 138a7a7
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue May 20 10:56:26 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue May 20 10:56:26 2014 +0800

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/messaging/local.clj   | 10 ----------
 .../src/jvm/backtype/storm/messaging/IConnection.java   |  6 ------
 .../src/jvm/backtype/storm/messaging/netty/Client.java  |  5 +----
 .../src/jvm/backtype/storm/messaging/netty/Server.java  | 12 +-----------
 .../clj/backtype/storm/messaging/netty_unit_test.clj    |  8 ++++----
 5 files changed, 6 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5c3571c/storm-core/src/clj/backtype/storm/messaging/local.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/local.clj b/storm-core/src/clj/backtype/storm/messaging/local.clj
index de14806..801f22d 100644
--- a/storm-core/src/clj/backtype/storm/messaging/local.clj
+++ b/storm-core/src/clj/backtype/storm/messaging/local.clj
@@ -31,16 +31,6 @@
 
 (deftype LocalConnection [storm-id port queues-map lock queue]
   IConnection
-  (^Iterator recv [this ^int flags]
-    (when-not queue
-      (throw (IllegalArgumentException. "Cannot receive on this socket")))
-    (let [ret (ArrayList.)
-          msg (if (= flags 1) (.poll queue) (.take queue))]
-      (if msg
-        (do 
-          (.add ret msg)
-          (.iterator ret))
-        nil)))
   (^Iterator recv [this ^int flags ^int clientId]
     (when-not queue
       (throw (IllegalArgumentException. "Cannot receive on this socket")))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5c3571c/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
index fe9caa7..ead4935 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
@@ -20,12 +20,6 @@ package backtype.storm.messaging;
 import java.util.Iterator;
 
 public interface IConnection {   
-    /**
-     * receive a batch message iterator (consists taskId and payload)
-     * @param flags 0: block, 1: non-block
-     * @return
-     */
-    public Iterator<TaskMessage> recv(int flags);
     
     /**
      * receive a batch message iterator (consists taskId and payload)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5c3571c/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 85a904c..8f0d7af 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -294,10 +294,7 @@ public class Client implements IConnection {
         }
     }
 
-    public Iterator<TaskMessage> recv(int flags) {
-        throw new RuntimeException("Client connection should not receive any messages");
-    }
-
+    @Override
     public Iterator<TaskMessage> recv(int flags, int clientId) {
         throw new RuntimeException("Client connection should not receive any messages");
     }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5c3571c/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index d551f02..20a147d 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -172,16 +172,6 @@ class Server implements IConnection {
           message_queue[receiverId].put(msgGroup);
         }
       }
-   }
-    
-    /**
-     * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
-     */
-    public Iterator<TaskMessage> recv(int flags)  {
-      if (queueCount > 1) {
-        throw new RuntimeException("Use recv(int flags, int clientId) instead, as we have worker.receiver.thread.count=" + queueCount + " receive threads, clientId should be 0 <= clientId < " + queueCount);
-      }
-      return recv(flags, 0);
     }
     
     public Iterator<TaskMessage> recv(int flags, int receiverId)  {
@@ -210,7 +200,7 @@ class Server implements IConnection {
       }
       return null;
     }
-
+   
     /**
      * register a newly created channel
      * @param channel

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5c3571c/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index d76e245..ea7b8dc 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -37,7 +37,7 @@
         server (.bind context nil port)
         client (.connect context nil "localhost" port)
         _ (.send client task (.getBytes req_msg))
-        iter (.recv server 0)
+        iter (.recv server 0 0)
         resp (.next iter)]
     (is (= task (.task resp)))
     (is (= req_msg (String. (.message resp))))
@@ -59,7 +59,7 @@
         server (.bind context nil port)
         client (.connect context nil "localhost" port)
         _ (.send client task (.getBytes req_msg))
-        iter (.recv server 0)
+        iter (.recv server 0 0)
         resp (.next iter)]
     (is (= task (.task resp)))
     (is (= req_msg (String. (.message resp))))
@@ -84,7 +84,7 @@
                 (fn []
                   (Thread/sleep 1000)
                   (let [server (.bind context nil port)
-                        iter (.recv server 0)
+                        iter (.recv server 0 0)
                         resp (.next iter)]
                     (is (= task (.task resp)))
                     (is (= req_msg (String. (.message resp))))
@@ -116,7 +116,7 @@
     (let [resp (ArrayList.)
           received (atom 0)]
       (while (< @received (- 100000 1))
-        (let [iter (.recv server 0)]
+        (let [iter (.recv server 0 0)]
           (while (.hasNext iter)
             (let [msg (.next iter)]
               (.add resp msg)


[13/32] git commit: STORM-297, add timeout when trying to send pending batches in close()

Posted by bo...@apache.org.
STORM-297, add timeout when trying to send pending batches in close()


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/d85c4707
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/d85c4707
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/d85c4707

Branch: refs/heads/master
Commit: d85c470747c22fa47a1f914388861b018ce9acc8
Parents: fe68fd9
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue May 20 11:59:18 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue May 20 11:59:18 2014 +0800

----------------------------------------------------------------------
 .../src/jvm/backtype/storm/messaging/netty/Client.java  | 12 ++++++++++++
 1 file changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/d85c4707/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 00ca449..d3980d4 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -260,6 +260,8 @@ public class Client implements IConnection {
     public synchronized void close() {
         if (!closing) {
             closing = true;
+            LOG.info("Closing Netty Client " + name());
+            
             if (null != messageBatch && !messageBatch.isEmpty()) {
                 MessageBatch toBeFlushed = messageBatch;
                 Channel channel = channelRef.get();
@@ -270,8 +272,18 @@ public class Client implements IConnection {
             }
         
             //wait for pendings to exit
+            final long timeoutMilliSeconds = 600 * 1000; //600 seconds
+            final long start = System.currentTimeMillis();
+            
+            LOG.info("Waiting for pending batchs to be sent with "+ name() + "..., timeout: {}ms, pendings: {}", timeoutMilliSeconds, pendings.get());
+            
             while(pendings.get() != 0) {
                 try {
+                    long delta = System.currentTimeMillis() - start;
+                    if (delta > timeoutMilliSeconds) {
+                        LOG.error("Timeout when sending pending batchs with {}..., there are still {} pending batchs not sent", name(), pendings.get());
+                        break;
+                    }
                     Thread.sleep(1000); //sleep 1s
                 } catch (InterruptedException e) {
                     break;


[28/32] git commit: STORM-297: avoid calling IConnection.send() when there is no message in the iterator.

Posted by bo...@apache.org.
STORM-297: avoid calling IConnection.send() when there is no message in the iterator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/b4422f13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/b4422f13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/b4422f13

Branch: refs/heads/master
Commit: b4422f13b6c609b5f68eb0a053f4bc08182f97d5
Parents: f6915d4
Author: Sean Zhong <cl...@gmail.com>
Authored: Sun Jun 8 01:03:05 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Sun Jun 8 01:03:05 2014 +0800

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b4422f13/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
index c9f1b04..0e53632 100644
--- a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
+++ b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
@@ -50,7 +50,7 @@ public class TransferDrainer {
       if (null != connection) { 
         ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort);
         Iterator<TaskMessage> iter = getBundleIterator(bundle);
-        if (null != iter) {
+        if (null != iter && iter.hasNext()) {
           connection.send(iter);
         }
       }


[24/32] git commit: STORM-297: fix flusher bug, dead loop

Posted by bo...@apache.org.
STORM-297: fix flusher bug, dead loop


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/0bca1733
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/0bca1733
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/0bca1733

Branch: refs/heads/master
Commit: 0bca1733d64a92d8ec411f5fdf41ebc90a1bddef
Parents: 426d143
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue Jun 3 14:53:10 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue Jun 3 14:53:10 2014 +0800

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/messaging/netty/Client.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0bca1733/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 9765647..8d2d221 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -109,7 +109,7 @@ public class Client implements IConnection {
             @Override
             public void run() {
 
-                while(!closing) {
+                if(!closing) {
                     long flushCheckTime = flushCheckTimer.get();
                     long now = System.currentTimeMillis();
                     if (now > flushCheckTime) {


[06/32] git commit: STORM-297, fix the indention

Posted by bo...@apache.org.
STORM-297, fix the indention


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/deb55ec9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/deb55ec9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/deb55ec9

Branch: refs/heads/master
Commit: deb55ec918126400ed94ee49f88c32b9ffda7b5d
Parents: c5e7a0d
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue May 20 10:33:15 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue May 20 10:33:15 2014 +0800

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/messaging/netty/Server.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/deb55ec9/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index f51af75..48ca3fb 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -73,7 +73,7 @@ class Server implements IConnection {
         taskToQueueId = new HashMap<Integer, Integer>();
     
         message_queue = new LinkedBlockingQueue[queueCount];
-		    for (int i = 0; i < queueCount; i++) {
+        for (int i = 0; i < queueCount; i++) {
             message_queue[i] = new LinkedBlockingQueue<ArrayList<TaskMessage>>();
         }
         
@@ -247,7 +247,7 @@ class Server implements IConnection {
       throw new RuntimeException("Server connection should not send any messages");
     }
 	
-	 public String name() {
+    public String name() {
       return "Netty-server-localhost-" + port;
     }
 }


[02/32] git commit: STORM-297, add thread factory and TransferDrainer

Posted by bo...@apache.org.
STORM-297, add thread factory and TransferDrainer


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/be8f8327
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/be8f8327
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/be8f8327

Branch: refs/heads/master
Commit: be8f8327b3c2d36e89420d62fb8ca530ba726cf3
Parents: 861a92e
Author: Sean Zhong <cl...@gmail.com>
Authored: Wed May 7 11:17:53 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Wed May 7 11:17:53 2014 +0800

----------------------------------------------------------------------
 .../netty/NettyRenameThreadFactory.java         |  35 ++++++
 .../backtype/storm/utils/TransferDrainer.java   | 113 +++++++++++++++++++
 2 files changed, 148 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/be8f8327/storm-core/src/jvm/backtype/storm/messaging/netty/NettyRenameThreadFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/NettyRenameThreadFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/NettyRenameThreadFactory.java
new file mode 100644
index 0000000..ea3f249
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/NettyRenameThreadFactory.java
@@ -0,0 +1,35 @@
+package backtype.storm.messaging.netty;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.netty.util.ThreadNameDeterminer;
+import org.jboss.netty.util.ThreadRenamingRunnable;
+
+public class NettyRenameThreadFactory  implements ThreadFactory {
+    
+    static {
+      //Rename Netty threads
+      ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
+    }
+  
+    final ThreadGroup group;
+    final AtomicInteger index = new AtomicInteger(1);
+    final String name;
+
+    NettyRenameThreadFactory(String name) {
+        SecurityManager s = System.getSecurityManager();
+        group = (s != null)? s.getThreadGroup() :
+                             Thread.currentThread().getThreadGroup();
+        this.name = name;
+    }
+
+    public Thread newThread(Runnable r) {
+        Thread t = new Thread(group, r, name + "-" + index.getAndIncrement(), 0);
+        if (t.isDaemon())
+            t.setDaemon(false);
+        if (t.getPriority() != Thread.NORM_PRIORITY)
+            t.setPriority(Thread.NORM_PRIORITY);
+        return t;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/be8f8327/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
new file mode 100644
index 0000000..c9f1b04
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+
+public class TransferDrainer {
+
+  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
+  
+  public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) {
+    for (String key : workerTupleSetMap.keySet()) {
+      
+      ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
+      if (null == bundle) {
+        bundle = new ArrayList<ArrayList<TaskMessage>>();
+        bundles.put(key, bundle);
+      }
+      
+      ArrayList tupleSet = workerTupleSetMap.get(key);
+      if (null != tupleSet && tupleSet.size() > 0) {
+        bundle.add(tupleSet);
+      }
+    } 
+  }
+  
+  public void send(HashMap<String, IConnection> connections) {
+    for (String hostPort : bundles.keySet()) {
+      IConnection connection = connections.get(hostPort);
+      if (null != connection) { 
+        ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort);
+        Iterator<TaskMessage> iter = getBundleIterator(bundle);
+        if (null != iter) {
+          connection.send(iter);
+        }
+      }
+    } 
+  }
+  
+  private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
+    
+    if (null == bundle) {
+      return null;
+    }
+    
+    return new Iterator<TaskMessage> () {
+      
+      private int offset = 0;
+      private int size = 0;
+      {
+        for (ArrayList<TaskMessage> list : bundle) {
+            size += list.size();
+        }
+      }
+      
+      private int bundleOffset = 0;
+      private Iterator<TaskMessage> iter = bundle.get(bundleOffset).iterator();
+      
+      @Override
+      public boolean hasNext() {
+        if (offset < size) {
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      public TaskMessage next() {
+        TaskMessage msg = null;
+        if (iter.hasNext()) {
+          msg = iter.next(); 
+        } else {
+          bundleOffset++;
+          iter = bundle.get(bundleOffset).iterator();
+          msg = iter.next();
+        }
+        if (null != msg) {
+          offset++;
+        }
+        return msg;
+      }
+
+      @Override
+      public void remove() {
+        throw new RuntimeException("not supported");
+      }
+    };
+  }
+  
+  public void clear() {
+    bundles.clear();
+  }
+}
\ No newline at end of file


[29/32] git commit: STORM-297: wait for the bolt to finish before killing the topology, otherwise the netty client may be killed before all bolts finish.

Posted by bo...@apache.org.
STORM-297: wait for the bolt to finish before killing the topology, otherwise the netty client may be killed before all bolts finish.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/5f7520af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/5f7520af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/5f7520af

Branch: refs/heads/master
Commit: 5f7520af012f9e5ab3bf7e051b0410aaf765bd61
Parents: b4422f1
Author: Sean Zhong <cl...@gmail.com>
Authored: Sun Jun 8 01:55:50 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Sun Jun 8 01:55:50 2014 +0800

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/testing.clj         | 9 +++++++--
 storm-core/test/clj/backtype/storm/messaging_test.clj | 2 +-
 2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5f7520af/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
index 3ce2c3f..9a61d75 100644
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@ -435,7 +435,8 @@
     ))
 
 ;; TODO: mock-sources needs to be able to mock out state spouts as well
-(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {} :cleanup-state true :topology-name nil]
+;; kill-waiting: seconds to wait before we kill the topology after all spout completed
+(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {} :cleanup-state true :topology-name nil :kill-waiting 0]
   ;; TODO: the idea of mocking for transactional topologies should be done an
   ;; abstraction level above... should have a complete-transactional-topology for this
   (let [{topology :topology capturer :capturer} (capture-topology topology)
@@ -470,7 +471,11 @@
     (let [storm-id (common/get-storm-id state storm-name)]
       (while-timeout TEST-TIMEOUT-MS (not (every? exhausted? (spout-objects spouts)))
         (simulate-wait cluster-map))
-
+      
+      ;; spout finished not necesary means the topology finished, If the topology requires more time, set the kill-waiting
+      ;; to bigger value    
+      (Thread/sleep (* 1000 kill-waiting))
+      
       (.killTopologyWithOpts (:nimbus cluster-map) storm-name (doto (KillOptions.) (.set_wait_secs 0)))
       (while-timeout TEST-TIMEOUT-MS (.assignment-info state storm-id nil)
         (simulate-wait cluster-map))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5f7520af/storm-core/test/clj/backtype/storm/messaging_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging_test.clj b/storm-core/test/clj/backtype/storm/messaging_test.clj
index c719c68..aefee0a 100644
--- a/storm-core/test/clj/backtype/storm/messaging_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging_test.clj
@@ -85,7 +85,7 @@
                                                  :parallelism-hint 4)
                         })
             results (complete-topology cluster
-                                       topology)]
+                                       topology :kill-waiting 10)]
         
         ;; No error Tuple from Bolt TestEventOrderCheckBolt
         (is (empty? (read-tuples results "2"))))))


[15/32] git commit: STORM-297, remove config option netty.blocking

Posted by bo...@apache.org.
STORM-297, remove config option netty.blocking


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/44e47e82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/44e47e82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/44e47e82

Branch: refs/heads/master
Commit: 44e47e82d03743cbec3fe6aff17c76711520c061
Parents: 1baf9b7
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue May 20 12:06:40 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue May 20 12:06:40 2014 +0800

----------------------------------------------------------------------
 conf/defaults.yaml | 3 ---
 1 file changed, 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/44e47e82/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index cd40f5d..a5c2c39 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -115,9 +115,6 @@ storm.messaging.netty.min_wait_ms: 100
 # If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency.
 storm.messaging.netty.transfer.batch.size: 262144
 
-# If storm.messaging.netty.blocking is set to true, the Netty Client will send messages in synchronized way, otherwise it will do it in async way. Set storm.messaging.netty.blocking to false to improve the latency and throughput.
-storm.messaging.netty.blocking: false
-
 # We check with this interval that whether the Netty channel is writable and try to write pending messages if it is.
 storm.messaging.netty.flush.check.interval.ms: 10
 


[07/32] git commit: STORM-297, use config name instead of literal constants

Posted by bo...@apache.org.
STORM-297, use config name instead of literal constants


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/138a7a7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/138a7a7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/138a7a7f

Branch: refs/heads/master
Commit: 138a7a7f0bc4aceb829915106673b969552cee85
Parents: deb55ec
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue May 20 10:34:36 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue May 20 10:34:36 2014 +0800

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/messaging/netty/Server.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/138a7a7f/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index 48ca3fb..d551f02 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -68,7 +68,7 @@ class Server implements IConnection {
         this.storm_conf = storm_conf;
         this.port = port;
         
-        queueCount = Utils.getInt(storm_conf.get("worker.receiver.thread.count"), 1);
+        queueCount = Utils.getInt(storm_conf.get(Config.WORKER_RECEIVER_THREAD_COUNT), 1);
         roundRobinQueueId = 0;
         taskToQueueId = new HashMap<Integer, Integer>();
     


[03/32] git commit: add thread name for flush checker

Posted by bo...@apache.org.
add thread name for flush checker


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/70a46be1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/70a46be1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/70a46be1

Branch: refs/heads/master
Commit: 70a46be1fd24935f92c5d27e509d0800ae950f80
Parents: be8f832
Author: Sean Zhong <cl...@gmail.com>
Authored: Thu May 8 09:58:08 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Thu May 8 09:58:08 2014 +0800

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/messaging/netty/Client.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/70a46be1/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 09b045b..46fd47a 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -117,7 +117,7 @@ public class Client implements IConnection {
                 }
                 
             }
-        });
+        }, "netty-client-flush-checker");
         
         flushChecker.setDaemon(true);
         flushChecker.start();


[20/32] git commit: STORM-297: give timer thread a meaningful name

Posted by bo...@apache.org.
STORM-297: give timer thread a meaningful name


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/04b4907e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/04b4907e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/04b4907e

Branch: refs/heads/master
Commit: 04b4907e10c152072dcb1fcdae9d0198d7dc9603
Parents: f18d98f
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue May 20 22:10:43 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue May 20 22:10:43 2014 +0800

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/daemon/worker.clj | 15 ++++++++-------
 storm-core/src/clj/backtype/storm/timer.clj         |  5 +++--
 2 files changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/04b4907e/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index 2648237..437e8dd 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -165,11 +165,12 @@
   ;; actually just do it via interfaces. just need to make sure to hide setResource from tasks
   {})
 
-(defn mk-halting-timer []
+(defn mk-halting-timer [timer-name]
   (mk-timer :kill-fn (fn [t]
                        (log-error t "Error when processing event")
                        (halt-process! 20 "Error when processing an event")
-                       )))
+                       )
+            :timer-name timer-name))
 
 (defn worker-data [conf mq-context storm-id assignment-id port worker-id]
   (let [cluster-state (cluster/mk-distributed-cluster-state conf)
@@ -202,11 +203,11 @@
       :storm-conf storm-conf
       :topology topology
       :system-topology (system-topology! storm-conf topology)
-      :heartbeat-timer (mk-halting-timer)
-      :refresh-connections-timer (mk-halting-timer)
-      :refresh-active-timer (mk-halting-timer)
-      :executor-heartbeat-timer (mk-halting-timer)
-      :user-timer (mk-halting-timer)
+      :heartbeat-timer (mk-halting-timer "heartbeat-timer")
+      :refresh-connections-timer (mk-halting-timer "refresh-connections-timer")
+      :refresh-active-timer (mk-halting-timer "refresh-active-timer")
+      :executor-heartbeat-timer (mk-halting-timer "executor-heartbeat-timer")
+      :user-timer (mk-halting-timer "user-timer")
       :task->component (HashMap. (storm-task-info topology storm-conf)) ; for optimized access when used in tasks later on
       :component->stream->fields (component->stream->fields (:system-topology <>))
       :component->sorted-tasks (->> (:task->component <>) reverse-map (map-val sort))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/04b4907e/storm-core/src/clj/backtype/storm/timer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/timer.clj b/storm-core/src/clj/backtype/storm/timer.clj
index 78b1f1c..9c5a99f 100644
--- a/storm-core/src/clj/backtype/storm/timer.clj
+++ b/storm-core/src/clj/backtype/storm/timer.clj
@@ -23,7 +23,7 @@
 ;; The timer defined in this file is very similar to java.util.Timer, except it integrates with
 ;; Storm's time simulation capabilities. This lets us test code that does asynchronous work on the timer thread
 
-(defnk mk-timer [:kill-fn (fn [& _] )]
+(defnk mk-timer [:kill-fn (fn [& _] ) :timer-name nil]
   (let [queue (PriorityQueue. 10
                               (reify Comparator
                                 (compare [this o1 o2]
@@ -35,6 +35,7 @@
         active (atom true)
         lock (Object.)
         notifier (Semaphore. 0)
+        thread-name (if timer-name timer-name "timer")
         timer-thread (Thread.
                       (fn []
                         (while @active
@@ -63,7 +64,7 @@
                                 (reset! active false)
                                 (throw t))
                               )))
-                        (.release notifier)))]
+                        (.release notifier)) thread-name)]
     (.setDaemon timer-thread true)
     (.setPriority timer-thread Thread/MAX_PRIORITY)
     (.start timer-thread)


[30/32] git commit: Revert "STORM-297: wait for the bolt to finish before killing the topology, otherwise netty client may be killed message before all bolts finish."

Posted by bo...@apache.org.
Revert "STORM-297: wait for the bolt to finish before killing the topology, otherwise netty client may be killed message before all bolts finish."

This reverts commit 5f7520af012f9e5ab3bf7e051b0410aaf765bd61.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/f7a78023
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/f7a78023
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/f7a78023

Branch: refs/heads/master
Commit: f7a78023abf5119526d0e6e9c8d9e6122545cafb
Parents: 5f7520a
Author: Sean Zhong <cl...@gmail.com>
Authored: Sun Jun 8 03:37:52 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Sun Jun 8 03:37:52 2014 +0800

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/testing.clj         | 9 ++-------
 storm-core/test/clj/backtype/storm/messaging_test.clj | 2 +-
 2 files changed, 3 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f7a78023/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
index 9a61d75..3ce2c3f 100644
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@ -435,8 +435,7 @@
     ))
 
 ;; TODO: mock-sources needs to be able to mock out state spouts as well
-;; kill-waiting: seconds to wait before we kill the topology after all spout completed
-(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {} :cleanup-state true :topology-name nil :kill-waiting 0]
+(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {} :cleanup-state true :topology-name nil]
   ;; TODO: the idea of mocking for transactional topologies should be done an
   ;; abstraction level above... should have a complete-transactional-topology for this
   (let [{topology :topology capturer :capturer} (capture-topology topology)
@@ -471,11 +470,7 @@
     (let [storm-id (common/get-storm-id state storm-name)]
       (while-timeout TEST-TIMEOUT-MS (not (every? exhausted? (spout-objects spouts)))
         (simulate-wait cluster-map))
-      
-      ;; spout finished not necesary means the topology finished, If the topology requires more time, set the kill-waiting
-      ;; to bigger value    
-      (Thread/sleep (* 1000 kill-waiting))
-      
+
       (.killTopologyWithOpts (:nimbus cluster-map) storm-name (doto (KillOptions.) (.set_wait_secs 0)))
       (while-timeout TEST-TIMEOUT-MS (.assignment-info state storm-id nil)
         (simulate-wait cluster-map))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f7a78023/storm-core/test/clj/backtype/storm/messaging_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging_test.clj b/storm-core/test/clj/backtype/storm/messaging_test.clj
index aefee0a..c719c68 100644
--- a/storm-core/test/clj/backtype/storm/messaging_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging_test.clj
@@ -85,7 +85,7 @@
                                                  :parallelism-hint 4)
                         })
             results (complete-topology cluster
-                                       topology :kill-waiting 10)]
+                                       topology)]
         
         ;; No error Tuple from Bolt TestEventOrderCheckBolt
         (is (empty? (read-tuples results "2"))))))


[04/32] git commit: STORM-257, update the config name in defaults.yaml

Posted by bo...@apache.org.
STORM-257, update the config name in defaults.yaml


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/2812e511
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/2812e511
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/2812e511

Branch: refs/heads/master
Commit: 2812e511c025f39308ba30be921ed0ab01d15e94
Parents: 70a46be
Author: Sean Zhong <cl...@gmail.com>
Authored: Thu May 8 10:05:49 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Thu May 8 10:05:49 2014 +0800

----------------------------------------------------------------------
 conf/defaults.yaml                            | 5 +++++
 storm-core/src/jvm/backtype/storm/Config.java | 6 +++---
 2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2812e511/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 2dbba24..4c15fce 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -93,6 +93,7 @@ supervisor.enable: true
 ### worker.* configs are for task workers
 worker.childopts: "-Xmx768m"
 worker.heartbeat.frequency.secs: 1
+worker.receiver.thread.count: 1
 
 task.heartbeat.frequency.secs: 3
 task.refresh.poll.secs: 10
@@ -108,6 +109,10 @@ storm.messaging.netty.buffer_size: 5242880 #5MB buffer
 storm.messaging.netty.max_retries: 30
 storm.messaging.netty.max_wait_ms: 1000
 storm.messaging.netty.min_wait_ms: 100
+storm.messaging.netty.transfer.batch.size: 262144
+storm.messaging.netty.blocking: false
+storm.messaging.netty.flush.check.interval.ms: 10
+
 
 ### topology.* configs are for specific executing storms
 topology.enable.message.timeouts: true

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2812e511/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 98718a3..fa5d9d1 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -88,19 +88,19 @@ public class Config extends HashMap<String, Object> {
     /**
      * If the Netty messaging layer is busy, the Netty client will try to batch message as more as possible up to the size of STORM_NETTY_MESSAGE_BATCH_SIZE bytes
      */
-    public static final String STORM_NETTY_MESSAGE_BATCH_SIZE = "netty.transfer.batch.size";
+    public static final String STORM_NETTY_MESSAGE_BATCH_SIZE = "storm.messaging.netty.transfer.batch.size";
     public static final Object STORM_NETTY_MESSAGE_BATCH_SIZE_SCHEMA = Number.class;
 
     /**
      * This control whether we do Netty message transfer in a synchronized way or async way. 
      */
-    public static final String STORM_NETTY_BLOCKING = "netty.blocking";
+    public static final String STORM_NETTY_BLOCKING = "storm.messaging.netty.blocking";
     public static final Object STORM_NETTY_BLOCKING_SCHEMA = Boolean.class;
     
     /**
      * We check with this interval that whether the Netty channel is writable and try to write pending messages
      */
-    public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "netty.flush.check.interval.ms";
+    public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";
     public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = Number.class;
     
     


[32/32] git commit: Merge branch 'storm_async_netty_and_batch_api' of https://github.com/clockfly/incubator-storm into STORM-297

Posted by bo...@apache.org.
Merge branch 'storm_async_netty_and_batch_api' of https://github.com/clockfly/incubator-storm into STORM-297

Pulled in test fixes for STORM-297


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/1a57fcf6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/1a57fcf6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/1a57fcf6

Branch: refs/heads/master
Commit: 1a57fcf6b015c23f56aea896397bca03fc109ead
Parents: e21642d 23f5724
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Mon Jun 9 08:35:29 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Mon Jun 9 08:35:29 2014 -0500

----------------------------------------------------------------------
 .../storm/testing/TestEventLogSpout.java        | 60 +++++++++++++++-----
 .../backtype/storm/utils/TransferDrainer.java   |  2 +-
 2 files changed, 48 insertions(+), 14 deletions(-)
----------------------------------------------------------------------



[16/32] git commit: STORM-297: avoid duplicate Channel Null Check

Posted by bo...@apache.org.
STORM-297: avoid duplicate Channel Null Check


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/baa31061
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/baa31061
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/baa31061

Branch: refs/heads/master
Commit: baa310618ed639c312f2281bc8364d55e5977174
Parents: 44e47e8
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue May 20 12:26:49 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue May 20 12:26:49 2014 +0800

----------------------------------------------------------------------
 .../src/jvm/backtype/storm/messaging/netty/Client.java   | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/baa31061/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index d3980d4..59a8a5c 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -104,7 +104,7 @@ public class Client implements IConnection {
                     if (now > flushCheckTime) {
                         Channel channel = channelRef.get();
                         if (null != channel && channel.isWritable()) {
-                            flush();
+                            flush(channel);
                         }
                     }
                     try {
@@ -237,15 +237,12 @@ public class Client implements IConnection {
         return "";
     }
 
-    private synchronized void flush() {
+    private synchronized void flush(Channel channel) {
         if (!closing) {
             if (null != messageBatch && !messageBatch.isEmpty()) {
                 MessageBatch toBeFlushed = messageBatch;
-                Channel channel = channelRef.get();
-                if (channel != null) {
-                    flushCheckTimer.set(Long.MAX_VALUE);
-                    flushRequest(channel, toBeFlushed);
-                }
+                flushCheckTimer.set(Long.MAX_VALUE);
+                flushRequest(channel, toBeFlushed);
                 messageBatch = null;
             }
         }


[31/32] git commit: STORM-297: use acked spout to avoid the topology being shut down before the bolts finish.

Posted by bo...@apache.org.
STORM-297: use acked spout to avoid the topology being shut down before the bolts finish.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/23f57242
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/23f57242
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/23f57242

Branch: refs/heads/master
Commit: 23f572420b4b1b24e79b53067c9a2cb703599756
Parents: f7a7802
Author: Sean Zhong <cl...@gmail.com>
Authored: Sun Jun 8 03:41:46 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Sun Jun 8 03:41:46 2014 +0800

----------------------------------------------------------------------
 .../storm/testing/TestEventLogSpout.java        | 60 +++++++++++++++-----
 1 file changed, 47 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/23f57242/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java b/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
index a34484d..1570aeb 100644
--- a/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
+++ b/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
@@ -17,6 +17,7 @@
  */
 package backtype.storm.testing;
 
+import static backtype.storm.utils.Utils.get;
 import backtype.storm.topology.OutputFieldsDeclarer;
 
 import java.util.HashMap;
@@ -36,25 +37,40 @@ import backtype.storm.tuple.Values;
 public class TestEventLogSpout extends BaseRichSpout {
     public static Logger LOG = LoggerFactory.getLogger(TestEventLogSpout.class);
     
+    private static final Map<String, Integer> acked = new HashMap<String, Integer>();
+    private static final Map<String, Integer> failed = new HashMap<String, Integer>();
+    
     private String uid;
     private long totalCount;
-    private final static Map<String, AtomicLong> totalEmitCount = new HashMap<String, AtomicLong>();
     
     SpoutOutputCollector _collector;
     private long eventId = 0;
     private long myCount;
     private int source;
     
+    public static int getNumAcked(String stormId) {
+        synchronized(acked) {
+            return get(acked, stormId, 0);
+        }
+    }
+
+    public static int getNumFailed(String stormId) {
+        synchronized(failed) {
+            return get(failed, stormId, 0);
+        }
+    }
+    
     public TestEventLogSpout(long totalCount) {
         this.uid = UUID.randomUUID().toString();
-        this.totalCount = totalCount;
         
-        synchronized (totalEmitCount) {
-            if (null == totalEmitCount.get(uid)) {
-                totalEmitCount.put(uid, new AtomicLong(0));
-            }
-            
+        synchronized(acked) {
+            acked.put(uid, 0);
         }
+        synchronized(failed) {
+            failed.put(uid, 0);
+        }
+        
+        this.totalCount = totalCount;
     }
         
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
@@ -69,13 +85,26 @@ public class TestEventLogSpout extends BaseRichSpout {
     }
     
     public void cleanup() {
-        synchronized(totalEmitCount) {            
-            totalEmitCount.remove(uid);
+        synchronized(acked) {            
+            acked.remove(uid);
+        } 
+        synchronized(failed) {            
+            failed.remove(uid);
         }
     }
     
     public boolean completed() {
-        Long totalEmitted = totalEmitCount.get(uid).get();
+        
+        int ackedAmt;
+        int failedAmt;
+        
+        synchronized(acked) {
+            ackedAmt = acked.get(uid);
+        }
+        synchronized(failed) {
+            failedAmt = failed.get(uid);
+        }
+        int totalEmitted = ackedAmt + failedAmt;
         
         if (totalEmitted >= totalCount) {
             return true;
@@ -87,16 +116,21 @@ public class TestEventLogSpout extends BaseRichSpout {
         if (eventId < myCount) { 
             eventId++;
             _collector.emit(new Values(source, eventId), eventId);
-            totalEmitCount.get(uid).incrementAndGet();
         }        
     }
     
     public void ack(Object msgId) {
-
+        synchronized(acked) {
+            int curr = get(acked, uid, 0);
+            acked.put(uid, curr+1);
+        }
     }
 
     public void fail(Object msgId) {
-        
+        synchronized(failed) {
+            int curr = get(failed, uid, 0);
+            failed.put(uid, curr+1);
+        }
     }
     
     public void declareOutputFields(OutputFieldsDeclarer declarer) {


[27/32] git commit: Added STORM-297 to Changelog.

Posted by bo...@apache.org.
Added STORM-297 to Changelog.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/e21642d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/e21642d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/e21642d8

Branch: refs/heads/master
Commit: e21642d87903869ab7f9239afd20c21ae4cb280e
Parents: 701ac94
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Sat Jun 7 10:31:59 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Sat Jun 7 10:31:59 2014 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e21642d8/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index cdda3fe..17b839c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.9.2-incubating (unreleased)
+ * STORM-297: Performance scaling with CPU
  * STORM-244: DRPC timeout can return null instead of throwing an exception
  * STORM-63: remove timeout drpc request from its function's request queue
  * STORM-313: Remove log-level-page from logviewer


[21/32] git commit: STORM-297: Add a comment about why using java collection directly in clojure

Posted by bo...@apache.org.
STORM-297: Add a comment about why using java collection directly in clojure


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/9f9a2a55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/9f9a2a55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/9f9a2a55

Branch: refs/heads/master
Commit: 9f9a2a556986cc67452081f0bd763e3ed1da8f75
Parents: 04b4907
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue May 20 22:23:05 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue May 20 22:23:05 2014 +0800

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/daemon/worker.clj | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9f9a2a55/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index 437e8dd..4ae7f0a 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -119,6 +119,8 @@
         (fast-list-iter [[task tuple :as pair] tuple-batch]
           (if (local-tasks task)
             (.add local pair)
+            
+            ;;Using java objects directly to avoid performance issues in java code
             (let [node+port (get @task->node+port task)]
               (when (not (.get remoteMap node+port))
                 (.put remoteMap node+port (ArrayList.)))


[19/32] git commit: STORM-297: tune the client scheduler pool size

Posted by bo...@apache.org.
STORM-297: tune the client scheduler pool size


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/f18d98fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/f18d98fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/f18d98fc

Branch: refs/heads/master
Commit: f18d98fc45f89d058ebbd2578e2c9e82cfedc722
Parents: deba558 9eecaf8
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue May 20 21:22:45 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue May 20 21:22:45 2014 +0800

----------------------------------------------------------------------

----------------------------------------------------------------------



[11/32] git commit: STORM-297: remove param thread-name when we know we are consuming from a disruptor queue, just use queue name as thread name will suffice

Posted by bo...@apache.org.
STORM-297: remove param thread-name when we know we are consuming from a disruptor queue, just use queue name as thread name will suffice


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/6b9da080
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/6b9da080
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/6b9da080

Branch: refs/heads/master
Commit: 6b9da0804e59935bbe280570bead20954e779c40
Parents: 19e72e8
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue May 20 11:32:20 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue May 20 11:32:20 2014 +0800

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/disruptor.clj | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/6b9da080/storm-core/src/clj/backtype/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/disruptor.clj b/storm-core/src/clj/backtype/storm/disruptor.clj
index 28393eb..a199055 100644
--- a/storm-core/src/clj/backtype/storm/disruptor.clj
+++ b/storm-core/src/clj/backtype/storm/disruptor.clj
@@ -83,8 +83,7 @@
 (defn halt-with-interrupt! [^DisruptorQueue queue]
   (.haltWithInterrupt queue))
 
-(defnk consume-loop* [^DisruptorQueue queue handler :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
-                      :thread-name nil]
+(defnk consume-loop* [^DisruptorQueue queue handler :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))]
   (let [ret (async-loop
               (fn []
                 (consume-batch-when-available queue handler)
@@ -98,5 +97,5 @@
 
 (defmacro consume-loop [queue & handler-args]
   `(let [handler# (handler ~@handler-args)]
-     (consume-loop* ~queue handler# :thread-name (.getName queue))
+     (consume-loop* ~queue handler#)
      ))


[23/32] git commit: STORM-297: add test case to test that multiple receiver threads preserve message order

Posted by bo...@apache.org.
STORM-297: add test case to test that multiple receiver threads preserve message order


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/426d1437
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/426d1437
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/426d1437

Branch: refs/heads/master
Commit: 426d143784ee3610aa3582d43316f1344abd6275
Parents: 20b4f8b
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue Jun 3 14:44:09 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue Jun 3 14:44:09 2014 +0800

----------------------------------------------------------------------
 .../storm/testing/TestEventLogSpout.java        | 105 +++++++++++++++++++
 .../storm/testing/TestEventOrderCheckBolt.java  |  76 ++++++++++++++
 .../test/clj/backtype/storm/messaging_test.clj  |  35 ++++++-
 3 files changed, 215 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/426d1437/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java b/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
new file mode 100644
index 0000000..a34484d
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.testing;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+public class TestEventLogSpout extends BaseRichSpout {
+    public static Logger LOG = LoggerFactory.getLogger(TestEventLogSpout.class);
+    
+    private String uid;
+    private long totalCount;
+    private final static Map<String, AtomicLong> totalEmitCount = new HashMap<String, AtomicLong>();
+    
+    SpoutOutputCollector _collector;
+    private long eventId = 0;
+    private long myCount;
+    private int source;
+    
+    public TestEventLogSpout(long totalCount) {
+        this.uid = UUID.randomUUID().toString();
+        this.totalCount = totalCount;
+        
+        synchronized (totalEmitCount) {
+            if (null == totalEmitCount.get(uid)) {
+                totalEmitCount.put(uid, new AtomicLong(0));
+            }
+            
+        }
+    }
+        
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        _collector = collector;
+        this.source = context.getThisTaskId();
+        long taskCount = context.getComponentTasks(context.getThisComponentId()).size();
+        myCount = totalCount / taskCount;
+    }
+    
+    public void close() {
+        
+    }
+    
+    public void cleanup() {
+        synchronized(totalEmitCount) {            
+            totalEmitCount.remove(uid);
+        }
+    }
+    
+    public boolean completed() {
+        Long totalEmitted = totalEmitCount.get(uid).get();
+        
+        if (totalEmitted >= totalCount) {
+            return true;
+        }
+        return false;
+    }
+        
+    public void nextTuple() {
+        if (eventId < myCount) { 
+            eventId++;
+            _collector.emit(new Values(source, eventId), eventId);
+            totalEmitCount.get(uid).incrementAndGet();
+        }        
+    }
+    
+    public void ack(Object msgId) {
+
+    }
+
+    public void fail(Object msgId) {
+        
+    }
+    
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("source", "eventId"));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/426d1437/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java b/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java
new file mode 100644
index 0000000..1f80362
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.testing;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+public class TestEventOrderCheckBolt extends BaseRichBolt {
+    public static Logger LOG = LoggerFactory.getLogger(TestEventOrderCheckBolt.class);
+    
+    private int _count;
+    OutputCollector _collector;
+    Map<Integer, Long> recentEventId = new HashMap<Integer, Long>();
+
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        _collector = collector;
+        _count = 0;
+    }
+
+    public void execute(Tuple input) {
+        Integer sourceId = input.getInteger(0);
+        Long eventId = input.getLong(1);
+        Long recentEvent = recentEventId.get(sourceId);
+
+        if (null != recentEvent && eventId <= recentEvent) {
+            String error = "Error: event id is not in strict order! event source Id: "
+                    + sourceId + ", last event Id: " + recentEvent + ", current event Id: " + eventId;
+
+            _collector.emit(input, new Values(error));
+        }
+        recentEventId.put(sourceId, eventId);
+
+        _collector.ack(input);
+    }
+
+    public void cleanup() {
+
+    }
+
+    public Fields getOutputFields() {
+        return new Fields("error");
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("error"));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/426d1437/storm-core/test/clj/backtype/storm/messaging_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging_test.clj b/storm-core/test/clj/backtype/storm/messaging_test.clj
index 94b9168..c719c68 100644
--- a/storm-core/test/clj/backtype/storm/messaging_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging_test.clj
@@ -15,7 +15,7 @@
 ;; limitations under the License.
 (ns backtype.storm.messaging-test
   (:use [clojure test])
-  (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount])
+  (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestEventLogSpout TestEventOrderCheckBolt])
   (:use [backtype.storm bootstrap testing])
   (:use [backtype.storm.daemon common])
   )
@@ -56,3 +56,36 @@
         (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
                  (read-tuples results "2")))))))
 
+(extend-type TestEventLogSpout
+  CompletableSpout
+  (exhausted? [this]
+    (-> this .completed))
+  (cleanup [this]
+    (.cleanup this))
+  (startup [this]
+    ))
+
+;; Test that adding more receiver threads won't violate the message delivery order guarantee
+(deftest test-receiver-message-order 
+  (with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 2
+                                        :daemon-conf {TOPOLOGY-WORKERS 2
+                                                      ;; Configure multiple receiver threads per worker 
+                                                      WORKER-RECEIVER-THREAD-COUNT 2
+                                                      STORM-LOCAL-MODE-ZMQ  true 
+                                                      STORM-MESSAGING-TRANSPORT 
+                                                      "backtype.storm.messaging.netty.Context"}]
+      (let [topology (thrift/mk-topology
+                       
+                       ;; TestEventLogSpout output(sourceId, eventId), eventId is Monotonically increasing
+                       {"1" (thrift/mk-spout-spec (TestEventLogSpout. 4000) :parallelism-hint 8)}
+                       
+                       ;; field grouping, message from same "source" task will be delivered to same bolt task
+                       ;; When received message order is not kept, Emit an error Tuple 
+                       {"2" (thrift/mk-bolt-spec {"1" ["source"]} (TestEventOrderCheckBolt.)
+                                                 :parallelism-hint 4)
+                        })
+            results (complete-topology cluster
+                                       topology)]
+        
+        ;; No error Tuple from Bolt TestEventOrderCheckBolt
+        (is (empty? (read-tuples results "2"))))))


[09/32] git commit: STORM-297: log server error

Posted by bo...@apache.org.
STORM-297: log server error


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/7b88cf35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/7b88cf35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/7b88cf35

Branch: refs/heads/master
Commit: 7b88cf352e2ab4d9df0c068432140b13821ba057
Parents: c5c3571
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue May 20 11:15:05 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue May 20 11:15:05 2014 +0800

----------------------------------------------------------------------
 .../src/jvm/backtype/storm/messaging/netty/StormServerHandler.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7b88cf35/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
index 8b93e31..bf9b79e 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
@@ -60,7 +60,7 @@ class StormServerHandler extends SimpleChannelUpstreamHandler  {
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-        e.getCause().printStackTrace();
+        LOG.error("server errors in handling the request", e.getCause());
         server.closeChannel(e.getChannel());
     }
 }


[25/32] git commit: STORM-297: rename "worker.receiver.thread.count" to "topology.worker.receiver.thread.count"

Posted by bo...@apache.org.
STORM-297: rename "worker.receiver.thread.count" to "topology.worker.receiver.thread.count"


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/f6915d45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/f6915d45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/f6915d45

Branch: refs/heads/master
Commit: f6915d45a61593f95bf779558a32f807d583932c
Parents: 0bca173
Author: Sean Zhong <cl...@gmail.com>
Authored: Wed Jun 4 18:14:55 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Wed Jun 4 18:14:55 2014 +0800

----------------------------------------------------------------------
 conf/defaults.yaml                            | 2 +-
 storm-core/src/jvm/backtype/storm/Config.java | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f6915d45/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index a5c2c39..b33167d 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -95,7 +95,7 @@ worker.childopts: "-Xmx768m"
 worker.heartbeat.frequency.secs: 1
 
 # control how many worker receiver threads we need per worker 
-worker.receiver.thread.count: 1
+topology.worker.receiver.thread.count: 1
 
 task.heartbeat.frequency.secs: 3
 task.refresh.poll.secs: 10

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f6915d45/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index d069293..f808e55 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -477,7 +477,7 @@ public class Config extends HashMap<String, Object> {
     /**
      * control how many worker receiver threads we need per worker
      */
-    public static final String WORKER_RECEIVER_THREAD_COUNT = "worker.receiver.thread.count";
+    public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
     public static final Object WORKER_RECEIVER_THREAD_COUNT_SCHEMA = Number.class;
     
     /**


[22/32] git commit: STORM-297: fix UT regression, when topology.workers is not specified.

Posted by bo...@apache.org.
STORM-297: fix UT regression, when topology.workers is not specified.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/20b4f8b2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/20b4f8b2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/20b4f8b2

Branch: refs/heads/master
Commit: 20b4f8b2195a1bf214f63e10b1bbca4690c0290f
Parents: 9f9a2a5
Author: Sean Zhong <cl...@gmail.com>
Authored: Wed May 21 01:15:55 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Wed May 21 01:15:55 2014 +0800

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/messaging/netty/Context.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/20b4f8b2/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 8f2b17f..f592aff 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -63,7 +63,7 @@ public class Context implements IContext {
                     Executors.newCachedThreadPool(workerFactory));
         }
         
-        int otherWorkers = Utils.getInt(storm_conf.get(Config.TOPOLOGY_WORKERS)) - 1;
+        int otherWorkers = Utils.getInt(storm_conf.get(Config.TOPOLOGY_WORKERS), 1) - 1;
         int poolSize = Math.min(Math.max(1, otherWorkers), MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE);
         clientScheduleService = Executors.newScheduledThreadPool(poolSize, new NettyRenameThreadFactory("client-schedule-service"));
     }


[12/32] git commit: STORM-297: use primitive type int instead of Integer

Posted by bo...@apache.org.
STORM-297: use primitive type int instead of Integer


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/fe68fd96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/fe68fd96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/fe68fd96

Branch: refs/heads/master
Commit: fe68fd968bdfab3d902f86956ae04bd3d46b444e
Parents: 6b9da08
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue May 20 11:34:03 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue May 20 11:34:03 2014 +0800

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/messaging/netty/Client.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe68fd96/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 18582b4..00ca449 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -52,7 +52,7 @@ public class Client implements IConnection {
     private final int buffer_size;
     private boolean closing;
 
-    private Integer messageBatchSize;
+    private int messageBatchSize;
     
     private AtomicLong pendings;
 


[17/32] git commit: STORM-297: Use a shared pool for netty client flusher threads.

Posted by bo...@apache.org.
STORM-297: Use a shared pool for netty client flusher threads.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/9eecaf89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/9eecaf89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/9eecaf89

Branch: refs/heads/master
Commit: 9eecaf890d06df8d94c73227498a572f4d4cc96d
Parents: baa3106
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue May 20 20:46:11 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue May 20 20:46:11 2014 +0800

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 32 ++++++++++++--------
 .../backtype/storm/messaging/netty/Context.java | 30 +++++++++++++++---
 2 files changed, 44 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9eecaf89/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 59a8a5c..9765647 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -34,6 +34,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -59,10 +61,13 @@ public class Client implements IConnection {
     MessageBatch messageBatch = null;
     private AtomicLong flushCheckTimer;
     private int flushCheckInterval;
+    private ScheduledExecutorService scheduler;
 
     @SuppressWarnings("rawtypes")
-    Client(Map storm_conf, ChannelFactory factory, String host, int port) {
+    Client(Map storm_conf, ChannelFactory factory, 
+            ScheduledExecutorService scheduler, String host, int port) {
         this.factory = factory;
+        this.scheduler = scheduler;
         channelRef = new AtomicReference<Channel>(null);
         closing = false;
         pendings = new AtomicLong(0);
@@ -92,12 +97,18 @@ public class Client implements IConnection {
         // Start the connection attempt.
         remote_addr = new InetSocketAddress(host, port);
         
-        Thread flushChecker = new Thread(new Runnable() {
+        // set up the connection asynchronously now
+        scheduler.execute(new Runnable() {
             @Override
-            public void run() {
-                //make sure we have a connection
+            public void run() {   
                 connect();
-                
+            }
+        });
+        
+        Runnable flusher = new Runnable() {
+            @Override
+            public void run() {
+
                 while(!closing) {
                     long flushCheckTime = flushCheckTimer.get();
                     long now = System.currentTimeMillis();
@@ -107,18 +118,13 @@ public class Client implements IConnection {
                             flush(channel);
                         }
                     }
-                    try {
-                        Thread.sleep(flushCheckInterval);
-                    } catch (InterruptedException e) {
-                        break;
-                    }
                 }
                 
             }
-        }, name() + "-flush-checker");
+        };
         
-        flushChecker.setDaemon(true);
-        flushChecker.start();
+        long initialDelay = Math.min(30L * 1000, max_sleep_ms * max_retries); //max wait for 30s
+        scheduler.scheduleWithFixedDelay(flusher, initialDelay, flushCheckInterval, TimeUnit.MILLISECONDS);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9eecaf89/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 2e762ce..3f82e81 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -18,9 +18,12 @@
 package backtype.storm.messaging.netty;
 
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.Map;
 import java.util.Vector;
 
@@ -29,14 +32,16 @@ import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.IContext;
 import backtype.storm.utils.Utils;
 
-import java.util.Map;
-import java.util.Vector;
-
 public class Context implements IContext {
+    private static final Logger LOG = LoggerFactory.getLogger(Context.class);
+        
     @SuppressWarnings("rawtypes")
     private Map storm_conf;
     private volatile Vector<IConnection> connections;
     private NioClientSocketChannelFactory clientChannelFactory;
+    
+    private ScheduledExecutorService clientScheduleService;
+    private final int MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE = 10;
 
     /**
      * initialization per Storm configuration 
@@ -57,6 +62,10 @@ public class Context implements IContext {
             clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
                     Executors.newCachedThreadPool(workerFactory));
         }
+        
+        int workers = Utils.getInt(storm_conf.get(Config.TOPOLOGY_WORKERS));
+        int poolSize = Math.min(workers, MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE); 
+        clientScheduleService = Executors.newScheduledThreadPool(poolSize, new NettyRenameThreadFactory("client-schedule-service"));
     }
 
     /**
@@ -72,7 +81,8 @@ public class Context implements IContext {
      * establish a connection to a remote server
      */
     public IConnection connect(String storm_id, String host, int port) {        
-        IConnection client =  new Client(storm_conf, clientChannelFactory, host, port);
+        IConnection client =  new Client(storm_conf, clientChannelFactory, 
+                clientScheduleService, host, port);
         connections.add(client);
         return client;
     }
@@ -81,12 +91,22 @@ public class Context implements IContext {
      * terminate this context
      */
     public void term() {
+        clientScheduleService.shutdown();        
+        
         for (IConnection conn : connections) {
             conn.close();
         }
+        
+        try {
+            clientScheduleService.awaitTermination(30, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            LOG.error("Error when shutting down client scheduler", e);
+        }
+        
         connections = null;
 
         //we need to release resources associated with client channel factory
         clientChannelFactory.releaseExternalResources();
+
     }
 }